# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the dandi package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Provides helpers to compute digests (md5, etc.) on files
"""
# Importing this module imports fscacher, which imports joblib, which imports
# numpy, which is a "heavy" import, so avoid importing this module at the top
# level of a module.
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
import hashlib
import logging
import os.path
from pathlib import Path
from dandischema.digests.dandietag import DandiETag
from fscacher import PersistentCache
from zarr_checksum.checksum import ZarrChecksum, ZarrChecksumManifest
from zarr_checksum.tree import ZarrChecksumTree
from .threaded_walk import threaded_walk
from ..utils import Hasher, exclude_from_zarr
# Module-level logger, registered under the name "dandi.support.digests"
lgr = logging.getLogger("dandi.support.digests")
[docs]
@dataclass
class Digester:
    """Helper to compute multiple digests in one pass for a file

    An instance is configured with a list of algorithm labels and is then
    called with a file path; the file is read once, block by block, and every
    block is fed to all of the configured hashers.
    """

    # Loosely based on snippet by PM 2Ring 2014.10.23
    # http://unix.stackexchange.com/a/163769/55543

    # Ideally we should find an efficient way to parallelize this but
    # atm this one is sufficiently speedy

    #: List of any supported algorithm labels, such as md5, sha1, etc.
    digests: list[str] = field(
        default_factory=lambda: ["md5", "sha1", "sha256", "sha512"]
    )

    #: Chunk size (in bytes) by which to consume a file.
    blocksize: int = 1 << 16

    #: Hasher constructors looked up in `hashlib`, one per entry of
    #: ``digests`` and in the same order (populated by ``__post_init__``).
    digest_funcs: list[Callable[[], Hasher]] = field(init=False, repr=False)

    def __post_init__(self) -> None:
        # Resolve every label to the same-named `hashlib` attribute up front,
        # so a label `hashlib` does not provide raises AttributeError at
        # construction time rather than when a file is digested.
        self.digest_funcs = [getattr(hashlib, digest) for digest in self.digests]

    def __call__(self, fpath: str | Path) -> dict[str, str]:
        """
        Compute all configured digests of a file in a single read pass.

        Parameters
        ----------
        fpath : str or Path
          File path for which a checksum shall be computed.

        Returns
        -------
        dict
          Keys are algorithm labels, and values are checksum strings
        """
        # Pass the path as a logging argument so the message is only
        # formatted when the debug level is actually enabled.
        lgr.debug("Estimating digests for %s", fpath)
        # Fresh hasher objects for every call, in the order of ``self.digests``
        hashers = [make_hasher() for make_hasher in self.digest_funcs]
        with open(fpath, "rb") as f:
            # Two-argument iter() keeps reading blocks until read() returns
            # the empty bytes object, i.e. end of file.
            for block in iter(lambda: f.read(self.blocksize), b""):
                for hasher in hashers:
                    hasher.update(block)
        return {
            label: hasher.hexdigest()
            for label, hasher in zip(self.digests, hashers)
        }
# Persistent cache (from fscacher) whose ``memoize_path`` decorator is applied
# to `get_digest` and `get_dandietag` in this module.
# NOTE(review): ``envvar="DANDI_CACHE"`` presumably lets that environment
# variable control the cache's behavior -- confirm against fscacher's docs.
checksums = PersistentCache(name="dandi-checksums", envvar="DANDI_CACHE")
[docs]
@checksums.memoize_path
def get_digest(filepath: str | Path, digest: str = "sha256") -> str:
    """
    Return the digest of kind `digest` for `filepath`.

    Calls are wrapped by ``checksums.memoize_path``.

    :param filepath: path to compute the digest for
    :param digest:
        ``"dandi-etag"`` delegates to `get_dandietag()`, ``"zarr-checksum"``
        delegates to `get_zarr_checksum()`, and any other value is handed to
        `Digester` as an algorithm label
    """
    if digest == "zarr-checksum":
        return get_zarr_checksum(Path(filepath))
    if digest != "dandi-etag":
        # Plain algorithm label: compute just that one digest
        single = Digester([digest])
        return single(filepath)[digest]
    etag = get_dandietag(filepath).as_str()
    assert isinstance(etag, str)
    return etag
[docs]
@checksums.memoize_path
def get_dandietag(filepath: str | Path) -> DandiETag:
    """
    Build the `DandiETag` for the file at `filepath` via
    ``DandiETag.from_file``; calls are wrapped by ``checksums.memoize_path``.
    """
    etag = DandiETag.from_file(filepath)
    return etag
[docs]
def get_zarr_checksum(path: Path, known: dict[str, str] | None = None) -> str:
    """
    Compute the Zarr checksum for a file or directory tree.

    For a regular file, the result is its MD5 digest as returned by
    `get_digest()`.  For anything else, `path` is walked with
    `threaded_walk()` (excluding whatever `exclude_from_zarr` filters out),
    and each visited file's digest and size are added to a
    `ZarrChecksumTree`, whose processed result is returned as a string.

    If the digests for any files in the Zarr are already known, they can be
    passed in the ``known`` argument, which must be a `dict` mapping
    slash-separated paths relative to the root of the Zarr to hex digests.
    Files not listed there are digested with `md5file_nocache()`.
    """
    if path.is_file():
        file_md5 = get_digest(path, "md5")
        assert isinstance(file_md5, str)
        return file_md5

    # Bind a definitely-not-None mapping for the closure below to consult
    known_digests: dict[str, str] = {} if known is None else known

    def digest_file(f: Path) -> tuple[Path, str, int]:
        # Keys of ``known`` are POSIX-style paths relative to the Zarr root
        key = f.relative_to(path).as_posix()
        try:
            md5 = known_digests[key]
        except KeyError:
            md5 = md5file_nocache(f)
        return (f, md5, os.path.getsize(f))

    tree = ZarrChecksumTree()
    walker = threaded_walk(path, digest_file, exclude=exclude_from_zarr)
    for leaf, leaf_digest, leaf_size in walker:
        tree.add_leaf(leaf.relative_to(path), leaf_size, leaf_digest)
    return str(tree.process())
[docs]
def md5file_nocache(filepath: str | Path) -> str:
    """
    Compute the MD5 digest of a file without caching with fscacher, which has
    been shown to slow things down for the large numbers of files typically
    present in Zarrs

    The digest is computed directly with a `Digester` configured for
    ``"md5"`` only.
    """
    md5_only = Digester(["md5"])
    results = md5_only(filepath)
    return results["md5"]
[docs]
def checksum_zarr_dir(
    files: dict[str, tuple[str, int]], directories: dict[str, tuple[str, int]]
) -> str:
    """
    Calculate the Zarr checksum of a directory only from information about the
    files and subdirectories immediately within it.

    :param files:
        A mapping from names of files in the directory to pairs of their MD5
        digests and sizes
    :param directories:
        A mapping from names of subdirectories in the directory to pairs of
        their Zarr checksums and the sum of the sizes of all files recursively
        within them
    :return:
        the ``digest`` attribute of the digest generated from a
        `ZarrChecksumManifest` built out of the given entries
    """

    def to_entries(listing: dict[str, tuple[str, int]]) -> list[ZarrChecksum]:
        # One ZarrChecksum per (name -> (digest, size)) item, in mapping order
        return [
            ZarrChecksum(digest=checksum, name=entry_name, size=total_size)
            for entry_name, (checksum, total_size) in listing.items()
        ]

    file_entries = to_entries(files)
    dir_entries = to_entries(directories)
    manifest = ZarrChecksumManifest(files=file_entries, directories=dir_entries)
    generated = manifest.generate_digest()
    return generated.digest