Source code for dandi.files.zarr

from __future__ import annotations

from base64 import b64encode
from collections import Counter
from collections.abc import Generator, Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from contextlib import closing
from dataclasses import dataclass, field, replace
from datetime import datetime
from enum import Enum
import json
import math
import os
import os.path
from pathlib import Path
import random
from time import sleep
from typing import Any, Optional
import urllib.parse

from dandischema.models import BareAsset, DigestType
from pydantic import BaseModel, ConfigDict, ValidationError
import requests
from zarr_checksum.tree import ZarrChecksumTree

from dandi import __version__ as dandi_version
from dandi import get_logger
from dandi.consts import (
    MAX_ZARR_DEPTH,
    S3_MAX_SINGLE_PART_UPLOAD,
    ZARR_DELETE_BATCH_SIZE,
    ZARR_MIME_TYPE,
    ZARR_UPLOAD_BATCH_SIZE,
)
from dandi.dandiapi import (
    RemoteAsset,
    RemoteDandiset,
    RemoteZarrAsset,
    RemoteZarrEntry,
    RESTFullAPIClient,
)
from dandi.exceptions import UploadError
from dandi.metadata.core import get_default_metadata
from dandi.misctypes import DUMMY_DANDI_ZARR_CHECKSUM, BasePath, Digest
from dandi.utils import (
    chunked,
    exclude_from_zarr,
    pluralize,
    post_upload_size_check,
    pre_upload_size_check,
)

from .bases import LocalDirectoryAsset
from ..validate._types import (
    ORIGIN_VALIDATION_DANDI_ZARR,
    MissingFileContent,
    Origin,
    OriginType,
    Scope,
    Severity,
    Standard,
    ValidationResult,
    Validator,
)

lgr = get_logger()


class _Zarr3Metadata(BaseModel):
    """
    Minimal model of the ``zarr.json`` metadata document of a Zarr format V3 node

    Only the ``node_type`` member is declared here, and the model is
    configured for pydantic's strict mode.

    Note
    ----
        This is a stopgap that will not be needed once the upgrade to
        zarr-python 3.x is done; it should be removed at that point.
    """

    model_config = ConfigDict(strict=True)

    #: Kind of node the metadata document describes (callers in this module
    #: compare it against ``"array"``)
    node_type: str


def get_zarr_format_version(path: Path) -> Optional[str]:
    """
    Determine which Zarr format version a Zarr group or array on disk uses

    Parameters
    ----------
    path : The path to the store of the Zarr object in the filesystem

    Returns
    -------
    Optional[str]
        ``"3"`` if `path` is a directory containing a ``zarr.json`` file,
        ``"2"`` if it is a directory containing a ``.zgroup`` or ``.zarray``
        file (and no ``zarr.json`` file), otherwise None.  See
        https://zarr-specs.readthedocs.io/en/latest/specs.html for the format
        versions.

    Note
    ----
        Only Zarr objects stored as a plain directory (`zarr.storage.LocalStore`
        in zarr-python 3.x, `zarr.storage.DirectoryStore` in zarr-python 2.x)
        can be handled; for anything else None is returned.  Upgrading to
        zarr-python 3.x will eliminate this limitation.

        The detection is done by "manually" looking at the content of the
        store.  Once the upgrade to zarr-python 3.x is done, zarr.open() can be
        used instead, as the object it returns has an `info` attribute that
        contains the Zarr format version.
    """
    if path.is_dir():
        # The V3 marker takes precedence over the V2 markers
        if path.joinpath("zarr.json").is_file():
            return "3"
        if any(path.joinpath(marker).is_file() for marker in (".zgroup", ".zarray")):
            return "2"

    return None


def _ts_validate_zarr3(path: Path, devel_debug: bool = False) -> list[ValidationResult]:
    """
    Validate a Zarr format V3 LocalStore with the tensorstore package

    Parameters
    ----------
    path : The path to the Zarr format V3 LocalStore in the filesystem
    devel_debug : bool
        If True, re-raise an exception instead of returning it packaged in a
        `ValidationResult` object

    Returns
    -------
    list[ValidationResult]
        A list of validation results representing validation errors encountered

    Raises
    ------
    ValueError
        If the path is not a directory

    Note
    ----
        Since tensorstore does not support the concept of a Zarr group, this function
        validates a Zarr format V3 LocalStore by opening all the contained arrays with
        tensorstore individually.

        This function will no longer be needed once the upgrade to zarr-python 3.x is
        done and should be removed.
    """

    if not path.is_dir():
        raise ValueError(f"Path {path} is not a directory")

    meta_fname = "zarr.json"

    def dandi_zarr_origin() -> Origin:
        # Builds a new `Origin` for each result reported by this function
        return Origin(
            type=OriginType.VALIDATION,
            validator=Validator.dandi_zarr,
            validator_version=dandi_version,
            standard=Standard.ZARR,
            standard_version="3",
        )

    results: list[ValidationResult] = []

    if not (path / meta_fname).is_file():
        # meta file doesn't exist in the LocalStore
        results.append(
            ValidationResult(
                id="zarr.missing_zarr_json",
                origin=dandi_zarr_origin(),
                scope=Scope.FILE,
                severity=Severity.ERROR,
                message=f"Zarr format V3 LocalStore at {path} is missing the zarr.json "
                f"file",
                path=path,
            )
        )

    # The walk is done even when the root metadata file is missing so that
    # any arrays found further down are still validated
    for root, dirs, files in os.walk(path):
        if meta_fname not in files:
            continue

        meta_path = Path(root) / meta_fname
        # Hand the raw bytes to the JSON parser instead of decoding them with
        # the locale's default encoding: JSON documents are UTF-8, and content
        # that cannot be decoded is then reported as an invalid zarr.json
        # rather than escaping as a UnicodeDecodeError.
        meta_bytes = meta_path.read_bytes()
        try:
            meta = _Zarr3Metadata.model_validate_json(meta_bytes)
        except ValidationError as e:
            if devel_debug:
                raise
            results.append(
                ValidationResult(
                    id="zarr.invalid_zarr_json",
                    origin=dandi_zarr_origin(),
                    scope=Scope.FILE,
                    origin_result=e,
                    severity=Severity.ERROR,
                    message="Invalid zarr.json file",
                    path=meta_path,
                )
            )
        else:
            # Check if the directory is a Zarr array
            if meta.node_type == "array":
                results.extend(_ts_validate_zarr3_array(Path(root), devel_debug))
                dirs.clear()  # Skip subdirectories

    return results


def _ts_validate_zarr3_array(
    path: Path, devel_debug: bool = False
) -> list[ValidationResult]:
    """
    Check that tensorstore can open a Zarr format V3 array stored in a LocalStore

    Parameters
    ----------
    path : The path to the Zarr format V3 array in the filesystem
    devel_debug : bool
        If True, re-raise an exception instead of returning it packaged in a
        `ValidationResult` object

    Returns
    -------
    list[ValidationResult]
        An empty list if the array could be opened, otherwise a list holding a
        single result that wraps the exception raised while opening it

    Note
    ----
        This function will no longer be needed once the upgrade to zarr-python 3.x is
        done and should be removed.
    """
    # Avoid heavy import by importing within function
    from importlib.metadata import version

    import tensorstore as ts  # type: ignore[import]

    # TensorStore spec describing where and how to read the Zarr array
    kvstore = {"driver": "file", "path": str(path)}
    spec = {"driver": "zarr3", "kvstore": kvstore}

    try:
        ts.open(spec, read=True, write=False).result()
    except Exception as exc:
        if devel_debug:
            raise
        # The tensorstore version is only looked up when there is a failure
        # to report
        origin = Origin(
            type=OriginType.INTERNAL,
            validator=Validator.tensorstore,
            validator_version=version("tensorstore"),
            standard=Standard.ZARR,
            standard_version="3",
        )
        failure = ValidationResult(
            id="zarr.tensorstore_cannot_open",
            origin=origin,
            scope=Scope.FILE,
            origin_result=exc,
            severity=Severity.ERROR,
            message="Error opening Zarr array with tensorstore",
            path=path,
        )
        return [failure]

    return []


def _is_empty_group(group: Any) -> bool:
    """
    Return whether a `zarr.Group` has no child arrays or groups.

    In zarr-python 2.x ``bool(group)`` / ``len(group)`` is a cheap check, but in
    zarr-python 3.x both eagerly iterate the group's members, which raises if
    the underlying directory contains non-Zarr files (e.g. an arbitrary
    ``a/b/c/...`` tree) or arrays with metadata that the new library can't
    parse (e.g. legacy ``>u1`` dtypes). Treat any such failure as "not empty"
    — there is *some* content there, even if it's not a valid Zarr member —
    so we don't spuriously report ``dandi_zarr.empty_group``.
    """
    try:
        return len(group) == 0
    except Exception as exc:
        lgr.debug("Could not determine emptiness of Zarr group %r: %s", group, exc)
        return False


@dataclass
class LocalZarrEntry(BasePath):
    """A file or directory within a `ZarrAsset`"""

    #: The path to the root of the Zarr file tree
    zarr_basepath: Path

    @property
    def filepath(self) -> Path:
        """The on-disk location of this entry: the Zarr root joined with `parts`"""
        return self.zarr_basepath.joinpath(*self.parts)

    def _get_subpath(self, name: str) -> LocalZarrEntry:
        # Resolve a single path component relative to this entry; "." and ".."
        # are understood, while empty names and names containing a slash are
        # rejected.
        if not name or "/" in name:
            raise ValueError(f"Invalid path component: {name!r}")
        if name == ".":
            return self
        if name == "..":
            return self.parent
        return replace(self, parts=self.parts + (name,))

    @property
    def parent(self) -> LocalZarrEntry:
        # The root entry is its own parent
        if self.is_root():
            return self
        return replace(self, parts=self.parts[:-1])

    def exists(self) -> bool:
        """Whether anything (including a broken symlink) exists at `filepath`"""
        return os.path.lexists(self.filepath)

    def is_file(self) -> bool:
        """Whether `filepath` is a regular file"""
        return self.filepath.is_file()

    def is_dir(self) -> bool:
        """Whether `filepath` is a directory"""
        return self.filepath.is_dir()

    def iterdir(self) -> Iterator[LocalZarrEntry]:
        """
        Yield an entry for each child of this directory, skipping paths
        rejected by `exclude_from_zarr()` and empty directories
        """
        for child in self.filepath.iterdir():
            is_excluded = exclude_from_zarr(child)
            # Ignore empty directories
            if is_excluded or (child.is_dir() and not any(child.iterdir())):
                continue
            yield self._get_subpath(child.name)

    def get_digest(self) -> Digest:
        """
        Calculate the DANDI etag digest for the entry.  If the entry is a
        directory, the algorithm will be the DANDI Zarr checksum algorithm; if
        it is a file, it will be MD5.
        """
        # Avoid heavy import by importing within function:
        from dandi.support.digests import get_digest, get_zarr_checksum

        if self.is_dir():
            return Digest.dandi_zarr(get_zarr_checksum(self.filepath))
        md5_value = get_digest(self.filepath, "md5")
        return Digest(algorithm=DigestType.md5, value=md5_value)

    @property
    def size(self) -> int:
        """
        The size of the entry.  For a directory, this is the total size of all
        entries within it.
        """
        if not self.is_dir():
            return os.path.getsize(self.filepath)
        return sum(child.size for child in self.iterdir())

    @property
    def modified(self) -> datetime:
        """The time at which the entry was last modified"""
        # TODO: Should this be overridden for directories?
        mtime = self.filepath.stat().st_mtime
        return datetime.fromtimestamp(mtime).astimezone()
@dataclass
class ZarrStat:
    """
    Details about a Zarr asset (or, while `ZarrAsset.stat()` recurses, about
    one directory within it)
    """

    #: The total size of the asset, i.e. the sum of the sizes of its files
    size: int

    #: The DANDI Zarr checksum of the asset
    digest: Digest

    #: A list of all files in the asset in unspecified order
    files: list[LocalZarrEntry]
class UploadStatus(Enum):
    """Outcome of an attempt to upload a single Zarr file"""

    SUCCESS = "success"
    RETRY_NEEDED = "retry_needed"  # 403 error - need new URL
    FAILED = "failed"  # Other error - don't retry


@dataclass
class UploadResult:
    """Result of a single file upload attempt"""

    # The item whose upload was attempted
    item: UploadItem
    # How the attempt ended
    status: UploadStatus
    # Number of bytes uploaded; left at 0 unless the upload succeeded
    size: int = 0
    # The exception behind a RETRY_NEEDED or FAILED status, if any
    error: Exception | None = None
class ZarrAsset(LocalDirectoryAsset[LocalZarrEntry]):
    """Representation of a local Zarr directory"""

    _DUMMY_DIGEST = DUMMY_DANDI_ZARR_CHECKSUM

    @property
    def filetree(self) -> LocalZarrEntry:
        """
        The `LocalZarrEntry` for the root of the hierarchy of files within the
        Zarr asset
        """
        return LocalZarrEntry(zarr_basepath=self.filepath, parts=())

    def stat(self) -> ZarrStat:
        """Return various details about the Zarr asset"""

        def dirstat(dirpath: LocalZarrEntry) -> ZarrStat:
            # Recursively gather the size, checksum and file list of one
            # directory of the Zarr.

            # Avoid heavy import by importing within function:
            from dandi.support.digests import checksum_zarr_dir, md5file_nocache

            size = 0
            dir_info = {}
            file_info = {}
            files = []
            for p in dirpath.iterdir():
                if p.is_dir():
                    st = dirstat(p)
                    size += st.size
                    dir_info[p.name] = (st.digest.value, st.size)
                    files.extend(st.files)
                else:
                    size += p.size
                    file_info[p.name] = (md5file_nocache(p.filepath), p.size)
                    files.append(p)
            return ZarrStat(
                size=size,
                digest=Digest.dandi_zarr(checksum_zarr_dir(file_info, dir_info)),
                files=files,
            )

        return dirstat(self.filetree)

    def get_digest(self) -> Digest:
        """Calculate a dandi-zarr-checksum digest for the asset"""
        # Avoid heavy import by importing within function:
        from dandi.support.digests import get_zarr_checksum

        return Digest.dandi_zarr(get_zarr_checksum(self.filepath))

    def get_metadata(
        self,
        digest: Digest | None = None,
        ignore_errors: bool = True,
    ) -> BareAsset:
        """
        Return the default metadata for the asset with the encoding format set
        to the Zarr MIME type and the path set to the asset's path.

        ``ignore_errors`` is accepted but not consulted by this implementation.
        """
        metadata = get_default_metadata(self.filepath, digest=digest)
        metadata.encodingFormat = ZARR_MIME_TYPE
        metadata.path = self.path
        return metadata

    def get_validation_errors(
        self,
        schema_version: str | None = None,
        devel_debug: bool = False,
        missing_file_content: MissingFileContent | None = None,
    ) -> list[ValidationResult]:
        """
        Validate the Zarr and return the problems found, followed by those
        reported by the parent class's ``get_validation_errors()``.

        When ``devel_debug`` is true, errors are raised instead of being
        packaged in `ValidationResult` objects.
        """
        # Avoid heavy import by importing within function:
        import zarr

        errors: list[ValidationResult] = []

        origin_internal_zarr: Origin = Origin(
            type=OriginType.INTERNAL,
            validator=Validator.zarr,
            validator_version=zarr.__version__,
            standard=Standard.ZARR,
        )

        # Zarr V3 stores are validated by `_ts_validate_zarr3` regardless of
        # zarr-python version.  With zarr-python 2.x, `zarr.open()` cannot
        # handle V3 at all (raises `PathNotFoundError`); with zarr-python 3.x
        # it can sometimes partially open malformed V3 stores (e.g. valid
        # root metadata, bad child metadata), which would cause us to miss
        # the `_ts_validate_zarr3` path.  Detecting the format up front keeps
        # the validation behaviour consistent across versions.
        format_version = get_zarr_format_version(self.filepath)

        if format_version == "3":
            errors.extend(_ts_validate_zarr3(self.filepath, devel_debug))
            data = None
        else:
            try:
                data = zarr.open(str(self.filepath), mode="r")
            except Exception as e:
                if devel_debug:
                    raise
                message = (
                    "Error opening file and Zarr format cannot be determined"
                    if format_version is None
                    else "Error opening file."
                )
                errors.append(
                    ValidationResult(
                        id="zarr.cannot_open",
                        origin=origin_internal_zarr,
                        scope=Scope.FILE,
                        origin_result=e,
                        severity=Severity.ERROR,
                        message=message,
                        path=self.filepath,
                    )
                )
                data = None

        if isinstance(data, zarr.Group) and _is_empty_group(data):
            errors.append(
                ValidationResult(
                    origin=ORIGIN_VALIDATION_DANDI_ZARR,
                    severity=Severity.ERROR,
                    id="dandi_zarr.empty_group",
                    scope=Scope.FILE,
                    path=self.filepath,
                    message="Zarr group is empty.",
                )
            )
        if self._is_too_deep():
            msg = f"Zarr directory tree more than {MAX_ZARR_DEPTH} directories deep"
            if devel_debug:
                raise ValueError(msg)
            errors.append(
                ValidationResult(
                    origin=ORIGIN_VALIDATION_DANDI_ZARR,
                    severity=Severity.ERROR,
                    id="dandi_zarr.tree_depth_exceeded",
                    scope=Scope.FILE,
                    path=self.filepath,
                    message=msg,
                )
            )
        return errors + super().get_validation_errors(
            schema_version=schema_version,
            devel_debug=devel_debug,
            missing_file_content=missing_file_content,
        )

    def _is_too_deep(self) -> bool:
        # True if the path of any file in the Zarr has MAX_ZARR_DEPTH + 1 or
        # more components
        return any(len(e.parts) >= MAX_ZARR_DEPTH + 1 for e in self.iterfiles())

    def iter_upload(
        self,
        dandiset: RemoteDandiset,
        metadata: dict[str, Any],
        jobs: int | None = None,
        replacing: RemoteAsset | None = None,
    ) -> Iterator[dict]:
        """
        Upload the Zarr directory as an asset with the given metadata to the
        given Dandiset, returning a generator of status `dict`\\s.

        :param RemoteDandiset dandiset:
            the Dandiset to which the Zarr will be uploaded
        :param dict metadata:
            Metadata for the uploaded asset.  The "path" field will be set to
            the value of the instance's ``path`` attribute if no such field is
            already present.
        :param int jobs: Number of threads to use for uploading; defaults to 5
        :param RemoteAsset replacing:
            If set, replace the given asset, which must have the same path as
            the new asset; if the old asset is a Zarr, the Zarr will be updated
            & reused for the new asset
        :returns:
            A generator of `dict`\\s containing at least a ``"status"`` key.
            Upon successful upload, the last `dict` will have a status of
            ``"done"`` and an ``"asset"`` key containing the resulting
            `RemoteAsset`.
        :raises UploadError:
            if files in a batch still get 403 responses after the maximum
            number of retries
        :raises RuntimeError:
            if a checksum mismatch persists although a further pass found
            nothing to change
        """
        asset_path = metadata.setdefault("path", self.path)
        client = dandiset.client
        lgr.debug("%s: Producing asset", asset_path)
        yield {"status": "producing asset"}

        def mkzarr() -> str:
            # Create the Zarr on the server, or look up & reuse one that
            # already exists at the same path, and return its ID
            try:
                r = client.post(
                    "/zarr/",
                    json={"name": asset_path, "dandiset": dandiset.identifier},
                )
            except requests.HTTPError as e:
                if e.response is not None and (
                    "Zarr already exists" in e.response.text
                    or "dandiset, name must make a unique set" in e.response.text
                ):
                    lgr.warning(
                        "%s: Found pre-existing Zarr at same path not"
                        " associated with any asset; reusing",
                        asset_path,
                    )
                    (old_zarr,) = client.paginate(
                        "/zarr/",
                        params={
                            "dandiset": dandiset.identifier,
                            "name": asset_path,
                        },
                    )
                    zarr_id = old_zarr["zarr_id"]
                else:
                    raise
            else:
                zarr_id = r["zarr_id"]
            assert isinstance(zarr_id, str)
            return zarr_id

        if replacing is not None:
            lgr.debug("%s: Replacing pre-existing asset", asset_path)
            if isinstance(replacing, RemoteZarrAsset):
                lgr.debug(
                    "%s: Pre-existing asset is a Zarr; reusing & updating", asset_path
                )
                zarr_id = replacing.zarr
            else:
                lgr.debug(
                    "%s: Pre-existing asset is not a Zarr; minting new Zarr", asset_path
                )
                zarr_id = mkzarr()
            r = client.put(
                replacing.api_path,
                json={"metadata": metadata, "zarr_id": zarr_id},
            )
        else:
            lgr.debug("%s: Minting new Zarr", asset_path)
            zarr_id = mkzarr()
            r = client.post(
                f"{dandiset.version_api_path}assets/",
                json={"metadata": metadata, "zarr_id": zarr_id},
            )
        a = RemoteAsset.from_data(dandiset, r)
        assert isinstance(a, RemoteZarrAsset)

        # Each pass of this loop compares the local Zarr against the remote
        # one, uploads/deletes whatever differs, and then compares checksums;
        # a checksum mismatch triggers another pass.
        mismatched = True
        first_run = True
        while mismatched:
            zcc = ZarrChecksumTree()
            old_zarr_entries: dict[str, RemoteZarrEntry] = {
                str(e): e for e in a.iterfiles()
            }
            total_size = 0
            to_upload = EntryUploadTracker()
            if old_zarr_entries:
                to_delete: list[RemoteZarrEntry] = []
                digesting: list[Future[tuple[LocalZarrEntry, str, bool]]] = []
                yield {"status": "comparing against remote Zarr"}
                with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
                    for local_entry in self.iterfiles():
                        total_size += local_entry.size
                        try:
                            remote_entry = old_zarr_entries.pop(str(local_entry))
                        except KeyError:
                            for pp in local_entry.parents:
                                pps = str(pp)
                                if pps in old_zarr_entries:
                                    lgr.debug(
                                        "%s: Parent path %s of file %s"
                                        " corresponds to a remote file;"
                                        " deleting remote",
                                        asset_path,
                                        pps,
                                        local_entry,
                                    )
                                    to_delete.append(old_zarr_entries.pop(pps))
                                    break
                            else:
                                eprefix = str(local_entry) + "/"
                                sub_e = [
                                    (k, v)
                                    for k, v in old_zarr_entries.items()
                                    if k.startswith(eprefix)
                                ]
                                if sub_e:
                                    lgr.debug(
                                        "%s: Path %s of local file is a directory"
                                        " in remote Zarr; deleting remote",
                                        asset_path,
                                        local_entry,
                                    )
                                    for k, v in sub_e:
                                        old_zarr_entries.pop(k)
                                        to_delete.append(v)
                            lgr.debug(
                                "%s: Path %s not present in remote Zarr; uploading",
                                asset_path,
                                local_entry,
                            )
                            to_upload.register(local_entry)
                        else:
                            digesting.append(
                                executor.submit(
                                    _cmp_digests,
                                    asset_path,
                                    local_entry,
                                    remote_entry.digest.value,
                                )
                            )
                    for dgstfut in as_completed(digesting):
                        try:
                            item = dgstfut.result()
                        except Exception:
                            for d in digesting:
                                d.cancel()
                            raise
                        else:
                            local_entry, local_digest, differs = item
                            if differs:
                                to_upload.register(local_entry, local_digest)
                            else:
                                # Unchanged files are not uploaded but still
                                # count towards the local checksum
                                zcc.add_leaf(
                                    Path(str(local_entry)),
                                    local_entry.size,
                                    local_digest,
                                )
                if to_delete:
                    yield from _rmfiles(
                        asset=a,
                        entries=to_delete,
                        status="deleting conflicting remote files",
                    )
            else:
                yield {"status": "traversing local Zarr"}
                for local_entry in self.iterfiles():
                    total_size += local_entry.size
                    to_upload.register(local_entry)
            yield {"status": "initiating upload", "size": total_size}
            lgr.debug("%s: Beginning upload", asset_path)
            changed = False
            with (
                RESTFullAPIClient("http://nil.nil") as storage,
                closing(to_upload.get_items()) as upload_items,
            ):
                bytes_uploaded = 0
                for i, items in enumerate(
                    chunked(upload_items, ZARR_UPLOAD_BATCH_SIZE), start=1
                ):
                    # Items to upload in this batch (may be retried e.g. due to
                    # 403 errors because of timed-out upload URLs)
                    items_to_upload = list(items)
                    max_retries = 5
                    retry_count = 0
                    current_jobs = jobs or 5

                    # Add all items to checksum tree (only done once)
                    for it in items_to_upload:
                        zcc.add_leaf(Path(it.entry_path), it.size, it.digest)

                    while items_to_upload and retry_count <= max_retries:
                        # Prepare upload requests for current items
                        uploading = [it.upload_request() for it in items_to_upload]

                        if retry_count == 0:
                            lgr.debug(
                                "%s: Uploading Zarr file batch #%d (%s)",
                                asset_path,
                                i,
                                pluralize(len(uploading), "file"),
                            )
                        else:
                            lgr.debug(
                                "%s: Retrying %s from batch #%d (attempt %d/%d)",
                                asset_path,
                                pluralize(len(uploading), "file"),
                                i,
                                retry_count,
                                max_retries,
                            )

                        # Get signed URLs for items
                        r = client.post(f"/zarr/{zarr_id}/files/", json=uploading)

                        # Upload files in parallel
                        with ThreadPoolExecutor(max_workers=current_jobs) as executor:
                            futures = [
                                executor.submit(
                                    _upload_zarr_file,
                                    storage_session=storage,
                                    dandiset=dandiset,
                                    upload_url=signed_url,
                                    item=it,
                                )
                                for (signed_url, it) in zip(r, items_to_upload)
                            ]
                            changed = True

                            retry_items = []
                            failed_items = []
                            for fut in as_completed(futures):
                                result = fut.result()
                                if result.status == UploadStatus.SUCCESS:
                                    bytes_uploaded += result.size
                                    # When every queued file is empty the total
                                    # is 0 bytes; report completion instead of
                                    # dividing by zero.
                                    if to_upload.total_size:
                                        progress = (
                                            100 * bytes_uploaded / to_upload.total_size
                                        )
                                    else:
                                        progress = 100.0
                                    yield {
                                        "status": "uploading",
                                        "progress": progress,
                                        "current": bytes_uploaded,
                                    }
                                elif result.status == UploadStatus.RETRY_NEEDED:
                                    retry_items.append(result.item)
                                else:
                                    assert result.status == UploadStatus.FAILED
                                    failed_items.append((result.item, result.error))

                            # Handle failed items (non-403 errors)
                            if failed_items:
                                _handle_failed_items_and_raise(
                                    executor, failed_items, futures
                                )

                        # Prepare for next iteration with retry items
                        if items_to_upload := retry_items:
                            retry_count += 1
                            # Halve the number of workers on each retry
                            current_jobs = max(1, math.ceil(current_jobs / 2))
                            if retry_count <= max_retries:
                                lgr.info(
                                    "%s: %s got 403 errors, requesting new URLs"
                                    " (attempt %d/%d, workers: %d)",
                                    asset_path,
                                    pluralize(len(items_to_upload), "file"),
                                    retry_count,
                                    max_retries,
                                    current_jobs,
                                )
                                # Exponential backoff with jitter before retry
                                sleep(
                                    min(2**retry_count * 5, 120) + random.uniform(0, 5)
                                )

                    # Check if we exhausted retries
                    if items_to_upload:
                        nfiles_str = pluralize(len(items_to_upload), "file")
                        raise UploadError(
                            f"{asset_path}: failed to upload {nfiles_str} "
                            f"after {max_retries} retries due to repeated 403 errors"
                        )
                    lgr.debug("%s: Completing upload of batch #%d", asset_path, i)
                lgr.debug("%s: All files uploaded", asset_path)
            # Whatever is still left in old_zarr_entries was not matched by
            # any local file
            old_zarr_files = list(old_zarr_entries.values())
            if old_zarr_files:
                lgr.debug(
                    "%s: Deleting %s in remote Zarr not present locally",
                    asset_path,
                    pluralize(len(old_zarr_files), "file"),
                )
                yield from _rmfiles(
                    asset=a,
                    entries=old_zarr_files,
                    status="deleting extra remote files",
                )
                changed = True
            if changed:
                lgr.debug(
                    "%s: Waiting for server to calculate Zarr checksum", asset_path
                )
                yield {"status": "server calculating checksum"}
                client.post(f"/zarr/{zarr_id}/finalize/")
                # Poll until the server reports the Zarr as complete
                while True:
                    sleep(2)
                    r = client.get(f"/zarr/{zarr_id}/")
                    if r["status"] == "Complete":
                        our_checksum = str(zcc.process())
                        server_checksum = r["checksum"]
                        if our_checksum == server_checksum:
                            mismatched = False
                        else:
                            mismatched = True
                            lgr.info(
                                "%s: Asset checksum mismatch (local: %s;"
                                " server: %s); redoing upload",
                                asset_path,
                                our_checksum,
                                server_checksum,
                            )
                            yield {"status": "Checksum mismatch"}
                        break
            elif mismatched and not first_run:
                lgr.error(
                    "%s: Previous upload loop resulted in checksum mismatch,"
                    " and no discrepancies between local and remote Zarr were found",
                    asset_path,
                )
                raise RuntimeError("Unresolvable Zarr checksum mismatch")
            else:
                mismatched = False
                lgr.info("%s: No changes made to Zarr", asset_path)
            first_run = False

        lgr.info("%s: Asset successfully uploaded", asset_path)
        yield {"status": "done", "asset": a}
def _handle_failed_items_and_raise(
    executor: ThreadPoolExecutor, failed_items: list, futures: list
) -> None:
    """
    Cancel the given futures, shut down the executor, log every failed upload
    plus a summary, and re-raise the first failure's exception.

    :param executor: the executor the uploads were submitted to
    :param failed_items:
        ``(item, error)`` pairs for the failed uploads; must be non-empty
    :param futures: all futures of the current upload round
    :raises Exception: always; the error of the first entry in ``failed_items``
    """
    # Cancel any remaining futures
    for f in futures:
        f.cancel()
    executor.shutdown()

    # Log all failures
    for item, error in failed_items:
        lgr.error("Failed to upload %s (%d bytes): %s", item.filepath, item.size, error)

    # Summary diagnostics
    exc_counts = Counter(type(error).__name__ for _, error in failed_items)
    exc_summary = ", ".join(f"{k}: {v}" for k, v in exc_counts.most_common())
    lgr.error(
        "Upload failure summary: %d/%d files failed; exception types: {%s}%s",
        len(failed_items),
        len(futures),
        exc_summary,
        " (systematic — all same exception type)" if len(exc_counts) == 1 else "",
    )

    # Raise the first error
    raise failed_items[0][1]


def _presigned_signed_headers(upload_url: str) -> list[str]:
    """
    Return the header names listed in the ``X-Amz-SignedHeaders`` query
    parameter of a pre-signed URL.

    Only the query component of the URL is parsed, so the parameter is found
    regardless of its position in the query string.  If the parameter is
    absent, the result is ``[""]``.
    """
    query = urllib.parse.urlsplit(upload_url).query
    # Technically, a header can be specified more than once.  Join all together
    # just in case.
    values = urllib.parse.parse_qs(query).get("X-Amz-SignedHeaders", [])
    return ";".join(values).split(";")


def _upload_zarr_file(
    storage_session: RESTFullAPIClient,
    dandiset: RemoteDandiset,
    upload_url: str,
    item: UploadItem,
) -> UploadResult:
    """
    Upload a single Zarr file and return the result status.

    Errors are not raised; they are returned inside the `UploadResult`, with
    a status of ``RETRY_NEEDED`` for an HTTP 403 response and ``FAILED`` for
    anything else.

    :param storage_session: client used to PUT the file to ``upload_url``
    :param dandiset: not consulted by the current implementation (see the TODO)
    :param upload_url: pre-signed URL to upload the file to
    :param item: the file to upload
    """
    try:
        headers = {"Content-MD5": item.base64_digest}
        if item.content_type is not None:
            headers["Content-Type"] = item.content_type

        # TODO: Replace below implementation with this once LINC and EMBER
        # update their API to match dandi-archive >= 0.12.14
        # if dandiset.embargo_status == EmbargoStatus.EMBARGOED:
        #     signed_headers = _presigned_signed_headers(upload_url)
        #     # Check that the tagging header is present, and error if not. If not present, it will
        #     # error either way, but error from AWS would be much less specific
        #     # and hard to decipher
        #     if "x-amz-tagging" not in signed_headers:
        #         raise ValueError(
        #             "'x-amz-tagging' header not included in embargoed pre-signed URL"
        #         )

        # Send the tagging header whenever the pre-signed URL lists it among
        # its signed headers
        if "x-amz-tagging" in _presigned_signed_headers(upload_url):
            headers["x-amz-tagging"] = "embargoed=true"

        with item.filepath.open("rb") as fp:
            storage_session.put(
                upload_url,
                data=fp,
                json_resp=False,
                retry_if=_retry_zarr_file,
                headers=headers,
                timeout=(60, 7200),
            )
    except requests.HTTPError as e:
        post_upload_size_check(item.filepath, item.size, True)

        # Check if this is a 403 error that we should retry with a new URL
        if e.response is not None and e.response.status_code == 403:
            lgr.debug(
                "Got 403 error uploading %s (%d bytes), will retry with new URL: %s",
                item.filepath,
                item.size,
                str(e),
            )
            return UploadResult(item=item, status=UploadStatus.RETRY_NEEDED, error=e)
        else:
            lgr.warning(
                "HTTP error uploading %s (%d bytes): %s",
                item.filepath,
                item.size,
                e,
            )
            return UploadResult(item=item, status=UploadStatus.FAILED, error=e)
    except Exception as e:
        post_upload_size_check(item.filepath, item.size, True)
        lgr.warning(
            "Error uploading %s (%d bytes): %s: %s",
            item.filepath,
            item.size,
            type(e).__name__,
            e,
        )
        return UploadResult(item=item, status=UploadStatus.FAILED, error=e)
    else:
        post_upload_size_check(item.filepath, item.size, False)
        return UploadResult(item=item, status=UploadStatus.SUCCESS, size=item.size)


def _retry_zarr_file(r: requests.Response) -> bool:
    """
    Tell whether a response to a Zarr file upload indicates one of the known
    transient failures, for which the request should be retried.
    """
    return (
        # Some sort of filesystem hiccup can cause requests to be unable to get the
        # filesize, leading to it falling back to "chunked" transfer encoding,
        # which S3 doesn't support.
        r.status_code == 501
        and "header you provided implies functionality that is not implemented"
        in r.text
    ) or (
        # Network issue or rate limiting can cause a timeout, which results in a 400.
        # Case: https://github.com/dandi/dandi-cli/issues/1662
        r.status_code == 400
        and "was not read from or written to within the timeout period" in r.text
    )


@dataclass
class EntryUploadTracker:
    """
    Class for keeping track of `LocalZarrEntry` instances to upload

    :meta private:
    """

    # Combined size of all registered entries
    total_size: int = 0
    # Entries registered together with an already-computed digest
    digested_entries: list[UploadItem] = field(default_factory=list)
    # Entries registered without a digest; digested lazily by `get_items()`
    fresh_entries: list[LocalZarrEntry] = field(default_factory=list)

    def register(self, e: LocalZarrEntry, digest: str | None = None) -> None:
        """Queue ``e`` for upload, with its digest if already known"""
        if digest is not None:
            self.digested_entries.append(UploadItem.from_entry(e, digest))
        else:
            self.fresh_entries.append(e)
        self.total_size += e.size

    @staticmethod
    def _mkitem(e: LocalZarrEntry) -> UploadItem:
        # Avoid heavy import by importing within function:
        from dandi.support.digests import md5file_nocache

        digest = md5file_nocache(e.filepath)
        return UploadItem.from_entry(e, digest)

    def get_items(self, jobs: int = 5) -> Generator[UploadItem, None, None]:
        """
        Yield an `UploadItem` for every registered entry: first those whose
        digest is already known, then the remaining ones as their digests are
        computed by a pool of ``jobs`` threads.
        """
        # Note: In order for the ThreadPoolExecutor to be closed if an error
        # occurs during upload, the method must be used like this:
        #
        #     with contextlib.closing(to_upload.get_items()) as upload_items:
        #         for item in upload_items:
        #             ...
        yield from self.digested_entries
        if not self.fresh_entries:
            return
        with ThreadPoolExecutor(max_workers=jobs) as executor:
            futures = [executor.submit(self._mkitem, e) for e in self.fresh_entries]
            for fut in as_completed(futures):
                try:
                    yield fut.result()
                # Use BaseException to also catch GeneratorExit thrown by
                # closing()
                except BaseException:
                    for f in futures:
                        f.cancel()
                    raise


@dataclass
class UploadItem:
    """:meta private:"""

    # Path of the entry within the Zarr, as sent to the server
    entry_path: str
    # Location of the file on disk
    filepath: Path
    # Hexadecimal MD5 digest of the file
    digest: str
    # Size of the file as returned by `pre_upload_size_check()`
    size: int
    # Value for the Content-Type upload header, or `None` to send none
    content_type: str | None

    @classmethod
    def from_entry(cls, e: LocalZarrEntry, digest: str) -> UploadItem:
        """
        Build an `UploadItem` for the entry ``e`` with the given MD5 digest

        :raises ValueError:
            if the file is larger than ``S3_MAX_SINGLE_PART_UPLOAD``
        """
        # JSON metadata files.  ``.zarray`` / ``.zattrs`` / ``.zgroup`` /
        # ``.zmetadata`` are the V2 names; ``zarr.json`` is the V3 name (a
        # single file per group/array containing all metadata).
        if e.name in {".zarray", ".zattrs", ".zgroup", ".zmetadata", "zarr.json"}:
            # Only label the file as JSON if it actually parses as JSON
            try:
                with e.filepath.open("rb") as fp:
                    json.load(fp)
            except Exception:
                content_type = None
            else:
                content_type = "application/json"
        else:
            content_type = None
        size = pre_upload_size_check(e.filepath)
        if size > S3_MAX_SINGLE_PART_UPLOAD:
            raise ValueError(
                f"Zarr chunk {e.filepath} is {size / 1024**3:.2f} GiB,"
                f" exceeding the S3 single-part upload limit of"
                f" {S3_MAX_SINGLE_PART_UPLOAD / 1024**3:.0f} GiB."
                f" Multipart upload for zarr chunks is not yet supported."
            )
        return cls(
            entry_path=str(e),
            filepath=e.filepath,
            digest=digest,
            size=size,
            content_type=content_type,
        )

    @property
    def base64_digest(self) -> str:
        """The hexadecimal `digest` re-encoded as base64"""
        return b64encode(bytes.fromhex(self.digest)).decode("us-ascii")

    def upload_request(self) -> dict[str, str | None]:
        """The entry for this item in the request for signed upload URLs"""
        return {"path": self.entry_path, "base64md5": self.base64_digest}


def _cmp_digests(
    asset_path: str, local_entry: LocalZarrEntry, remote_digest: str
) -> tuple[LocalZarrEntry, str, bool]:
    """
    Compute the MD5 digest of a local file and compare it to the remote one

    :returns:
        a tuple of the entry, its local digest, and whether the local digest
        differs from ``remote_digest``
    """
    # Avoid heavy import by importing within function:
    from dandi.support.digests import md5file_nocache

    local_digest = md5file_nocache(local_entry.filepath)
    if local_digest != remote_digest:
        lgr.debug(
            "%s: Path %s in Zarr differs from local file; re-uploading",
            asset_path,
            local_entry,
        )
        return (local_entry, local_digest, True)
    else:
        lgr.debug("%s: File %s already on server; skipping", asset_path, local_entry)
        return (local_entry, local_digest, False)


def _rmfiles(
    asset: RemoteZarrAsset, entries: list[RemoteZarrEntry], status: str
) -> Iterator[dict]:
    """
    Delete ``entries`` from the remote Zarr in batches of
    ``ZARR_DELETE_BATCH_SIZE``, yielding a progress `dict` with the given
    ``status`` before the first batch and after every batch.
    """
    # Do the batching outside of the rmfiles() method so that we can report
    # progress on the completion of each batch
    yield {
        "status": status,
        "progress": 0,
        "current": 0,
    }
    deleted = 0
    for ents in chunked(entries, ZARR_DELETE_BATCH_SIZE):
        asset.rmfiles(ents, reingest=False)
        deleted += len(ents)
        yield {
            "status": status,
            "progress": deleted / len(entries) * 100,
            "current": deleted,
        }