from __future__ import annotations
from collections import Counter, deque
from collections.abc import Callable, Iterable, Iterator, Sequence
from dataclasses import InitVar, dataclass, field
from datetime import datetime
from enum import Enum
from functools import partial
import hashlib
import json
import os
import os.path as op
from pathlib import Path
import random
from shutil import rmtree
from threading import Lock
import time
from types import TracebackType
from typing import IO, Any, Literal
from dandischema.digests.dandietag import ETagHashlike
from dandischema.models import DigestType
from fasteners import InterProcessLock
import humanize
from interleave import FINISH_CURRENT, interleave
import requests
from . import get_logger
from .consts import RETRY_STATUSES, dandiset_metadata_file
from .dandiapi import AssetType, BaseRemoteZarrAsset, RemoteDandiset
from .dandiarchive import (
AssetItemURL,
DandisetURL,
ParsedDandiURL,
SingleAssetURL,
parse_dandi_url,
)
from .dandiset import Dandiset
from .exceptions import NotFoundError
from .files import LocalAsset, find_dandi_files
from .support import pyout as pyouts
from .support.iterators import IteratorWithAggregation
from .support.pyout import naturalsize
from .utils import (
Hasher,
abbrev_prompt,
ensure_datetime,
exclude_from_zarr,
flattened,
is_same_time,
path_is_subpath,
pluralize,
yaml_load,
)
lgr = get_logger()
class DownloadExisting(str, Enum):
ERROR = "error"
SKIP = "skip"
OVERWRITE = "overwrite"
OVERWRITE_DIFFERENT = "overwrite-different"
REFRESH = "refresh"
def __str__(self) -> str:
return self.value
class PathType(str, Enum):
EXACT = "exact"
GLOB = "glob"
def __str__(self) -> str:
return self.value
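# NOTE: `DownloadFormat`, referenced throughout `download()` below, was missing
# from this excerpt; a minimal sketch of its definition, assuming str-valued
# members matching the formats used below (PYOUT, DEBUG):
class DownloadFormat(str, Enum):
    PYOUT = "pyout"
    DEBUG = "debug"

    def __str__(self) -> str:
        return self.value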
def download(
urls: str | Sequence[str],
output_dir: str | Path,
*,
format: DownloadFormat = DownloadFormat.PYOUT,
existing: DownloadExisting = DownloadExisting.ERROR,
jobs: int = 1,
jobs_per_zarr: int | None = None,
get_metadata: bool = True,
get_assets: bool = True,
sync: bool = False,
path_type: PathType = PathType.EXACT,
) -> None:
# TODO: deduplicate with upload. For now this was borrowed from that code.
# We will again use pyout to provide a neat table summarizing our progress
# with the download etc.
urls = flattened([urls])
if not urls:
# if no paths provided etc, we will download dandiset path
# we are at, BUT since we are not git -- we do not even know
# on which instance it exists! Thus ATM we would do nothing but crash
raise NotImplementedError("No URLs were provided. Cannot download anything")
parsed_urls = [parse_dandi_url(u, glob=path_type is PathType.GLOB) for u in urls]
# dandi.cli.formatters are used in cmd_ls to provide switchable output formats
pyout_style = pyouts.get_style(hide_if_missing=False)
rec_fields = ("path", "size", "done", "done%", "checksum", "status", "message")
out = pyouts.LogSafeTabular(style=pyout_style, columns=rec_fields, max_workers=jobs)
out_helper = PYOUTHelper()
pyout_style["done"] = pyout_style["size"].copy()
pyout_style["size"]["aggregate"] = out_helper.agg_size
pyout_style["done"]["aggregate"] = out_helper.agg_done
# I thought I was making a beautiful flower but ended up with a cactus
# which never blooms... All because assets are looped through inside
# download_generator
# TODO: redo
kw = dict(assets_it=out_helper.it)
if jobs > 1:
if format is DownloadFormat.PYOUT:
# pyout can handle downloads that are delegated to generators
kw["yield_generator_for_fields"] = rec_fields[1:] # all but path
else:
lgr.warning(
"Parallel downloads are not yet implemented for non-pyout format=%r. "
"Download will proceed serially.",
str(format),
)
downloaders = [
Downloader(
url=purl,
output_dir=output_dir,
existing=existing,
get_metadata=get_metadata,
get_assets=get_assets,
jobs_per_zarr=jobs_per_zarr,
on_error="yield" if format is DownloadFormat.PYOUT else "raise",
**kw,
)
for purl in parsed_urls
]
gen_ = (r for dl in downloaders for r in dl.download_generator())
# TODOs:
# - redo frontends similarly to how command_ls did it
# - have a single loop with analysis of `rec` to determine whether any file
#   has failed to download. If any did, an exception should probably be
#   raised. API discussion for the Python side of the API:
#
if format is DownloadFormat.DEBUG:
for rec in gen_:
print(rec, flush=True)
elif format is DownloadFormat.PYOUT:
with out:
for rec in gen_:
out(rec)
else:
raise AssertionError(f"Unhandled DownloadFormat member: {format!r}")
if sync:
to_delete = [p for dl in downloaders for p in dl.delete_for_sync()]
if to_delete:
while True:
opt = abbrev_prompt(
f"Delete {pluralize(len(to_delete), 'local asset')}?",
"yes",
"no",
"list",
)
if opt == "list":
for p in to_delete:
print(p)
elif opt == "yes":
for p in to_delete:
if p.is_dir():
rmtree(p)
else:
p.unlink()
break
else:
break
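# Example usage (illustrative; the dandiset URL below is just a placeholder):
#
#     download(
#         "https://dandiarchive.org/dandiset/000027",
#         output_dir=".",
#         existing=DownloadExisting.SKIP,
#         format=DownloadFormat.PYOUT,
#         jobs=4,
#     )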
@dataclass
class Downloader:
""":meta private:"""
url: ParsedDandiURL
output_dir: InitVar[str | Path]
output_prefix: Path = field(init=False)
output_path: Path = field(init=False)
existing: DownloadExisting
get_metadata: bool
get_assets: bool
jobs_per_zarr: int | None
on_error: Literal["raise", "yield"]
#: An iterator whose .gen will be set to the assets generator. The purpose is
#: to make it possible to get summary statistics while already downloading.
#: TODO: reimplement properly!
assets_it: IteratorWithAggregation | None = None
yield_generator_for_fields: tuple[str, ...] | None = None
asset_download_paths: set[str] = field(init=False, default_factory=set)
def __post_init__(self, output_dir: str | Path) -> None:
# TODO: if we are ALREADY in a dandiset - we can validate that it is
# the same dandiset and use that dandiset path as the one to download
# under
if isinstance(self.url, DandisetURL):
assert self.url.dandiset_id is not None
self.output_prefix = Path(self.url.dandiset_id)
else:
self.output_prefix = Path()
self.output_path = Path(output_dir, self.output_prefix)
def download_generator(self) -> Iterator[dict]:
"""
A generator for downloads of files, folders, or entire dandiset from
DANDI (as identified by URL)
This function is a generator which would yield records on ongoing
activities. Activities include traversal of the remote resource (DANDI
archive), download of individual assets while yielding records (TODO:
schema) while validating their checksums "on the fly", etc.
"""
with self.url.navigate(strict=True) as (client, dandiset, assets):
if (
isinstance(self.url, DandisetURL)
or (
isinstance(self.url, AssetItemURL)
and self.url.path == "dandiset.yaml"
)
) and self.get_metadata:
assert dandiset is not None
for resp in _populate_dandiset_yaml(
self.output_path, dandiset, self.existing
):
yield {
"path": str(self.output_prefix / dandiset_metadata_file),
**resp,
}
if isinstance(self.url, AssetItemURL):
return
# TODO: do analysis of assets for early detection of needed renames etc. to
# avoid any need for late treatment of existing files, and also for more
# efficient download if files are just renamed etc.
if not self.get_assets:
return
if self.assets_it:
assets = self.assets_it.feed(assets)
lock = Lock()
for asset in assets:
path = self.url.get_asset_download_path(asset)
self.asset_download_paths.add(path)
download_path = Path(self.output_path, path)
path = str(self.output_prefix / path)
try:
metadata = asset.get_raw_metadata()
except NotFoundError as e:
yield {"path": path, "status": "error", "message": str(e)}
continue
d = metadata.get("digest", {})
if asset.asset_type is AssetType.BLOB:
if "dandi:dandi-etag" in d:
digests = {"dandi-etag": d["dandi:dandi-etag"]}
else:
raise RuntimeError(
f"dandi-etag not available for asset. Known digests: {d}"
)
try:
digests["sha256"] = d["dandi:sha2-256"]
except KeyError:
pass
try:
mtime = ensure_datetime(metadata["blobDateModified"])
except KeyError:
mtime = None
if mtime is None:
lgr.warning(
"Asset %s is missing blobDateModified metadata field",
asset.path,
)
mtime = asset.modified
_download_generator = _download_file(
asset.get_download_file_iter(),
download_path,
toplevel_path=self.output_path,
# size and modified generally should be there but
# better to redownload than to crash
size=asset.size,
mtime=mtime,
existing=self.existing,
digests=digests,
lock=lock,
)
else:
assert isinstance(
asset, BaseRemoteZarrAsset
), f"Asset {asset.path} is neither blob nor Zarr"
_download_generator = _download_zarr(
asset,
download_path,
toplevel_path=self.output_path,
existing=self.existing,
jobs=self.jobs_per_zarr,
lock=lock,
)
# If an exception is raised we might just raise it, or yield
# an error record
gen = {
"raise": _download_generator,
"yield": _download_generator_guard(path, _download_generator),
}[self.on_error]
if self.yield_generator_for_fields:
yield {"path": path, self.yield_generator_for_fields: gen}
else:
for resp in gen:
yield {**resp, "path": path}
def delete_for_sync(self) -> list[Path]:
"""
Returns the paths of local files that need to be deleted in order to
sync the contents of `output_path` with the remote URL
"""
if isinstance(self.url, SingleAssetURL):
return []
to_delete = []
for df in find_dandi_files(
self.output_path, dandiset_path=self.output_path, allow_all=True
):
if not isinstance(df, LocalAsset):
continue
if (
self.url.is_under_download_path(df.path)
and df.path not in self.asset_download_paths
):
to_delete.append(df.filepath)
return to_delete
def _download_generator_guard(path: str, generator: Iterator[dict]) -> Iterator[dict]:
try:
yield from generator
except Exception as exc:
lgr.exception("Caught while downloading %s:", path)
yield {
"status": "error",
"message": str(exc.__class__.__name__),
}
class ItemsSummary:
"""A helper "structure" to accumulate information about assets to be downloaded
To be used as a callback to IteratorWithAggregation
"""
def __init__(self) -> None:
self.files = 0
# TODO: get rid of needing it
self.t0: float | None = None # when first record is seen
self.size = 0
self.has_unknown_sizes = False
def as_dict(self) -> dict:
return {
"files": self.files,
"size": self.size,
"has_unknown_sizes": self.has_unknown_sizes,
}
# TODO: Determine the proper annotation for `rec`
def __call__(self, rec: Any, prior: ItemsSummary | None = None) -> ItemsSummary:
assert prior in (None, self)
if not self.files:
self.t0 = time.time()
self.files += 1
self.size += rec.size
return self
class PYOUTHelper:
"""Helper for PYOUT styling
Provides aggregation callbacks for PyOUT and also an iterator to be wrapped around
iterating over assets, so it would get "totals" as soon as they are available.
"""
def __init__(self):
# Establish "fancy" download while still possibly traversing the dandiset
# functionality.
self.items_summary = ItemsSummary()
self.it = IteratorWithAggregation(
# unfortunately Yarik missed the point that we need to wrap the
# "assets" generator within download_generator,
# so we do not have assets here! The ad-hoc solution for now is to
# pass this beast so it could get .gen set within download_generator
None, # download_generator(urls, output_dir, existing=existing),
self.items_summary,
)
def agg_files(self, *ignored: Any) -> str:
ret = str(self.items_summary.files)
if not self.it.finished:
ret += "+"
return ret
def agg_size(self, sizes: Iterable[int]) -> str | list[str]:
"""Formatter for "size" column where it would show
how much is "active" (or done)
+how much yet to be "shown".
"""
active = sum(sizes)
if (active, self.items_summary.size) == (0, 0):
return ""
v = [naturalsize(active)]
if not self.it.finished or (
active != self.items_summary.size or self.items_summary.has_unknown_sizes
):
extra = self.items_summary.size - active
if extra < 0:
lgr.debug("Extra size %d < 0 -- must not happen", extra)
else:
extra_str = "+%s" % naturalsize(extra)
if not self.it.finished:
extra_str = ">" + extra_str
if self.items_summary.has_unknown_sizes:
extra_str += "+?"
v.append(extra_str)
return v
def agg_done(self, done_sizes: Iterator[int]) -> list[str]:
"""Formatter for "DONE" column"""
done = sum(done_sizes)
if self.it.finished and done == 0 and self.items_summary.size == 0:
# even with 0s everywhere consider it 100%
r = 1.0
elif self.items_summary.size:
r = done / self.items_summary.size
else:
r = 0
pref = ""
if not self.it.finished:
pref += "<"
if self.items_summary.has_unknown_sizes:
pref += "?"
v = [naturalsize(done), f"{pref}{100 * r:.2f}%"]
if (
done
and self.items_summary.t0 is not None
and r
and self.items_summary.size != 0
):
dt = time.time() - self.items_summary.t0
more_time = (dt / r) - dt if r != 1 else 0
more_time_str = humanize.naturaldelta(more_time)
if not self.it.finished:
more_time_str += "<"
if self.items_summary.has_unknown_sizes:
more_time_str += "+?"
if more_time:
v.append("ETA: %s" % more_time_str)
return v
def _skip_file(msg: Any) -> dict:
return {"status": "skipped", "message": str(msg)}
def _populate_dandiset_yaml(
dandiset_path: str | Path, dandiset: RemoteDandiset, existing: DownloadExisting
) -> Iterator[dict]:
metadata = dandiset.get_raw_metadata()
if not metadata:
lgr.warning(
"Got completely empty metadata record for dandiset, not producing dandiset.yaml"
)
return
dandiset_yaml = op.join(dandiset_path, dandiset_metadata_file)
yield {"message": "updating"}
lgr.debug("Updating %s from obtained dandiset metadata", dandiset_metadata_file)
mtime = dandiset.modified
if op.lexists(dandiset_yaml):
with open(dandiset_yaml) as fp:
if yaml_load(fp, typ="safe") == metadata:
yield _skip_file("no change")
return
if existing is DownloadExisting.ERROR:
yield {"status": "error", "message": "already exists"}
return
elif existing is DownloadExisting.REFRESH and op.lexists(
op.join(dandiset_path, ".git", "annex")
):
raise RuntimeError("Not refreshing path in git annex repository")
elif existing is DownloadExisting.SKIP or (
existing is DownloadExisting.REFRESH
and os.lstat(dandiset_yaml).st_mtime >= mtime.timestamp()
):
yield _skip_file("already exists")
return
ds = Dandiset(dandiset_path, allow_empty=True)
ds.path_obj.mkdir(exist_ok=True) # exist_ok in case of parallel race
old_metadata = ds.metadata
ds.update_metadata(metadata)
os.utime(dandiset_yaml, (time.time(), mtime.timestamp()))
yield {
"status": "done",
"message": "updated" if metadata != old_metadata else "same",
}
def _download_file(
downloader: Callable[[int], Iterator[bytes]],
path: Path,
toplevel_path: str | Path,
lock: Lock,
size: int | None = None,
mtime: datetime | None = None,
existing: DownloadExisting = DownloadExisting.ERROR,
digests: dict[str, str] | None = None,
digest_callback: Callable[[str, str], Any] | None = None,
) -> Iterator[dict]:
"""
Common logic for downloading a single file.
Yields progress records that take the following forms::
{"status": "skipped", "message": "<MESSAGE>"}
{"size": <int>}
{"status": "downloading"}
{"done": <bytes downloaded>[, "done%": <percentage done, from 0 to 100>]}
{"status": "error", "message": "<MESSAGE>"}
{"checksum": "differs", "status": "error", "message": "<MESSAGE>"}
{"checksum": "ok"}
{"checksum": "-"} # No digests were provided
{"status": "setting mtime"}
{"status": "done"}
Parameters
----------
downloader: callable returning a generator
A backend-specific callable for downloading the file into `path`. When
called with a starting offset, it should return a generator yielding
downloaded blocks.
size: int, optional
Target size if known
digests: dict, optional
Possible checksums or other digests provided for the file. Only one
will be used to verify the download.
"""
# Avoid heavy import by importing within function:
from .support.digests import get_digest
if op.lexists(path):
annex_path = op.join(toplevel_path, ".git", "annex")
if existing is DownloadExisting.ERROR:
raise FileExistsError(f"File {path!r} already exists")
elif existing is DownloadExisting.SKIP:
yield _skip_file("already exists")
return
elif existing is DownloadExisting.OVERWRITE:
pass
elif existing is DownloadExisting.OVERWRITE_DIFFERENT:
realpath = op.realpath(path)
key_parts = op.basename(realpath).split("-")
if size is not None and os.stat(realpath).st_size != size:
lgr.debug(
"Size of %s does not match size on server; redownloading", path
)
elif (
op.lexists(annex_path)
and op.islink(path)
and path_is_subpath(realpath, op.abspath(annex_path))
and key_parts[0] == "SHA256E"
and digests
and "sha256" in digests
):
if key_parts[-1].partition(".")[0] == digests["sha256"]:
yield _skip_file("already exists")
return
else:
lgr.debug(
"%s is in git-annex, and hash does not match hash on server; redownloading",
path,
)
elif (
digests is not None
and "dandi-etag" in digests
and get_digest(path, "dandi-etag") == digests["dandi-etag"]
):
yield _skip_file("already exists")
return
elif (
digests is not None
and "dandi-etag" not in digests
and "md5" in digests
and get_digest(path, "md5") == digests["md5"]
):
yield _skip_file("already exists")
return
else:
lgr.debug(
"Etag of %s does not match etag on server; redownloading", path
)
elif existing is DownloadExisting.REFRESH:
if op.lexists(annex_path):
raise RuntimeError("Not refreshing path in git annex repository")
if mtime is None:
lgr.warning(
f"{path!r} - no mtime or ctime in the record, redownloading"
)
else:
stat = os.stat(op.realpath(path))
same = []
if is_same_time(stat.st_mtime, mtime):
same.append("mtime")
if size is not None and stat.st_size == size:
same.append("size")
# TODO: use digests if available? or if e.g. size is identical
# but mtime is different
if same == ["mtime", "size"]:
# TODO: add recording and handling of .nwb object_id
yield _skip_file("same time and size")
return
lgr.debug(f"{path!r} - same attributes: {same}. Redownloading")
if size is not None:
yield {"size": size}
destdir = Path(op.dirname(path))
with lock:
for p in (destdir, *destdir.parents):
if p.is_file():
p.unlink()
break
elif p.is_dir():
break
destdir.mkdir(parents=True, exist_ok=True)
yield {"status": "downloading"}
algo: str | None = None
digester: Callable[[], Hasher] | None = None
digest: str | None = None
downloaded_digest: Hasher | None = None
if digests:
# choose first available for now.
# TODO: reuse that sorting based on speed
for algo, digest in digests.items():
if algo == "dandi-etag" and size is not None:
# Instantiate outside the lambda so that mypy is assured that
# `size` is not None:
hasher = ETagHashlike(size)
digester = lambda: hasher # noqa: E731
else:
digester = getattr(hashlib, algo, None)
if digester is not None:
break
if digester is None:
lgr.warning("Found no digests in hashlib for any of %s", str(digests))
# TODO: how do we discover the total size????
# TODO: do not do it in-place, but rather into some "hidden" file
resuming = False
for attempt in range(3):
try:
if digester:
downloaded_digest = digester() # start empty
warned = False
# I wonder if we could make writing async with downloader
with DownloadDirectory(path, digests or {}) as dldir:
assert dldir.offset is not None
downloaded = dldir.offset
resuming = downloaded > 0
if size is not None and downloaded == size:
# Exit early when downloaded == size, as making a Range
# request in such a case results in a 416 error from S3.
# Problems will result if `size` is None but we've already
# downloaded everything.
break
for block in downloader(dldir.offset):
if digester:
assert downloaded_digest is not None
downloaded_digest.update(block)
downloaded += len(block)
# TODO: yield progress etc
out: dict[str, Any] = {"done": downloaded}
if size:
if downloaded > size and not warned:
warned = True
# Yield ERROR?
lgr.warning(
"Downloaded %d bytes although size was told to be just %d",
downloaded,
size,
)
out["done%"] = 100 * downloaded / size
# TODO: ETA etc
yield out
dldir.append(block)
break
except ValueError:
# When `requests` raises a ValueError, it's because the caller
# provided invalid parameters (e.g., an invalid URL), and so
# retrying won't change anything.
raise
# Catching RequestException lets us retry on timeout & connection
# errors (among others) in addition to HTTP status errors.
except requests.RequestException as exc:
# TODO: actually we should probably retry only on selected codes,
# and also respect Retry-After
if attempt >= 2 or (
exc.response is not None
and exc.response.status_code
not in (
400, # Bad Request, but happened with girder:
# https://github.com/dandi/dandi-cli/issues/87
*RETRY_STATUSES,
)
):
lgr.debug("Download failed: %s", exc)
yield {"status": "error", "message": str(exc)}
return
# if is_access_denied(exc) or attempt >= 2:
# raise
# sleep a little and retry
lgr.debug(
"Failed to download on attempt #%d: %s, will sleep a bit and retry",
attempt,
exc,
)
time.sleep(random.random() * 5)
if downloaded_digest and not resuming:
assert downloaded_digest is not None
final_digest = downloaded_digest.hexdigest() # we care only about hex
if digest_callback is not None:
assert isinstance(algo, str)
digest_callback(algo, final_digest)
if digest != final_digest:
msg = f"{algo}: downloaded {final_digest} != {digest}"
yield {"checksum": "differs", "status": "error", "message": msg}
lgr.debug("%s is different: %s.", path, msg)
return
else:
yield {"checksum": "ok"}
lgr.debug("Verified that %s has correct %s %s", path, algo, digest)
else:
# shouldn't happen with more recent metadata etc
yield {
"checksum": "-",
# "message": "no digests were provided"
}
# TODO: dissolve attrs and pass specific mtime?
if mtime is not None:
yield {"status": "setting mtime"}
os.utime(path, (time.time(), mtime.timestamp()))
yield {"status": "done"}
class DownloadDirectory:
def __init__(self, filepath: str | Path, digests: dict[str, str]) -> None:
#: The path to which to save the file after downloading
self.filepath = Path(filepath)
#: Expected hashes of the downloaded data, as a mapping from algorithm
#: names to digests
self.digests = digests
#: The working directory in which downloaded data will be temporarily
#: stored
self.dirpath = self.filepath.with_name(self.filepath.name + ".dandidownload")
#: The file in `dirpath` to which data will be written as it is
#: received
self.writefile = self.dirpath / "file"
#: A `fasteners.InterProcessLock` on `dirpath`
self.lock: InterProcessLock | None = None
#: An open filehandle to `writefile`
self.fp: IO[bytes] | None = None
#: How much of the data has been downloaded so far
self.offset: int | None = None
def __enter__(self) -> DownloadDirectory:
self.dirpath.mkdir(parents=True, exist_ok=True)
self.lock = InterProcessLock(str(self.dirpath / "lock"))
if not self.lock.acquire(blocking=False):
raise RuntimeError(f"Could not acquire download lock for {self.filepath}")
chkpath = self.dirpath / "checksum"
try:
with chkpath.open() as fp:
digests = json.load(fp)
except (FileNotFoundError, ValueError):
digests = {}
matching_algs = self.digests.keys() & digests.keys()
if matching_algs and all(
self.digests[alg] == digests[alg] for alg in matching_algs
):
# Pick up where we left off, writing to the end of the file
lgr.debug(
"Download directory exists and has matching checksum; resuming download"
)
self.fp = self.writefile.open("ab")
else:
# Delete the file (if it even exists) and start anew
if not chkpath.exists():
lgr.debug("Starting new download in new download directory")
else:
lgr.debug(
"Download directory found, but digests do not match; starting new download"
)
try:
self.writefile.unlink()
except FileNotFoundError:
pass
self.fp = self.writefile.open("wb")
with chkpath.open("w") as fp:
json.dump(self.digests, fp)
self.offset = self.fp.tell()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
assert self.fp is not None
self.fp.close()
try:
if exc_type is None:
try:
self.writefile.replace(self.filepath)
except IsADirectoryError:
rmtree(self.filepath)
self.writefile.replace(self.filepath)
finally:
assert self.lock is not None
self.lock.release()
if exc_type is None:
rmtree(self.dirpath, ignore_errors=True)
self.lock = None
self.fp = None
self.offset = None
def append(self, blob: bytes) -> None:
if self.fp is None:
raise ValueError(
"DownloadDirectory.append() called outside of context manager"
)
self.fp.write(blob)
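# Example (illustrative): how DownloadDirectory is driven by _download_file() --
# open it as a context manager, start the byte stream at `dldir.offset` so an
# interrupted download can resume, and append each received block. `downloader`
# is a hypothetical callable returning an iterator of bytes.
#
#     with DownloadDirectory("sub-01/sub-01.nwb", digests={"md5": "..."}) as dldir:
#         for block in downloader(dldir.offset):
#             dldir.append(block)
#     # on a clean exit the partial file is moved into place at `filepath`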
def _download_zarr(
asset: BaseRemoteZarrAsset,
download_path: Path,
toplevel_path: str | Path,
existing: DownloadExisting,
lock: Lock,
jobs: int | None = None,
) -> Iterator[dict]:
# Avoid heavy import by importing within function:
from .support.digests import get_zarr_checksum
download_gens = {}
entries = list(asset.iterfiles())
digests = {}
def digest_callback(path: str, algoname: str, d: str) -> None:
if algoname == "md5":
digests[path] = d
for entry in entries:
etag = entry.digest
assert etag.algorithm is DigestType.md5
download_gens[str(entry)] = _download_file(
entry.get_download_file_iter(),
download_path / str(entry),
toplevel_path=toplevel_path,
size=entry.size,
mtime=entry.modified,
existing=existing,
digests={"md5": etag.value},
lock=lock,
digest_callback=partial(digest_callback, str(entry)),
)
pc = ProgressCombiner(zarr_size=asset.size, file_qty=len(download_gens))
final_out: dict | None = None
with interleave(
[pairing(p, gen) for p, gen in download_gens.items()],
onerror=FINISH_CURRENT,
max_workers=jobs or 4,
) as it:
for path, status in it:
for out in pc.feed(path, status):
if out.get("status") == "done":
final_out = out
else:
yield out
if final_out is not None:
break
else:
return
yield {"status": "deleting extra files"}
remote_paths = set(map(str, entries))
zarr_basepath = Path(download_path)
dirs = deque([zarr_basepath])
empty_dirs: deque[Path] = deque()
while dirs:
d = dirs.popleft()
is_empty = True
for p in list(d.iterdir()):
if exclude_from_zarr(p):
is_empty = False
elif (
p.is_file()
and p.relative_to(zarr_basepath).as_posix() not in remote_paths
):
try:
p.unlink()
except OSError:
is_empty = False
elif p.is_dir():
dirs.append(p)
is_empty = False
else:
is_empty = False
if is_empty and d != zarr_basepath:
empty_dirs.append(d)
while empty_dirs:
d = empty_dirs.popleft()
d.rmdir()
if d.parent != zarr_basepath and not any(d.parent.iterdir()):
empty_dirs.append(d.parent)
if "skipped" not in final_out["message"]:
zarr_checksum = asset.get_digest().value
local_checksum = get_zarr_checksum(zarr_basepath, known=digests)
if zarr_checksum != local_checksum:
msg = f"Zarr checksum: downloaded {local_checksum} != {zarr_checksum}"
yield {"checksum": "differs", "status": "error", "message": msg}
lgr.debug("%s is different: %s.", zarr_basepath, msg)
return
else:
yield {"checksum": "ok"}
lgr.debug(
"Verified that %s has correct Zarr checksum %s",
zarr_basepath,
zarr_checksum,
)
yield {"status": "done"}
def pairing(p: str, gen: Iterator[dict]) -> Iterator[tuple[str, dict]]:
for d in gen:
yield (p, d)
DLState = Enum("DLState", "STARTING DOWNLOADING SKIPPED ERROR CHECKSUM_ERROR DONE")
@dataclass
class DownloadProgress:
state: DLState = DLState.STARTING
downloaded: int = 0
size: int | None = None
@dataclass
class ProgressCombiner:
zarr_size: int
file_qty: int
files: dict[str, DownloadProgress] = field(default_factory=dict)
#: Total size of all files that were not skipped and did not error out
#: during download
maxsize: int = 0
prev_status: str = ""
yielded_size: bool = False
@property
def message(self) -> str:
done = 0
errored = 0
skipped = 0
for s in self.files.values():
if s.state is DLState.DONE:
done += 1
elif s.state in (DLState.ERROR, DLState.CHECKSUM_ERROR):
errored += 1
elif s.state is DLState.SKIPPED:
skipped += 1
parts = []
if done:
parts.append(f"{done} done")
if errored:
parts.append(f"{errored} errored")
if skipped:
parts.append(f"{skipped} skipped")
return ", ".join(parts)
def get_done(self) -> dict:
total_downloaded = sum(
s.downloaded
for s in self.files.values()
if s.state in (DLState.DOWNLOADING, DLState.CHECKSUM_ERROR, DLState.DONE)
)
return {
"done": total_downloaded,
"done%": total_downloaded / self.maxsize * 100,
}
def set_status(self, statusdict: dict) -> None:
state_qtys = Counter(s.state for s in self.files.values())
total = len(self.files)
if (
total == self.file_qty
and state_qtys[DLState.STARTING] == state_qtys[DLState.DOWNLOADING] == 0
):
# All files have finished
if state_qtys[DLState.ERROR] or state_qtys[DLState.CHECKSUM_ERROR]:
new_status = "error"
elif state_qtys[DLState.DONE]:
new_status = "done"
else:
new_status = "skipped"
elif total - state_qtys[DLState.STARTING] - state_qtys[DLState.SKIPPED] > 0:
new_status = "downloading"
else:
new_status = ""
if new_status != self.prev_status:
statusdict["status"] = new_status
self.prev_status = new_status
def feed(self, path: str, status: dict) -> Iterator[dict]:
keys = list(status.keys())
self.files.setdefault(path, DownloadProgress())
if status.get("status") == "skipped":
self.files[path].state = DLState.SKIPPED
out = {"message": self.message}
self.set_status(out)
yield out
elif keys == ["size"]:
if not self.yielded_size:
yield {"size": self.zarr_size}
self.yielded_size = True
self.files[path].size = status["size"]
self.maxsize += status["size"]
if any(s.state is DLState.DOWNLOADING for s in self.files.values()):
yield self.get_done()
elif status == {"status": "downloading"}:
self.files[path].state = DLState.DOWNLOADING
out = {}
self.set_status(out)
if out:
yield out
elif "done" in status:
self.files[path].downloaded = status["done"]
yield self.get_done()
elif status.get("status") == "error":
if "checksum" in status:
self.files[path].state = DLState.CHECKSUM_ERROR
out = {"message": self.message}
self.set_status(out)
yield out
else:
self.files[path].state = DLState.ERROR
out = {"message": self.message}
self.set_status(out)
yield out
sz = self.files[path].size
if sz is not None:
self.maxsize -= sz
yield self.get_done()
elif keys == ["checksum"]:
pass
elif status == {"status": "setting mtime"}:
pass
elif status == {"status": "done"}:
self.files[path].state = DLState.DONE
out = {"message": self.message}
self.set_status(out)
yield out
else:
lgr.warning(
"Unexpected download status dict for %r received: %r", path, status
)
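# Example (illustrative): how ProgressCombiner.feed() folds per-file status
# dicts from parallel Zarr entry downloads into combined records. The paths
# and sizes here are arbitrary.
#
#     pc = ProgressCombiner(zarr_size=1000, file_qty=2)
#     list(pc.feed("0/0", {"size": 600}))              # yields the total zarr size once
#     list(pc.feed("0/0", {"status": "downloading"}))  # yields a combined "downloading" status
#     list(pc.feed("0/0", {"done": 600}))              # yields combined "done" / "done%" figures
#     list(pc.feed("0/0", {"status": "done"}))         # yields an updated summary message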