from __future__ import annotations
from collections import defaultdict
from collections.abc import Iterator, Sequence
from contextlib import ExitStack
from enum import Enum
from functools import reduce
import io
import os.path
from pathlib import Path
import re
import time
from time import sleep
from typing import Any, TypedDict, cast
from unittest.mock import patch
import click
from packaging.version import Version
from . import __version__, lgr
from .consts import (
    DRAFT,
    DandiInstance,
    dandiset_identifier_regex,
    dandiset_metadata_file,
)
from .dandiapi import DandiAPIClient, RemoteAsset
from .dandiset import Dandiset
from .exceptions import NotFoundError, UploadError
from .files import (
    DandiFile,
    DandisetMetadataFile,
    LocalAsset,
    LocalDirectoryAsset,
    ZarrAsset,
)
from .misctypes import Digest
from .support import pyout as pyouts
from .support.pyout import naturalsize
from .utils import ensure_datetime, path_is_subpath, pluralize
from .validate_types import Severity


class Uploaded(TypedDict):
    size: int
    errors: list[str]
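# Illustrative shape of an Uploaded record as accumulated per path during
# upload(), e.g. {"size": 1048576, "errors": []} (values are hypothetical).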


class UploadExisting(str, Enum):
    ERROR = "error"
    SKIP = "skip"
    FORCE = "force"
    OVERWRITE = "overwrite"
    REFRESH = "refresh"

    def __str__(self) -> str:
        return self.value
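# How each mode is interpreted by check_replace_asset() below (Zarr assets are
# always reuploaded regardless of mode):
#   ERROR     -> raise FileExistsError if the asset already exists remotely
#   SKIP      -> leave an existing remote asset untouched
#   FORCE     -> always reupload
#   OVERWRITE -> reupload unless the remote digest matches the local one
#   REFRESH   -> reupload only if the digests differ and the local mtime is
#                newer (or the remote has no recorded blobDateModified)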


class UploadValidation(str, Enum):
    REQUIRE = "require"
    SKIP = "skip"
    IGNORE = "ignore"

    def __str__(self) -> str:
        return self.value
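# How each mode is interpreted by process_path() inside upload():
#   REQUIRE -> validate each asset and refuse to upload it if errors are found
#   SKIP    -> do not run validation at all
#   IGNORE  -> validate and report the error count, but upload regardless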


def upload(
    paths: Sequence[str | Path] | None = None,
    existing: UploadExisting = UploadExisting.REFRESH,
    validation: UploadValidation = UploadValidation.REQUIRE,
    dandi_instance: str | DandiInstance = "dandi",
    allow_any_path: bool = False,
    upload_dandiset_metadata: bool = False,
    devel_debug: bool = False,
    jobs: int | None = None,
    jobs_per_file: int | None = None,
    sync: bool = False,
) -> None:
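    """
    Upload the local Dandiset containing ``paths`` (or, if none are given, the
    Dandiset of the current directory) to ``dandi_instance``.

    Assets are validated according to ``validation``, compared against any
    existing remote copy according to ``existing``, and uploaded with
    parallelism controlled by ``jobs`` and ``jobs_per_file``; progress is
    rendered via pyout.  With ``sync=True``, remote assets that have no local
    counterpart under the given paths are offered for deletion afterwards.
    """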
    if paths:
        paths = [Path(p).absolute() for p in paths]
        dandiset = Dandiset.find(os.path.commonpath(paths))
    else:
        dandiset = Dandiset.find(None)
    if dandiset is None:
        raise RuntimeError(
            f"Found no {dandiset_metadata_file} anywhere in common ancestor of"
            " paths. Use 'dandi download' or 'organize' first."
        )
    with ExitStack() as stack:
        # We need to use the client as a context manager in order to ensure
        # the session gets properly closed.  Otherwise, pytest sometimes
        # complains under obscure conditions.
        client = stack.enter_context(
            DandiAPIClient.for_dandi_instance(dandi_instance)
        )
        client.check_schema_version()
        client.dandi_authenticate()

        if os.environ.get("DANDI_DEVEL_INSTRUMENT_REQUESTS_SUPERLEN"):
            from requests.utils import super_len

            def check_len(obj: io.IOBase, name: Any) -> None:
                first = True
                for i in range(10):
                    if first:
                        first = False
                    else:
                        lgr.debug("- Will sleep and then stat %r again", name)
                        sleep(1)
                    try:
                        st = os.stat(name)
                    except OSError as e:
                        lgr.debug(
                            "- Attempt to stat %r failed with %s: %s",
                            name,
                            type(e).__name__,
                            e,
                        )
                        stat_size = None
                    else:
                        lgr.debug("- stat(%r) = %r", name, st)
                        stat_size = st.st_size
                    try:
                        fileno = obj.fileno()
                    except Exception:
                        lgr.debug(
                            "- I/O object for %r has no fileno; cannot fstat", name
                        )
                        fstat_size = None
                    else:
                        try:
                            st = os.fstat(fileno)
                        except OSError as e:
                            lgr.debug(
                                "- Attempt to fstat %r failed with %s: %s",
                                name,
                                type(e).__name__,
                                e,
                            )
                            fstat_size = None
                        else:
                            lgr.debug("- fstat(%r) = %r", name, st)
                            fstat_size = st.st_size
                    if stat_size not in (None, 0):
                        raise RuntimeError(
                            f"requests.utils.super_len() reported size of 0 for"
                            f" {name!r}, but os.stat() reported size"
                            f" {stat_size} bytes {i + 1} tries later"
                        )
                    if fstat_size not in (None, 0):
                        raise RuntimeError(
                            f"requests.utils.super_len() reported size of 0 for"
                            f" {name!r}, but os.fstat() reported size"
                            f" {fstat_size} bytes {i + 1} tries later"
                        )
                lgr.debug(
                    "- Size of %r still appears to be 0 after 10 rounds of"
                    " stat'ing; returning size 0",
                    name,
                )

            def new_super_len(o: Any) -> int:
                try:
                    n = super_len(o)
                except Exception:
                    lgr.debug(
                        "requests.utils.super_len() failed on %r:", o, exc_info=True
                    )
                    raise
                else:
                    lgr.debug("requests.utils.super_len() reported %d for %r", n, o)
                    if (
                        n == 0
                        and isinstance(o, io.IOBase)
                        and (name := getattr(o, "name", None)) not in (None, "")
                    ):
                        lgr.debug(
                            "- Size of 0 is suspicious; double-checking that"
                            " NFS isn't lying"
                        )
                        check_len(o, name)
                    return cast(int, n)
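            # Route requests' length probing through the instrumented wrapper
            # above for the lifetime of this ExitStack, so that suspicious
            # zero-size reports get double-checked against stat()/fstat().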
            stack.enter_context(patch("requests.models.super_len", new_super_len))

        ds_identifier = dandiset.identifier
        remote_dandiset = client.get_dandiset(ds_identifier, DRAFT)
        if not re.match(dandiset_identifier_regex, str(ds_identifier)):
            raise ValueError(
                f"Dandiset identifier {ds_identifier} does not follow expected "
                f"convention {dandiset_identifier_regex!r}."
            )

        # Avoid heavy import by importing within function:
        from .pynwb_utils import ignore_benign_pynwb_warnings

        ignore_benign_pynwb_warnings()  # so validate doesn't whine

        if not paths:
            paths = [dandiset.path]

        # DO NOT FACTOR OUT THIS VARIABLE!  It stores any
        # BIDSDatasetDescriptionAsset instances for the Dandiset, which need
        # to remain alive until we're done working with all BIDS assets.
        assets = dandiset.assets(allow_all=allow_any_path)

        dandi_files: list[DandiFile] = []
        # Build the list step by step so as not to confuse mypy
        dandi_files.append(dandiset.metadata_file())
        dandi_files.extend(
            assets.under_paths(Path(p).relative_to(dandiset.path) for p in paths)
        )
        lgr.info(f"Found {len(dandi_files)} files to consider")

        # We will keep a shared set of "being processed" paths so that we can
        # limit how many are in flight at once, until
        # https://github.com/pyout/pyout/issues/87 is properly addressed
        process_paths: set[str] = set()

        uploaded_paths: dict[str, Uploaded] = defaultdict(
            lambda: {"size": 0, "errors": []}
        )

        upload_err: Exception | None = None
        validate_ok = True

        # TODO: we might want to always yield a full record so that no field
        # is left unprovided to pyout, which could cause it to halt
        def process_path(dfile: DandiFile) -> Iterator[dict]:
            """
            Parameters
            ----------
            dfile: DandiFile

            Yields
            ------
            dict
                Records for pyout
            """
            nonlocal upload_err, validate_ok
            strpath = str(dfile.filepath)
            try:
                if not isinstance(dfile, LocalDirectoryAsset):
                    try:
                        yield {"size": dfile.size}
                    except FileNotFoundError:
                        raise UploadError("File not found")
                    except Exception as exc:
                        # without limiting [:50] it might cause some pyout indigestion
                        raise UploadError(str(exc)[:50])

                #
                # Validate first, so we do not bother the server at all if not kosher
                #
                # TODO: re-enable validation of dandiset.yaml
                if (
                    isinstance(dfile, LocalAsset)
                    and validation != UploadValidation.SKIP
                ):
                    yield {"status": "pre-validating"}
                    validation_statuses = dfile.get_validation_errors()
                    validation_errors = [
                        s for s in validation_statuses if s.severity == Severity.ERROR
                    ]
                    yield {"errors": len(validation_errors)}
                    # TODO: split for dandi, pynwb errors
                    if validation_errors:
                        if validation is UploadValidation.REQUIRE:
                            lgr.warning(
                                "%r had %d validation errors preventing its upload:",
                                strpath,
                                len(validation_errors),
                            )
                            for i, e in enumerate(validation_errors, start=1):
                                lgr.warning(" Error %d: %s", i, e)
                            validate_ok = False
                            raise UploadError("failed validation")
                    else:
                        yield {"status": "validated"}
                else:
                    # yielding an empty value causes pyout to get stuck or crash
                    # https://github.com/pyout/pyout/issues/91
                    # yield {"errors": ''}
                    pass
                #
                # Special handling for dandiset.yaml
                # Yarik hates it but that is life for now. TODO
                #
                if isinstance(dfile, DandisetMetadataFile):
                    # TODO: This is a temporary measure to avoid breaking web UI
                    # dandiset metadata schema assumptions.  All edits should
                    # happen online.
                    if upload_dandiset_metadata:
                        yield {"status": "updating metadata"}
                        assert dandiset is not None
                        assert dandiset.metadata is not None
                        remote_dandiset.set_raw_metadata(dandiset.metadata)
                        yield {"status": "updated metadata"}
                    else:
                        yield skip_file("should be edited online")
                    return

                assert isinstance(dfile, LocalAsset)
                #
                # Compute checksums
                #
                file_etag: Digest | None
                if isinstance(dfile, ZarrAsset):
                    file_etag = None
                else:
                    yield {"status": "digesting"}
                    try:
                        file_etag = dfile.get_digest()
                    except Exception as exc:
                        raise UploadError("failed to compute digest: %s" % str(exc))

                try:
                    extant = remote_dandiset.get_asset_by_path(dfile.path)
                except NotFoundError:
                    extant = None
                else:
                    assert extant is not None
                    replace, out = check_replace_asset(
                        local_asset=dfile,
                        remote_asset=extant,
                        existing=existing,
                        local_etag=file_etag,
                    )
                    yield out
                    if not replace:
                        return

                #
                # Extract metadata - delayed since it takes time, but it is
                # done before the actual upload so that we can skip the upload
                # if it fails
                #
                # TODO: allow for non-nwb files to skip this step
                # (ad-hoc for dandiset.yaml for now)
yield {"status": "extracting metadata"}
try:
metadata = dfile.get_metadata(
digest=file_etag, ignore_errors=allow_any_path
).model_dump(mode="json", exclude_none=True)
except Exception as e:
raise UploadError("failed to extract metadata: %s" % str(e))
#
# Upload file
#
yield {"status": "uploading"}
validating = False
for r in dfile.iter_upload(
remote_dandiset, metadata, jobs=jobs_per_file, replacing=extant
):
r.pop("asset", None) # to keep pyout from choking
if r["status"] == "uploading":
uploaded_paths[strpath]["size"] = r.pop("current")
yield r
elif r["status"] == "post-validating":
# Only yield the first "post-validating" status
if not validating:
yield r
validating = True
else:
yield r
yield {"status": "done"}
            except Exception as exc:
                if upload_err is None:
                    upload_err = exc
                if devel_debug:
                    raise
                lgr.exception("Error uploading %s:", strpath)
                # Custom formatting for some exceptions from which we know how
                # to extract a user-meaningful message
                message = str(exc)
                uploaded_paths[strpath]["errors"].append(message)
                yield error_file(message)
            finally:
                process_paths.remove(strpath)

        # We will again use pyout to provide a neat table summarizing our
        # progress with the upload etc.
        # For the upload speeds we need to provide a custom aggregate:
        t0 = time.time()

        def upload_agg(*ignored: Any) -> str:
            dt = time.time() - t0
            # Take a snapshot of the values so the dict does not change length
            # while we iterate over it during an active upload; this might not
            # be a proper solution, see
            # https://github.com/dandi/dandi-cli/issues/502 for more info
            uploaded_recs = list(uploaded_paths.values())
            total = sum(v["size"] for v in uploaded_recs)
            if not total:
                return ""
            speed = total / dt if dt else 0
            return "%s/s" % naturalsize(speed)
        pyout_style = pyouts.get_style(hide_if_missing=False)
        pyout_style["progress"]["aggregate"] = upload_agg

        rec_fields = ["path", "size", "errors", "progress", "status", "message"]
        out = pyouts.LogSafeTabular(
            style=pyout_style, columns=rec_fields, max_workers=jobs or 5
        )

        with out:
            for dfile in dandi_files:
                while len(process_paths) >= 10:
                    lgr.log(2, "Sleep waiting for some paths to finish processing")
                    time.sleep(0.5)
                process_paths.add(str(dfile.filepath))

                rec: dict[Any, Any]
                if isinstance(dfile, DandisetMetadataFile):
                    rec = {"path": dandiset_metadata_file}
                else:
                    assert isinstance(dfile, LocalAsset)
                    rec = {"path": dfile.path}
                try:
                    if devel_debug:
                        # DEBUG: do serially
                        for v in process_path(dfile):
                            print(str(v), flush=True)
                    else:
                        rec[tuple(rec_fields[1:])] = process_path(dfile)
                except ValueError as exc:
                    rec.update(error_file(exc))
                out(rec)

        if not validate_ok:
            lgr.warning(
                "One or more assets failed validation. Consult the logfile for"
                " details."
            )

        if upload_err is not None:
            try:
                import etelemetry

                latest_version = etelemetry.get_project("dandi/dandi-cli")["version"]
            except Exception:
                pass
            else:
                if Version(latest_version) > Version(__version__):
                    lgr.warning(
                        "Upload failed, and you are not using the latest"
                        " version of dandi. We suggest upgrading dandi to v%s"
                        " and trying again.",
                        latest_version,
                    )
            raise upload_err

        if sync:
            relpaths: list[str] = []
            for p in paths:
                rp = os.path.relpath(p, dandiset.path)
                relpaths.append("" if rp == "." else rp)
            path_prefix = reduce(os.path.commonprefix, relpaths)  # type: ignore[arg-type]
            to_delete = []
            for asset in remote_dandiset.get_assets_with_path_prefix(path_prefix):
                if any(
                    p == "" or path_is_subpath(asset.path, p) for p in relpaths
                ) and not os.path.lexists(Path(dandiset.path, asset.path)):
                    to_delete.append(asset)
            if to_delete and click.confirm(
                f"Delete {pluralize(len(to_delete), 'asset')} on server?"
            ):
                for asset in to_delete:
                    asset.delete()
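

# A minimal, illustrative sketch of driving this function from Python (the
# path and mode choices below are hypothetical, not taken from this module);
# the same behavior is normally invoked via the `dandi upload` command:
#
#     from dandi.upload import UploadExisting, UploadValidation, upload
#
#     upload(
#         paths=["sub-01"],
#         existing=UploadExisting.SKIP,
#         validation=UploadValidation.REQUIRE,
#         dandi_instance="dandi",
#     )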


def check_replace_asset(
    local_asset: LocalAsset,
    remote_asset: RemoteAsset,
    existing: UploadExisting,
    local_etag: Digest | None,
) -> tuple[bool, dict[str, str]]:
    # Returns a (replace asset, message to yield) tuple
    if isinstance(local_asset, ZarrAsset):
        return (True, {"message": "exists - reuploading"})
    assert local_etag is not None
    metadata = remote_asset.get_raw_metadata()
    local_mtime = local_asset.modified
    remote_mtime_str = metadata.get("blobDateModified")
    # TODO: Should this error if the digest is missing?
    remote_etag = metadata.get("digest", {}).get(local_etag.algorithm.value)
    if remote_mtime_str is not None:
        remote_mtime = ensure_datetime(remote_mtime_str)
        remote_file_status = (
            "same"
            if remote_etag == local_etag.value and remote_mtime == local_mtime
            else (
                "newer"
                if remote_mtime > local_mtime
                else ("older" if remote_mtime < local_mtime else "diff")
            )
        )
    else:
        remote_mtime = None
        remote_file_status = "no mtime"
    exists_msg = f"exists ({remote_file_status})"
    if existing is UploadExisting.ERROR:
        # as promised -- not gentle at all!
        raise FileExistsError(exists_msg)
    if existing is UploadExisting.SKIP:
        return (False, skip_file(exists_msg))
    # The digest/mtime comparisons below only apply to the overwrite and
    # refresh modes
    if existing is UploadExisting.OVERWRITE:
        if remote_etag == local_etag.value:
            return (False, skip_file(exists_msg))
    elif existing is UploadExisting.REFRESH:
        if remote_etag == local_etag.value:
            return (False, skip_file("file exists"))
        elif remote_mtime is not None and remote_mtime >= local_mtime:
            return (False, skip_file(exists_msg))
    elif existing is UploadExisting.FORCE:
        pass
    else:
        raise AssertionError(f"Unhandled UploadExisting member: {existing!r}")
    return (True, {"message": f"{exists_msg} - reuploading"})


def skip_file(msg: Any) -> dict[str, str]:
    return {"status": "skipped", "message": str(msg)}


def error_file(msg: Any) -> dict[str, str]:
    return {"status": "ERROR", "message": str(msg)}