"""Download assets from DANDI Archive.
This module provides functionality for downloading files and Zarr archives
from DANDI Archive instances. It supports:
- Individual file downloads with integrity verification
- Zarr archive downloads with parallel entry handling
- Resume capability for interrupted downloads
- Progress tracking and error recovery
"""
from __future__ import annotations
from collections import Counter, deque
from collections.abc import Callable, Iterable, Iterator, Sequence
from dataclasses import InitVar, dataclass, field
from datetime import datetime
from enum import Enum
from functools import partial
import hashlib
import inspect
import json
import os
import os.path as op
from pathlib import Path
import random
from shutil import rmtree
import sys
from threading import Lock
import time
from types import TracebackType
from typing import IO, Any, Literal
from dandischema.digests.dandietag import ETagHashlike
from dandischema.models import DigestType
from fasteners import InterProcessLock
import humanize
from interleave import FINISH_CURRENT, lazy_interleave
import requests
from . import get_logger
from .consts import DOWNLOAD_SUFFIX, RETRY_STATUSES, SyncMode, dandiset_metadata_file
from .dandiapi import AssetType, BaseRemoteZarrAsset, RemoteDandiset
from .dandiarchive import (
AssetItemURL,
DandisetURL,
ParsedDandiURL,
SingleAssetURL,
parse_dandi_url,
)
from .dandiset import Dandiset
from .exceptions import NotFoundError
from .files import LocalAsset, find_dandi_files
from .support import pyout as pyouts
from .support.iterators import IteratorWithAggregation
from .support.pyout import naturalsize
from .utils import (
Hasher,
abbrev_prompt,
ensure_datetime,
exclude_from_zarr,
flattened,
get_retry_after,
is_same_time,
path_is_subpath,
pluralize,
yaml_load,
)
# Module-level logger, obtained from the package's get_logger() helper and
# used by the functions and classes below for all diagnostics.
lgr = get_logger()
[docs]
class DownloadExisting(str, Enum):
    """Policy applied when a download target already exists locally."""

    # Report the pre-existing path as an error
    ERROR = "error"
    # Leave the existing path untouched
    SKIP = "skip"
    # Always download again
    OVERWRITE = "overwrite"
    # Download again only if size/digest comparison shows a difference
    OVERWRITE_DIFFERENT = "overwrite-different"
    # Download again unless size and modification time both match
    REFRESH = "refresh"

    def __str__(self) -> str:
        # Render as the plain value (e.g. "skip"), not "DownloadExisting.SKIP"
        return str(self.value)
[docs]
class PathType(str, Enum):
    """How asset paths in a URL are interpreted: literally or as glob patterns."""

    EXACT = "exact"
    GLOB = "glob"

    def __str__(self) -> str:
        # Render as the plain value (e.g. "glob"), not "PathType.GLOB"
        return str(self.value)
[docs]
def download(
    urls: str | Sequence[str],
    output_dir: str | Path,
    *,
    format: DownloadFormat = DownloadFormat.PYOUT,
    existing: DownloadExisting = DownloadExisting.ERROR,
    jobs: int = 1,
    jobs_per_zarr: int | None = None,
    get_metadata: bool = True,
    get_assets: bool = True,
    preserve_tree: bool = False,
    sync: bool | SyncMode | None = False,
    path_type: PathType = PathType.EXACT,
) -> None:
    """
    Download everything identified by one or more DANDI URLs into `output_dir`.

    Parameters
    ----------
    urls:
        A URL or a (possibly nested) sequence of URLs; flattened before use.
    output_dir:
        Directory under which downloaded content is placed.
    format:
        Front end used to report progress records (pyout table or debug print).
    existing:
        Policy applied to paths that already exist locally.
    jobs:
        Number of pyout workers; values above 1 only parallelize downloads
        for the pyout format.
    jobs_per_zarr:
        Passed to every `Downloader` for per-Zarr entry downloads.
    get_metadata, get_assets, preserve_tree:
        Passed through unchanged to every `Downloader`.
    sync:
        If truthy, local assets not accounted for by the download are deleted
        afterwards; ``True`` is treated as `SyncMode.ASK`, and anything other
        than `SyncMode.DO` prompts before deleting.
    path_type:
        Whether asset paths in the URLs are exact paths or glob patterns.

    Raises
    ------
    NotImplementedError
        If no URLs were given.
    RuntimeError
        If any progress record reported ``"status": "error"``.
    """
    # TODO: unduplicate with upload. For now stole from that one
    # We will again use pyout to provide a neat table summarizing our progress
    # with upload etc
    urls = flattened([urls])
    if not urls:
        # if no paths provided etc, we will download dandiset path
        # we are at, BUT since we are not git -- we do not even know
        # on which instance it exists!  Thus ATM we would do nothing but crash
        raise NotImplementedError("No URLs were provided.  Cannot download anything")

    parsed_urls = [parse_dandi_url(u, glob=path_type is PathType.GLOB) for u in urls]

    # dandi.cli.formatters are used in cmd_ls to provide switchable
    pyout_style = pyouts.get_style(hide_if_missing=False)

    rec_fields = ("path", "size", "done", "done%", "checksum", "status", "message")
    out = pyouts.LogSafeTabular(style=pyout_style, columns=rec_fields, max_workers=jobs)

    out_helper = PYOUTHelper()
    pyout_style["done"] = pyout_style["size"].copy()
    pyout_style["size"]["aggregate"] = out_helper.agg_size
    pyout_style["done"]["aggregate"] = out_helper.agg_done

    # I thought I was making a beautiful flower but ended up with cacti
    # which never blooms...  All because assets are looped through inside download_generator
    # TODO: redo
    kw = dict(assets_it=out_helper.it)
    if jobs > 1:
        if format is DownloadFormat.PYOUT:
            # It could handle delegated to generator downloads
            kw["yield_generator_for_fields"] = rec_fields[1:]  # all but path
        else:
            lgr.warning(
                "Parallel downloads are not yet implemented for non-pyout format=%r. "
                "Download will proceed serially.",
                str(format),
            )

    downloaders = [
        Downloader(
            url=purl,
            output_dir=output_dir,
            existing=existing,
            get_metadata=get_metadata,
            get_assets=get_assets,
            preserve_tree=preserve_tree,
            jobs_per_zarr=jobs_per_zarr,
            on_error="yield" if format is DownloadFormat.PYOUT else "raise",
            **kw,
        )
        for purl in parsed_urls
    ]

    gen_ = (r for dl in downloaders for r in dl.download_generator())

    # Constructs to capture errors and handle them at the end
    errors: list[dict] = []

    def p4e_gen(callback):
        # Lazily pass every record of a delegated generator through p4e
        for v in callback:
            yield p4e(v)

    def p4e(record):
        """Collect error records; wrap any generator values for the same handling."""
        if record.get("status") == "error":
            if record not in errors:
                errors.append(record)
        else:
            # If generator was yielded, we need to wrap it also with
            # our handling.  The record being inspected is the one patched
            # (not whatever the outer loop variable currently refers to), and
            # a snapshot of its items is iterated since values are reassigned.
            for k, v in list(record.items()):
                if inspect.isgenerator(v):
                    record[k] = p4e_gen(v)
        return record

    # TODOs:
    #  - redo frontends similarly to how command_ls did it
    #  - have a single loop with analysis of `rec` to either any file
    #    has failed to download.  If any was: exception should probably be
    #    raised.  API discussion for Python side of API:
    #
    if format is DownloadFormat.DEBUG:
        for rec in gen_:
            print(p4e(rec), flush=True)
    elif format is DownloadFormat.PYOUT:
        with out:
            for rec in gen_:
                out(p4e(rec))
    else:
        raise AssertionError(f"Unhandled DownloadFormat member: {format!r}")

    if sync:
        # Normalize legacy bool True to SyncMode.ASK
        if sync is True:
            sync = SyncMode.ASK
        to_delete = [p for dl in downloaders for p in dl.delete_for_sync()]
        if to_delete:
            do_delete = False
            if sync is SyncMode.DO:
                do_delete = True
            else:
                # Keep prompting until the user answers yes or no; "list"
                # prints the candidates and asks again
                while True:
                    opt = abbrev_prompt(
                        f"Delete {pluralize(len(to_delete), 'local asset')}?",
                        "yes",
                        "no",
                        "list",
                    )
                    if opt == "list":
                        for p in to_delete:
                            print(p)
                    elif opt == "yes":
                        do_delete = True
                        break
                    else:
                        break
            if do_delete:
                for p in to_delete:
                    if p.is_dir():
                        rmtree(p)
                    else:
                        p.unlink()

    if errors:
        error_msg = f"Encountered {pluralize(len(errors), 'error')} while downloading."
        # Also log the first error for easier debugging
        lgr.error("%s The first error: %r", error_msg, errors[0])
        raise RuntimeError(error_msg)
@dataclass
class Downloader:
    """:meta private:"""

    # Parsed URL identifying what to download
    url: ParsedDandiURL
    # Init-only: base directory; only used to compute `output_path`
    output_dir: InitVar[str | Path]
    # Relative prefix (dandiset ID or empty) prepended to reported paths
    output_prefix: Path = field(init=False)
    # `output_dir` joined with `output_prefix`
    output_path: Path = field(init=False)
    existing: DownloadExisting
    get_metadata: bool
    get_assets: bool
    preserve_tree: bool
    jobs_per_zarr: int | None
    # "raise" lets download exceptions propagate; "yield" turns them into
    # error records
    on_error: Literal["raise", "yield"]
    #: which will be set .gen to assets.  Purpose is to make it possible to get
    #: summary statistics while already downloading.  TODO: reimplement
    #: properly!
    assets_it: IteratorWithAggregation | None = None
    # When set, a single record per asset is yielded with this tuple as the
    # key of the progress generator instead of yielding each progress record
    yield_generator_for_fields: tuple[str, ...] | None = None
    # Download paths of every asset seen; consulted by delete_for_sync()
    asset_download_paths: set[str] = field(init=False, default_factory=set)

    def __post_init__(self, output_dir: str | Path) -> None:
        # TODO: if we are ALREADY in a dandiset - we can validate that it is
        # the same dandiset and use that dandiset path as the one to download
        # under
        nest_under_dandiset = isinstance(self.url, DandisetURL) or (
            self.preserve_tree and self.url.dandiset_id is not None
        )
        if nest_under_dandiset:
            assert self.url.dandiset_id is not None
            prefix = Path(self.url.dandiset_id)
        else:
            prefix = Path()
        self.output_prefix = prefix
        self.output_path = Path(output_dir, prefix)

    def is_dandiset_yaml(self) -> bool:
        """Tell whether the URL points at the ``dandiset.yaml`` asset itself."""
        if not isinstance(self.url, AssetItemURL):
            return False
        return self.url.path == "dandiset.yaml"

    def download_generator(self) -> Iterator[dict]:
        """
        A generator for downloads of files, folders, or entire dandiset from
        DANDI (as identified by URL)

        This function is a generator which yields records on ongoing
        activities.  Activities include traversal of the remote resource (DANDI
        Archive), download of individual assets while yielding records (TODO:
        schema) while validating their checksums "on the fly", etc.
        """

        def _progress_filter(gen):
            """To reduce load on pyout etc, make progress reports only if enough time
            from prior report has passed (over 2 seconds) or we are done (got 100%).

            Note that it requires "awareness" from the code below to issue other messages
            with bundling with done% reporting if reporting on progress of some kind (e.g.,
            adjusting "message").
            """
            last_emitted = 0
            complained = False
            for rec in gen:
                now = time.time()
                if done_perc := rec.get("done%", 0):
                    if isinstance(done_perc, (int, float)):
                        too_soon = now - last_emitted < 2
                        if too_soon and done_perc != 100:
                            continue
                    elif not complained:
                        complained = True
                        lgr.warning("Received non numeric done%%: %r", done_perc)
                last_emitted = now
                yield rec

        with self.url.navigate(strict=True) as (client, dandiset, assets):
            wants_dandiset_yaml = (
                isinstance(self.url, DandisetURL)
                or self.is_dandiset_yaml()
                or self.preserve_tree
            )
            if wants_dandiset_yaml and self.get_metadata and dandiset is not None:
                yaml_path = str(self.output_prefix / dandiset_metadata_file)
                for resp in _populate_dandiset_yaml(
                    self.output_path, dandiset, self.existing
                ):
                    yield {"path": yaml_path, **resp}
            if self.is_dandiset_yaml():
                return

            # TODO: do analysis of assets for early detection of needed renames
            # etc to avoid any need for late treatment of existing and also for
            # more efficient download if files are just renamed etc

            if not self.get_assets:
                return

            if self.assets_it:
                assets = self.assets_it.feed(assets)
            lock = Lock()
            for asset in assets:
                path = self.url.get_asset_download_path(
                    asset, preserve_tree=self.preserve_tree
                )
                self.asset_download_paths.add(path)
                download_path = Path(self.output_path, path)
                # From here on `path` is the path as reported in records
                path = str(self.output_prefix / path)

                try:
                    metadata = asset.get_raw_metadata()
                except NotFoundError as e:
                    yield {"path": path, "status": "error", "message": str(e)}
                    continue
                d = metadata.get("digest", {})

                if asset.asset_type is AssetType.BLOB:
                    if "dandi:dandi-etag" not in d:
                        raise RuntimeError(
                            f"dandi-etag not available for asset. Known digests: {d}"
                        )
                    digests = {"dandi-etag": d["dandi:dandi-etag"]}
                    try:
                        digests["sha256"] = d["dandi:sha2-256"]
                    except KeyError:
                        pass

                    try:
                        mtime = ensure_datetime(metadata["blobDateModified"])
                    except KeyError:
                        mtime = None
                    if mtime is None:
                        lgr.warning(
                            "Asset %s is missing blobDateModified metadata field",
                            asset.path,
                        )
                        mtime = asset.modified
                    _download_generator = _download_file(
                        asset.get_download_file_iter(),
                        download_path,
                        toplevel_path=self.output_path,
                        # size and modified generally should be there but
                        # better to redownload than to crash
                        size=asset.size,
                        mtime=mtime,
                        existing=self.existing,
                        digests=digests,
                        lock=lock,
                    )
                else:
                    assert isinstance(
                        asset, BaseRemoteZarrAsset
                    ), f"Asset {asset.path} is neither blob nor Zarr"
                    _download_generator = _download_zarr(
                        asset,
                        download_path,
                        toplevel_path=self.output_path,
                        existing=self.existing,
                        jobs=self.jobs_per_zarr,
                        lock=lock,
                    )

                # If exception is raised we might just raise it, or yield
                # an error record
                by_error_mode = {
                    "raise": _download_generator,
                    "yield": _download_generator_guard(path, _download_generator),
                }
                gen = _progress_filter(by_error_mode[self.on_error])

                if self.yield_generator_for_fields:
                    yield {"path": path, self.yield_generator_for_fields: gen}
                else:
                    for resp in gen:
                        yield {**resp, "path": path}

    def delete_for_sync(self) -> list[Path]:
        """
        Returns the paths of local files that need to be deleted in order to
        sync the contents of `output_path` with the remote URL
        """
        if isinstance(self.url, SingleAssetURL):
            return []
        local_files = find_dandi_files(
            self.output_path, dandiset_path=self.output_path, allow_all=True
        )
        return [
            df.filepath
            for df in local_files
            if isinstance(df, LocalAsset)
            and self.url.is_under_download_path(df.path)
            and df.path not in self.asset_download_paths
        ]
def _download_generator_guard(path: str, generator: Iterator[dict]) -> Iterator[dict]:
try:
yield from generator
except Exception as exc:
lgr.exception("Caught while downloading %s:", path)
yield {
"status": "error",
"message": str(exc.__class__.__name__),
}
[docs]
class ItemsSummary:
    """A helper "structure" to accumulate information about assets to be downloaded

    To be used as a callback to IteratorWithAggregation
    """

    def __init__(self) -> None:
        # Number of records seen so far
        self.files = 0
        # TODO: get rid of needing it
        self.t0: float | None = None  # when first record is seen
        # Sum of the known sizes of the records seen so far
        self.size: int = 0
        # Set once any record without a known size has been seen
        self.has_unknown_sizes: bool = False

    def as_dict(self) -> dict:
        """Return the accumulated counters as a plain dictionary."""
        return {
            "files": self.files,
            "size": self.size,
            "has_unknown_sizes": self.has_unknown_sizes,
        }

    # TODO: Determine the proper annotation for `rec`
    def __call__(self, rec: Any, prior: ItemsSummary | None = None) -> ItemsSummary:
        """
        Account for one more record and return this (same) summary object.

        `rec` must expose a ``size`` attribute.  A ``size`` of `None` is
        recorded by setting `has_unknown_sizes` rather than failing, so the
        totals stay usable.  `prior` must be `None` or this very object.
        """
        assert prior in (None, self)
        if not self.files:
            self.t0 = time.time()
        self.files += 1
        size = rec.size
        if size is None:
            self.has_unknown_sizes = True
        else:
            self.size += size
        return self
[docs]
class PYOUTHelper:
"""Helper for PYOUT styling
Provides aggregation callbacks for PyOUT and also an iterator to be wrapped around
iterating over assets, so it would get "totals" as soon as they are available.
"""
def __init__(self):
# Establish "fancy" download while still possibly traversing the dandiset
# functionality.
self.items_summary = ItemsSummary()
self.it = IteratorWithAggregation(
# unfortunately Yarik missed the point that we need to wrap
# "assets" generator within downloader_generator
# so we do not have assets here! Ad-hoc solution for now is to
# pass this beast so it could get .gen set within downloader_generator
None, # download_generator(urls, output_dir, existing=existing),
self.items_summary,
)
[docs]
def agg_files(self, *ignored: Any) -> str:
ret = str(self.items_summary.files)
if not self.it.finished:
ret += "+"
return ret
[docs]
def agg_size(self, sizes: Iterable[int]) -> str | list[str]:
"""Formatter for "size" column where it would show
how much is "active" (or done)
+how much yet to be "shown".
"""
active = sum(sizes)
if (active, self.items_summary.size) == (0, 0):
return ""
v = [naturalsize(active)]
if not self.it.finished or (
active != self.items_summary.size or self.items_summary.has_unknown_sizes
):
extra = self.items_summary.size - active
if extra < 0:
lgr.debug("Extra size %d < 0 -- must not happen", extra)
else:
extra_str = "+%s" % naturalsize(extra)
if not self.it.finished:
extra_str = ">" + extra_str
if self.items_summary.has_unknown_sizes:
extra_str += "+?"
v.append(extra_str)
return v
[docs]
def agg_done(self, done_sizes: Iterator[int]) -> list[str]:
"""Formatter for "DONE" column"""
done = sum(done_sizes)
if self.it.finished and done == 0 and self.items_summary.size == 0:
# even with 0s everywhere consider it 100%
r = 1.0
elif self.items_summary.size:
r = done / self.items_summary.size
else:
r = 0
pref = ""
if not self.it.finished:
pref += "<"
if self.items_summary.has_unknown_sizes:
pref += "?"
v = [naturalsize(done), f"{pref}{100 * r:.2f}%"]
if (
done
and self.items_summary.t0 is not None
and r
and self.items_summary.size != 0
):
dt = time.time() - self.items_summary.t0
more_time = (dt / r) - dt if r != 1 else 0
more_time_str = humanize.naturaldelta(more_time)
if not self.it.finished:
more_time_str += "<"
if self.items_summary.has_unknown_sizes:
more_time_str += "+?"
if more_time:
v.append("ETA: %s" % more_time_str)
return v
def _skip_file(msg: Any, **kwargs: Any) -> dict:
return {"status": "skipped", "message": str(msg), **kwargs}
def _populate_dandiset_yaml(
    dandiset_path: str | Path, dandiset: RemoteDandiset, existing: DownloadExisting
) -> Iterator[dict]:
    """
    Write the remote dandiset's metadata to the metadata file under
    `dandiset_path`, yielding progress records along the way.

    An existing file with identical content is always skipped; otherwise the
    `existing` policy decides between error, skip, and overwrite.  After
    writing, the file's mtime is set to the dandiset's modification time.

    Raises `RuntimeError` when asked to refresh inside a git-annex repository.
    """
    metadata = dandiset.get_raw_metadata()
    if not metadata:
        lgr.warning(
            "Got completely empty metadata record for dandiset, not producing dandiset.yaml"
        )
        return
    dandiset_yaml = op.join(dandiset_path, dandiset_metadata_file)
    yield {"message": "updating"}
    lgr.debug("Updating %s from obtained dandiset metadata", dandiset_metadata_file)
    mtime = dandiset.modified
    if op.lexists(dandiset_yaml):
        with open(dandiset_yaml) as fp:
            unchanged = yaml_load(fp, typ="safe") == metadata
        if unchanged:
            yield _skip_file("no change")
            return
        if existing is DownloadExisting.ERROR:
            yield {"status": "error", "message": "already exists"}
            return
        local_is_current = False
        if existing is DownloadExisting.REFRESH:
            if op.lexists(op.join(dandiset_path, ".git", "annex")):
                raise RuntimeError("Not refreshing path in git annex repository")
            local_is_current = os.lstat(dandiset_yaml).st_mtime >= mtime.timestamp()
        if existing is DownloadExisting.SKIP or local_is_current:
            yield _skip_file("already exists")
            return
    ds = Dandiset(dandiset_path, allow_empty=True)
    ds.path_obj.mkdir(exist_ok=True)  # exist_ok in case of parallel race
    old_metadata = ds.metadata
    ds.update_metadata(metadata)
    os.utime(dandiset_yaml, (time.time(), mtime.timestamp()))
    outcome = "updated" if metadata != old_metadata else "same"
    yield {"status": "done", "message": outcome}
def _download_file(
    downloader: Callable[[int], Iterator[bytes]],
    path: Path,
    toplevel_path: str | Path,
    lock: Lock,
    size: int | None = None,
    mtime: datetime | None = None,
    existing: DownloadExisting = DownloadExisting.ERROR,
    digests: dict[str, str] | None = None,
    digest_callback: Callable[[str, str], Any] | None = None,
) -> Iterator[dict]:
    """
    Common logic for downloading a single file.

    Yields progress records that take the following forms::

        {"status": "skipped", "message": "<MESSAGE>"}
        {"size": <int>}
        {"status": "downloading"}
        {"done": <bytes downloaded>[, "done%": <percentage done, from 0 to 100>]}
        {"status": "error", "message": "<MESSAGE>"}
        {"checksum": "differs", "status": "error", "message": "<MESSAGE>"}
        {"checksum": "ok"}
        {"checksum": "-"}  # No digests were provided
        {"status": "setting mtime"}
        {"status": "done"}

    Parameters
    ----------
    downloader: callable returning a generator
        A backend-specific fixture for downloading some file into path. It should
        be a generator yielding downloaded blocks.  It is called with the
        byte offset at which to start (non-zero when resuming).
    path: Path
        Destination of the downloaded file
    toplevel_path: str or Path
        Directory whose ``.git/annex`` subpath is checked to detect a
        git-annex repository
    lock: Lock
        Held while the destination directory is being prepared
    size: int, optional
        Target size if known
    mtime: datetime, optional
        Modification time to compare against (for ``REFRESH``) and to set on
        the file once downloaded
    existing: DownloadExisting
        What to do if `path` already exists
    digests: dict, optional
        possible checksums or other digests provided for the file. Only one
        will be used to verify download
    digest_callback: callable, optional
        Called as ``digest_callback(algorithm, hexdigest)`` once the digest of
        the downloaded file has been determined

    Raises
    ------
    FileExistsError
        If `path` exists and `existing` is ``ERROR``
    RuntimeError
        If `existing` is ``REFRESH`` and `toplevel_path` contains a git-annex
        repository
    """
    # Avoid heavy import by importing within function:
    from .support.digests import get_digest

    # --- Decide what to do about an already existing destination ---
    if op.lexists(path):
        annex_path = op.join(toplevel_path, ".git", "annex")
        if existing is DownloadExisting.ERROR:
            raise FileExistsError(f"File {path!r} already exists")
        elif existing is DownloadExisting.SKIP:
            yield _skip_file("already exists")
            return
        elif existing is DownloadExisting.OVERWRITE:
            pass
        elif existing is DownloadExisting.OVERWRITE_DIFFERENT:
            realpath = op.realpath(path)
            # For a git-annex symlink, the target's basename is the annex key,
            # whose "-"-separated parts start with the backend name
            key_parts = op.basename(realpath).split("-")
            if size is not None and os.stat(realpath).st_size != size:
                lgr.debug(
                    "Size of %s does not match size on server; redownloading", path
                )
            elif (
                op.lexists(annex_path)
                and op.islink(path)
                and path_is_subpath(realpath, op.abspath(annex_path))
                and key_parts[0] == "SHA256E"
                and digests
                and "sha256" in digests
            ):
                # Compare the hash embedded in the annex key (sans extension)
                # instead of re-hashing the content
                if key_parts[-1].partition(".")[0] == digests["sha256"]:
                    yield _skip_file("already exists")
                    return
                else:
                    lgr.debug(
                        "%s is in git-annex, and hash does not match hash on server; redownloading",
                        path,
                    )
            elif (
                digests is not None
                and "dandi-etag" in digests
                and get_digest(path, "dandi-etag") == digests["dandi-etag"]
            ):
                yield _skip_file("already exists")
                return
            elif (
                digests is not None
                and "dandi-etag" not in digests
                and "md5" in digests
                and get_digest(path, "md5") == digests["md5"]
            ):
                yield _skip_file("already exists")
                return
            else:
                lgr.debug(
                    "Etag of %s does not match etag on server; redownloading", path
                )
        elif existing is DownloadExisting.REFRESH:
            if op.lexists(annex_path):
                raise RuntimeError("Not refreshing path in git annex repository")
            if mtime is None:
                lgr.warning(
                    f"{path!r} - no mtime or ctime in the record, redownloading"
                )
            else:
                stat = os.stat(op.realpath(path))
                same = []
                if is_same_time(stat.st_mtime, mtime):
                    same.append("mtime")
                if size is not None and stat.st_size == size:
                    same.append("size")
                # TODO: use digests if available? or if e.g. size is identical
                # but mtime is different
                if same == ["mtime", "size"]:
                    # TODO: add recording and handling of .nwb object_id
                    yield _skip_file("same time and size", size=size)
                    return
                lgr.debug(f"{path!r} - same attributes: {same}. Redownloading")

    if size is not None:
        yield {"size": size}

    # --- Prepare the destination directory ---
    destdir = Path(op.dirname(path))
    with lock:
        # Walk from the destination directory upwards; a regular file found
        # where a directory is needed is removed so that mkdir() can succeed
        for p in (destdir, *destdir.parents):
            if p.is_file():
                p.unlink()
                break
            elif p.is_dir():
                break
        destdir.mkdir(parents=True, exist_ok=True)

    yield {"status": "downloading"}

    # --- Pick the digest algorithm used for verification ---
    algo: str | None = None
    digester: Callable[[], Hasher] | None = None
    digest: str | None = None
    downloaded_digest: Hasher | None = None
    if digests:
        # choose first available for now.
        # TODO: reuse that sorting based on speed
        for algo, digest in digests.items():
            if algo == "dandi-etag" and size is not None:
                # Instantiate outside the lambda so that mypy is assured that
                # `size` is not None:
                hasher = ETagHashlike(size)
                digester = lambda: hasher  # noqa: E731
            else:
                digester = getattr(hashlib, algo, None)
            if digester is not None:
                break
        if digester is None:
            lgr.warning(
                "%s - found no digests in hashlib for any of %s", path, str(digests)
            )

    # --- Download, retrying on request errors ---
    resuming = False
    attempt = 1
    attempts_allowed: int = (
        10  # number to do, could be incremented if we downloaded a little
    )
    while attempt <= attempts_allowed:
        try:
            if digester:
                downloaded_digest = digester()  # start empty
            warned = False
            # I wonder if we could make writing async with downloader
            with DownloadDirectory(path, digests or {}) as dldir:
                assert dldir.offset is not None
                downloaded_in_attempt = 0
                downloaded = dldir.offset
                # A non-zero offset means earlier data is being continued, so
                # the running digest would not cover the whole file
                resuming = downloaded > 0
                if size is not None and downloaded == size:
                    lgr.debug(
                        "%s - downloaded size matches target size of %d, exiting the loop",
                        path,
                        size,
                    )
                    # Exit early when downloaded == size, as making a Range
                    # request in such a case results in a 416 error from S3.
                    # Problems will result if `size` is None but we've already
                    # downloaded everything.
                    break
                for block in downloader(dldir.offset):
                    if digester:
                        assert downloaded_digest is not None
                        downloaded_digest.update(block)
                    downloaded += len(block)
                    downloaded_in_attempt += len(block)
                    out: dict[str, Any] = {"done": downloaded}
                    if size:
                        if downloaded > size and not warned:
                            warned = True
                            # Yield ERROR?
                            lgr.warning(
                                "%s - downloaded %d bytes although size was told to be just %d",
                                path,
                                downloaded,
                                size,
                            )
                        out["done%"] = 100 * downloaded / size
                    yield out
                    dldir.append(block)
            break
        except ValueError:
            # When `requests` raises a ValueError, it's because the caller
            # provided invalid parameters (e.g., an invalid URL), and so
            # retrying won't change anything.
            raise
        # Catching RequestException lets us retry on timeout & connection
        # errors (among others) in addition to HTTP status errors.
        except requests.RequestException as exc:
            # A zero return means "give up"; otherwise the helper has already
            # slept and returns the (possibly raised) attempt limit
            if not (
                attempts_allowed := _check_attempts_and_sleep(
                    path=path,
                    exc=exc,
                    attempt=attempt,
                    attempts_allowed=attempts_allowed,
                    downloaded_in_attempt=downloaded_in_attempt,
                )
            ):
                yield {"status": "error", "message": str(exc)}
                return
        finally:
            attempt += 1
    else:
        # Only reached if the loop condition fails without a `break`
        lgr.warning("downloader logic: We should not be here!")

    # --- Verify the digest ---
    final_digest = None
    if downloaded_digest and not resuming:
        assert downloaded_digest is not None
        final_digest = downloaded_digest.hexdigest()  # we care only about hex
    elif digests:
        if resuming:
            lgr.debug("%s - resumed download. Need to check full checksum.", path)
        else:
            assert not downloaded_digest
            lgr.debug(
                "%s - no digest was checked online. Need to check full checksum", path
            )
        final_digest = get_digest(path, algo)
    if final_digest:
        if digest_callback is not None:
            assert isinstance(algo, str)
            digest_callback(algo, final_digest)
        if digest != final_digest:
            msg = f"{algo}: downloaded {final_digest} != {digest}"
            yield {"checksum": "differs", "status": "error", "message": msg}
            lgr.debug("%s - is different: %s.", path, msg)
            return
        else:
            yield {"checksum": "ok"}
            lgr.debug("%s - verified that has correct %s %s", path, algo, digest)
    else:
        lgr.debug("%s - no digests were provided", path)
        # shouldn't happen with more recent metadata etc
        yield {
            "checksum": "-",
            # "message": "no digests were provided"
        }

    # TODO: dissolve attrs and pass specific mtime?
    if mtime is not None:
        yield {"status": "setting mtime"}
        # Access time is set to "now"; modification time to the given mtime
        os.utime(path, (time.time(), mtime.timestamp()))

    yield {"status": "done"}
[docs]
class DownloadDirectory:
    """
    Context manager for the temporary working directory of a single download.

    On entry the working directory (the target's name plus `DOWNLOAD_SUFFIX`)
    is created and locked, and the data file inside it is opened either for
    appending (resuming, when the recorded digests agree with the expected
    ones) or afresh.  On a clean exit the data file is moved to `filepath`
    and the working directory is removed; on an exceptional exit the working
    directory is kept so that the download can be resumed later.
    """

    def __init__(self, filepath: str | Path, digests: dict[str, str]) -> None:
        #: The path to which to save the file after downloading
        self.filepath = Path(filepath)
        #: Expected hashes of the downloaded data, as a mapping from algorithm
        #: names to digests
        self.digests = digests
        #: The working directory in which downloaded data will be temporarily
        #: stored
        self.dirpath = self.filepath.with_name(self.filepath.name + DOWNLOAD_SUFFIX)
        #: The file in `dirpath` to which data will be written as it is
        #: received
        self.writefile = self.dirpath / "file"
        #: A `fasteners.InterProcessLock` on `dirpath`
        self.lock: InterProcessLock | None = None
        #: An open filehandle to `writefile`
        self.fp: IO[bytes] | None = None
        #: How much of the data has been downloaded so far
        self.offset: int | None = None

    def __enter__(self) -> DownloadDirectory:
        """
        Lock the working directory and open the data file.

        Raises `RuntimeError` if the lock cannot be acquired immediately.  If
        anything fails after the lock was acquired, the file handle is closed
        and the lock released before the exception propagates.
        """
        self.dirpath.mkdir(parents=True, exist_ok=True)
        self.lock = InterProcessLock(str(self.dirpath / "lock"))
        if not self.lock.acquire(blocking=False):
            raise RuntimeError(f"Could not acquire download lock for {self.filepath}")
        try:
            chkpath = self.dirpath / "checksum"
            try:
                with chkpath.open() as fp:
                    digests = json.load(fp)
            except (FileNotFoundError, ValueError):
                digests = {}
            if not isinstance(digests, dict):
                # Valid JSON but not a mapping: treat as no usable record
                digests = {}
            matching_algs = self.digests.keys() & digests.keys()
            if matching_algs and all(
                self.digests[alg] == digests[alg] for alg in matching_algs
            ):
                # Pick up where we left off, writing to the end of the file
                lgr.debug(
                    "%s - download directory exists and has matching checksum(s) %s; resuming download",
                    self.dirpath,
                    matching_algs,
                )
                self.fp = self.writefile.open("ab")
            else:
                # Delete the file (if it even exists) and start anew
                if not chkpath.exists():
                    lgr.debug(
                        "%s - no prior digests found; starting new download",
                        self.dirpath,
                    )
                else:
                    lgr.debug(
                        "%s - download directory found, but digests do not match;"
                        " starting new download",
                        self.dirpath,
                    )
                try:
                    self.writefile.unlink()
                except FileNotFoundError:
                    pass
                self.fp = self.writefile.open("wb")
            with chkpath.open("w") as fp:
                json.dump(self.digests, fp)
            self.offset = self.fp.tell()
        except BaseException:
            # __exit__ is not invoked when __enter__ fails, so undo the
            # partial setup here rather than leaking the lock/file handle
            if self.fp is not None:
                self.fp.close()
            self.lock.release()
            self.lock = None
            self.fp = None
            self.offset = None
            raise
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """
        Close the data file; on a clean exit move it to `filepath` and remove
        the working directory.  The lock is released in every case.
        """
        assert self.fp is not None
        if exc_type is not None or exc_val is not None or exc_tb is not None:
            lgr.debug(
                "%s - entered __exit__ with position %d with exception: %s, %s",
                self.dirpath,
                self.fp.tell(),
                exc_type,
                exc_val,
            )
        else:
            lgr.debug(
                "%s - entered __exit__ with position %d without any exception",
                self.dirpath,
                self.fp.tell(),
            )
        self.fp.close()
        try:
            if exc_type is None:
                try:
                    self.writefile.replace(self.filepath)
                except (IsADirectoryError, PermissionError) as exc:
                    # A PermissionError is only taken to mean "destination is
                    # a directory" on Windows; anything else is re-raised
                    if isinstance(exc, PermissionError):
                        if not (
                            sys.platform.startswith("win") and self.filepath.is_dir()
                        ):
                            raise
                    lgr.debug(
                        "Destination path %s is a directory; removing it and retrying",
                        self.filepath,
                    )
                    rmtree(self.filepath)
                    self.writefile.replace(self.filepath)
        finally:
            assert self.lock is not None
            self.lock.release()
            if exc_type is None:
                rmtree(self.dirpath, ignore_errors=True)
            self.lock = None
            self.fp = None
            self.offset = None

    def append(self, blob: bytes) -> None:
        """Write `blob` to the data file; only valid inside the ``with`` block."""
        if self.fp is None:
            raise ValueError(
                "DownloadDirectory.append() called outside of context manager"
            )
        self.fp.write(blob)
def _download_zarr(
    asset: BaseRemoteZarrAsset,
    download_path: Path,
    toplevel_path: str | Path,
    existing: DownloadExisting,
    lock: Lock,
    jobs: int | None = None,
) -> Iterator[dict]:
    """
    Download all entries of a Zarr asset into `download_path`, yielding
    combined progress records.

    Entries are downloaded via `_download_file` on up to `jobs` workers
    (4 if `jobs` is unset).  If the combined status reaches "done", local
    files that are not among the remote entries are deleted, emptied
    directories are removed, and, unless the final message mentions skipped
    files, the local Zarr checksum is compared with the remote one.
    """
    # Avoid heavy import by importing within function:
    from .support.digests import get_zarr_checksum

    # we will collect them all while starting the download
    # with the first page of entries received from the server.
    seen_entries = []
    md5_by_path = {}
    combiner = ProgressCombiner(zarr_size=asset.size)

    def record_md5(path: str, algoname: str, d: str) -> None:
        # Remember md5 digests computed while downloading so that the final
        # Zarr checksum can reuse them
        if algoname == "md5":
            md5_by_path[path] = d

    def entry_downloads():
        for entry in asset.iterfiles():
            seen_entries.append(entry)
            etag = entry.digest
            assert etag.algorithm is DigestType.md5
            entry_path = str(entry)
            yield pairing(
                entry_path,
                _download_file(
                    entry.get_download_file_iter(),
                    download_path / entry_path,
                    toplevel_path=toplevel_path,
                    size=entry.size,
                    mtime=entry.modified,
                    existing=existing,
                    digests={"md5": etag.value},
                    lock=lock,
                    digest_callback=partial(record_md5, entry_path),
                ),
            )
        # Only now is the total number of entries known
        combiner.file_qty = len(seen_entries)

    final_out: dict | None = None
    with lazy_interleave(
        entry_downloads(),
        onerror=FINISH_CURRENT,
        max_workers=jobs or 4,
    ) as it:
        for path, status in it:
            for combined in combiner.feed(path, status):
                if combined.get("status") == "done":
                    # Hold back "done" until cleanup and verification are over
                    final_out = combined
                else:
                    yield combined
            if final_out is not None:
                break
        else:
            # Never reached an overall "done" status: nothing more to do
            return

    yield {"status": "deleting extra files"}
    remote_paths = {str(entry) for entry in seen_entries}
    zarr_basepath = Path(download_path)
    pending_dirs = deque([zarr_basepath])
    empty_dirs: deque[Path] = deque()
    while pending_dirs:
        current = pending_dirs.popleft()
        is_empty = True
        for child in list(current.iterdir()):
            if exclude_from_zarr(child):
                is_empty = False
            elif (
                child.is_file()
                and child.relative_to(zarr_basepath).as_posix() not in remote_paths
            ):
                try:
                    child.unlink()
                except OSError:
                    is_empty = False
            elif child.is_dir():
                pending_dirs.append(child)
                is_empty = False
            else:
                is_empty = False
        if is_empty and current != zarr_basepath:
            empty_dirs.append(current)
    while empty_dirs:
        doomed = empty_dirs.popleft()
        doomed.rmdir()
        parent = doomed.parent
        # Removing a directory may leave its parent empty as well
        if parent != zarr_basepath and not any(parent.iterdir()):
            empty_dirs.append(parent)

    if "skipped" not in final_out["message"]:
        zarr_checksum = asset.get_digest().value
        local_checksum = get_zarr_checksum(zarr_basepath, known=md5_by_path)
        if zarr_checksum != local_checksum:
            msg = f"Zarr checksum: downloaded {local_checksum} != {zarr_checksum}"
            yield {"checksum": "differs", "status": "error", "message": msg}
            lgr.debug("%s is different: %s.", zarr_basepath, msg)
            return
        yield {"checksum": "ok"}
        lgr.debug(
            "Verified that %s has correct Zarr checksum %s",
            zarr_basepath,
            zarr_checksum,
        )

    yield {"status": "done"}
def _check_attempts_and_sleep(
    path: Path,
    exc: requests.RequestException,
    attempt: int,
    attempts_allowed: int,
    downloaded_in_attempt: int = 0,
) -> int:
    """
    Decide whether a failed download should be retried and, if so, sleep
    before returning the (possibly increased) number of allowed attempts.

    :param path: Destination of the download
    :param exc: Exception raised during the last download attempt
    :param attempt: The index of the last download attempt
    :param attempts_allowed: The number of download attempts currently allowed
    :param downloaded_in_attempt: The number of bytes downloaded in the last attempt
    :returns: The number of download attempts allowed overall, potentially
        adjusted, if the download should be retried; 0 if no more attempts
        are allowed.

    Note: the function sleeps only when a retry is going to happen; when it
    returns 0 it returns immediately.
    """
    aggressive = os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY")
    if aggressive and downloaded_in_attempt > 0:
        # in such a case if we downloaded a little more --
        # consider it a successful attempt
        lgr.debug(
            "%s - download failed on attempt #%d: %s, "
            "but did download %d bytes, so considering "
            "it a success and incrementing number of allowed attempts.",
            path,
            attempt,
            exc,
            downloaded_in_attempt,
        )
        attempts_allowed += 1

    if attempt >= attempts_allowed:
        # The last allowed attempt has failed, so there is no point in
        # retrying or sleeping any longer; the caller records `exc` as the
        # error which caused the download to fail.
        lgr.debug("%s - download failed after %d attempts: %s", path, attempt, exc)
        return 0

    sleep_amount: float | None = None
    response = exc.response
    if response is not None:
        retryable_statuses = (
            400,  # Bad Request, but happened with girder:
            # https://github.com/dandi/dandi-cli/issues/87
            *RETRY_STATUSES,
        )
        if response.status_code not in retryable_statuses:
            lgr.debug(
                "%s - download failed due to response %d: %s",
                path,
                response.status_code,
                exc,
            )
            return 0
        sleep_amount = get_retry_after(response)

    if sleep_amount is None:
        # it was not Retry-after set, so we come up with random duration to sleep
        sleep_amount = random.random() * 5 * attempt

    lgr.debug(
        "%s - download failed on attempt #%d: %s, will sleep %f and retry",
        path,
        attempt,
        exc,
        sleep_amount,
    )
    time.sleep(sleep_amount)
    return attempts_allowed
[docs]
def pairing(p: str, gen: Iterator[dict]) -> Iterator[tuple[str, dict]]:
    """Lazily tag every record produced by `gen` with `p` as ``(p, record)``."""
    yield from ((p, record) for record in gen)
# Per-file download states tracked by ProgressCombiner (values are 1-based,
# in the order listed).
DLState = Enum(
    "DLState",
    ["STARTING", "DOWNLOADING", "SKIPPED", "ERROR", "DONE"],
)
[docs]
@dataclass
class DownloadProgress:
downloaded: int = 0
size: int | None = None
[docs]
@dataclass
class ProgressCombiner:
    """
    Fold the progress records of the individual files of one Zarr into
    records describing the Zarr as a whole.
    """

    # Total size of the Zarr; used as the denominator for "done%"
    zarr_size: int
    # set to specific known value whenever full sweep is complete
    file_qty: int | None = None
    # Progress of files that have started but not yet finished, by path
    downloading: dict[str, DownloadProgress] = field(default_factory=dict)
    #: The total number of bytes downloaded so far, including all files
    #: currently downloading, skipped, or finished downloading (even if
    #: the checksum check failed)
    total_downloaded: int = 0
    #: Total size of all files that were not skipped and did not error out
    #: during download
    maxsize: int = 0
    # Last status string reported, so that only changes are reported
    prev_status: str = ""
    # Whether the one-off {"size": ...} record has been emitted
    yielded_size: bool = False
    # How many files are currently in each state
    file_states: Counter[DLState] = field(default_factory=Counter)
    # Number of distinct files for which records have been received
    files_fed: int = 0

    def get_done(self) -> dict:
        """Return the overall "done"/"done%" record."""
        if self.zarr_size:
            percent = self.total_downloaded / self.zarr_size * 100
        else:
            percent = 0
        return {"done": self.total_downloaded, "done%": percent}

    def get_status(self, report_done: bool = True) -> dict:
        """
        Return a record with the overall status (only if it changed since
        the previous call) and, when `report_done` is true, a message with
        per-state file counts plus the overall progress.
        """
        states = self.file_states
        all_files_fed = (
            self.file_qty is not None  # if already known
            and self.files_fed == self.file_qty
        )
        if (
            all_files_fed
            and states[DLState.STARTING] == states[DLState.DOWNLOADING] == 0
        ):
            # All files have finished
            if states[DLState.ERROR]:
                new_status = "error"
            elif states[DLState.DONE]:
                new_status = "done"
            else:
                new_status = "skipped"
        elif (
            self.files_fed - states[DLState.STARTING] - states[DLState.SKIPPED] > 0
        ):
            new_status = "downloading"
        else:
            new_status = ""

        statusdict = {}
        if report_done:
            labelled_states = (
                ("done", DLState.DONE),
                ("errored", DLState.ERROR),
                ("skipped", DLState.SKIPPED),
            )
            msg_comps = [
                f"{states[state]} {msg_label}"
                for msg_label, state in labelled_states
                if states[state]
            ]
            if msg_comps:
                statusdict["message"] = ", ".join(msg_comps)
        if new_status != self.prev_status:
            statusdict["status"] = new_status
            self.prev_status = new_status
        if report_done and self.zarr_size:
            statusdict.update(self.get_done())
        return statusdict

    def feed(self, path: str, status: dict) -> Iterator[dict]:
        """
        Consume one progress record `status` of the file at `path`, update
        the bookkeeping, and yield any resulting whole-Zarr records.
        """
        keys = list(status)
        size = status.get("size")
        kind = status.get("status")
        if size is not None and not self.yielded_size:
            # this thread will yield
            self.yielded_size = True
            yield {"size": self.zarr_size}

        if kind == "skipped":
            self.files_fed += 1
            self.file_states[DLState.SKIPPED] += 1
            if path in self.downloading:
                lgr.debug(
                    "We were downloading %s, which we just skipped -- must not happen",
                    path,
                )
                # To avoid double-accounting etc.
                self.total_downloaded -= self.downloading.pop(path).downloaded
            # Treat skipped as "downloaded" for the matter of accounting
            if size is not None:
                self.total_downloaded += size
                self.maxsize += size
            yield self.get_status()
        elif keys == ["size"]:
            self.files_fed += 1
            self.file_states[DLState.STARTING] += 1
            assert size is not None
            assert path not in self.downloading
            self.downloading[path] = DownloadProgress(size=size)
            self.maxsize += size
            if self.file_states[DLState.DOWNLOADING]:
                yield self.get_done()
        elif status == {"status": "downloading"}:
            self.file_states[DLState.DOWNLOADING] += 1
            if path in self.downloading:
                # Already announced through a "size" record
                self.file_states[DLState.STARTING] -= 1
            else:
                self.files_fed += 1
                self.downloading[path] = DownloadProgress()
            if out := self.get_status(report_done=False):
                yield out
        elif "done" in status:
            progress = self.downloading[path]
            self.total_downloaded += status["done"] - progress.downloaded
            progress.downloaded = status["done"]
            yield self.get_done()
        elif kind == "error":
            self.file_states[DLState.DOWNLOADING] -= 1
            self.file_states[DLState.ERROR] += 1
            progress = self.downloading.pop(path)
            if "checksum" not in status:
                # Not a checksum mismatch: take this file out of the totals
                if progress.size is not None:
                    self.maxsize -= progress.size
                self.total_downloaded -= progress.downloaded
            yield self.get_status()
        elif keys == ["checksum"]:
            pass
        elif status == {"status": "setting mtime"}:
            pass
        elif status == {"status": "done"}:
            del self.downloading[path]
            self.file_states[DLState.DOWNLOADING] -= 1
            self.file_states[DLState.DONE] += 1
            yield self.get_status()
        else:
            lgr.warning(
                "Unexpected download status dict received for %r: %r", path, status
            )