Source code for dandi.delete

from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass, field
from operator import attrgetter
from pathlib import Path

import click
from yarl import URL

from .consts import DRAFT, ZARR_EXTENSIONS, DandiInstance, dandiset_metadata_file
from .dandiapi import DandiAPIClient, RemoteAsset, RemoteDandiset
from .dandiarchive import BaseAssetIDURL, DandisetURL, ParsedDandiURL, parse_dandi_url
from .dandiset import Dandiset
from .exceptions import NotFoundError
from .support import pyout as pyouts
from .utils import get_instance, is_url


[docs]@dataclass class Deleter: """ Class for registering assets & Dandisets to delete and then deleting them """ client: DandiAPIClient | None = None dandiset: RemoteDandiset | None = None #: Whether we are deleting an entire Dandiset (true) or just assets (false) deleting_dandiset: bool = False skip_missing: bool = False remote_assets: list[RemoteAsset] = field(default_factory=list) def __bool__(self) -> bool: return self.deleting_dandiset or bool(self.remote_assets)
[docs] def set_dandiset(self, instance: DandiInstance, dandiset_id: str) -> bool: """ Returns `False` if no action should be taken due to the Dandiset not existing """ if self.client is None: self.client = DandiAPIClient.for_dandi_instance(instance, authenticate=True) try: self.dandiset = self.client.get_dandiset(dandiset_id, DRAFT, lazy=False) except NotFoundError: if self.skip_missing: return False else: raise elif not is_same_url(self.client.api_url, instance.api): raise ValueError("Cannot delete assets from multiple API instances at once") else: assert self.dandiset is not None if self.dandiset.identifier != dandiset_id: raise ValueError("Cannot delete assets from multiple Dandisets at once") return True
[docs] def add_asset(self, asset: RemoteAsset) -> None: # Ensure the list is free of duplicates so that we don't try to delete # the same asset twice if not any(a.identifier == asset.identifier for a in self.remote_assets): self.remote_assets.append(asset)
[docs] def register_dandiset(self, instance: DandiInstance, dandiset_id: str) -> None: if not self.set_dandiset(instance, dandiset_id): return self.deleting_dandiset = True
[docs] def register_asset( self, instance: DandiInstance, dandiset_id: str, version_id: str, asset_path: str, ) -> None: if not self.set_dandiset(instance, dandiset_id): return assert self.dandiset is not None try: asset = self.dandiset.get_asset_by_path(asset_path) except NotFoundError: if self.skip_missing: return else: raise NotFoundError( f"Asset at path {asset_path!r} not found in Dandiset {dandiset_id}" ) self.add_asset(asset)
[docs] def register_asset_folder( self, instance: DandiInstance, dandiset_id: str, version_id: str, folder_path: str, ) -> None: if not self.set_dandiset(instance, dandiset_id): return any_assets = False assert self.dandiset is not None for asset in self.dandiset.get_assets_with_path_prefix(folder_path): self.add_asset(asset) any_assets = True if not any_assets and not self.skip_missing: raise NotFoundError( f"No assets under path {folder_path!r} found in Dandiset {dandiset_id}" )
[docs] def register_assets_url(self, url: str, parsed_url: ParsedDandiURL) -> None: if isinstance(parsed_url, BaseAssetIDURL): raise ValueError("Cannot delete an asset identified by just an ID") assert parsed_url.dandiset_id is not None if not self.set_dandiset(parsed_url.instance, parsed_url.dandiset_id): return any_assets = False assert self.client is not None for a in parsed_url.get_assets(self.client): assert isinstance(a, RemoteAsset) self.add_asset(a) any_assets = True if not any_assets and not self.skip_missing: raise NotFoundError(f"No assets found for {url}")
[docs] def register_url(self, url: str) -> None: parsed_url = parse_dandi_url(url) if isinstance(parsed_url, DandisetURL): if parsed_url.version_id is not None: raise NotImplementedError( "Dandi API server does not support deletion of individual" " versions of a dandiset" ) assert parsed_url.dandiset_id is not None self.register_dandiset(parsed_url.instance, parsed_url.dandiset_id) else: if parsed_url.version_id is None: parsed_url.version_id = DRAFT self.register_assets_url(url, parsed_url)
[docs] def register_local_path_equivalent( self, instance_name: str | DandiInstance, filepath: str ) -> None: instance = get_instance(instance_name) dandiset_id, asset_path = find_local_asset(filepath) if not self.set_dandiset(instance, dandiset_id): return if asset_path.endswith("/"): self.register_asset_folder(instance, dandiset_id, DRAFT, asset_path) else: self.register_asset(instance, dandiset_id, DRAFT, asset_path)
[docs] def confirm(self) -> bool: if self.dandiset is None: raise ValueError("confirm() called before registering anything to delete") if self.deleting_dandiset: msg = f"Delete Dandiset {self.dandiset.identifier}?" else: msg = ( f"Delete {len(self.remote_assets)} assets on server from" f" Dandiset {self.dandiset.identifier}?" ) return click.confirm(msg)
[docs] def delete_dandiset(self) -> None: if self.deleting_dandiset: assert self.dandiset is not None self.dandiset.delete() else: raise RuntimeError( "delete_dandiset() called when Dandiset not registered for deletion" )
def _process_asset(self, asset: RemoteAsset) -> Iterator[dict]: yield {"status": "Deleting"} try: asset.delete() except Exception as e: yield {"status": "Error", "message": f"{type(e).__name__}: {e}"} else: yield {"status": "Deleted"}
[docs] def process_assets_pyout(self) -> Iterator[dict]: for asset in sorted(self.remote_assets, key=attrgetter("path")): yield { "path": asset.path, ("status", "message"): self._process_asset(asset), }
[docs] def process_assets_debug(self) -> Iterator[Iterator[dict]]: for asset in sorted(self.remote_assets, key=attrgetter("path")): yield ({"path": asset.path, **d} for d in self._process_asset(asset))
[docs]def delete( paths: Iterable[str], dandi_instance: str | DandiInstance = "dandi", devel_debug: bool = False, jobs: int | None = None, force: bool = False, skip_missing: bool = False, ) -> None: """Delete dandisets and assets from the server. PATH could be a local path or a URL to an asset, directory, or an entire dandiset. """ deleter = Deleter(skip_missing=skip_missing) for p in paths: if is_url(p): deleter.register_url(p) else: deleter.register_local_path_equivalent(dandi_instance, p) if deleter and (force or deleter.confirm()): if deleter.deleting_dandiset: deleter.delete_dandiset() elif devel_debug: for gen in deleter.process_assets_debug(): for r in gen: print(r, flush=True) else: pyout_style = pyouts.get_style(hide_if_missing=False) rec_fields = ("path", "status", "message") out = pyouts.LogSafeTabular( style=pyout_style, columns=rec_fields, max_workers=jobs ) with out: for r in deleter.process_assets_pyout(): out(r)
[docs]def find_local_asset(filepath: str) -> tuple[str, str]: """ Given a path to a local file, return the ID of the Dandiset in which it is located and the path to the file relative to the root of said Dandiset. If the file is a directory, the path will end with a trailing slash. """ path = Path(filepath).absolute() dandiset = Dandiset.find(path.parent) if dandiset is None: raise RuntimeError( f"Found no {dandiset_metadata_file} anywhere. " "Use 'dandi download' or 'organize' first" ) relpath = path.relative_to(dandiset.path).as_posix() if path.is_dir() and path.suffix not in ZARR_EXTENSIONS: relpath += "/" return (dandiset.identifier, relpath)
[docs]def is_same_url(url1: str, url2: str) -> bool: u1 = URL(url1) u1 = u1.with_path(u1.path.rstrip("/")) u2 = URL(url2) u2 = u2.with_path(u2.path.rstrip("/")) return u1 == u2