Do fail (raise Exception, should be non-0 exit) download if any download fails #1536

Draft: wants to merge 4 commits into base: master
194 changes: 113 additions & 81 deletions dandi/download.py
@@ -153,6 +153,13 @@

gen_ = (r for dl in downloaders for r in dl.download_generator())

errors = []

def p4e(out):
if out.get("status") == "error":
errors.append(out)
return out

# TODOs:
# - redo frontends similarly to how command_ls did it
# - have a single loop with analysis of `rec` to either any file
@@ -161,11 +168,11 @@
#
if format is DownloadFormat.DEBUG:
for rec in gen_:
print(rec, flush=True)
print(p4e(rec), flush=True)
elif format is DownloadFormat.PYOUT:
with out:
for rec in gen_:
out(rec)
out(p4e(rec))
else:
raise AssertionError(f"Unhandled DownloadFormat member: {format!r}")

@@ -191,6 +198,8 @@
break
else:
break
if errors:
raise RuntimeError(f"Encountered {len(errors)} errors while downloading.")


@dataclass
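The hunk above is the core behavioural change: every status record sent to the DEBUG or PYOUT frontend passes through p4e(), errors are tallied, and the final RuntimeError turns any failed download into a non-zero exit. A minimal, self-contained sketch of the same pattern (consume_with_error_tally and the record shapes are illustrative, not the PR's actual API):

    from typing import Any, Iterator

    def consume_with_error_tally(records: Iterator[dict[str, Any]]) -> None:
        """Print each status record, then fail loudly if any reported an error."""
        errors: list[dict[str, Any]] = []

        def p4e(rec: dict[str, Any]) -> dict[str, Any]:
            # remember error records as they stream by, pass everything through unchanged
            if rec.get("status") == "error":
                errors.append(rec)
            return rec

        for rec in records:
            print(p4e(rec), flush=True)

        if errors:
            # an uncaught exception gives the CLI a non-zero exit status
            raise RuntimeError(f"Encountered {len(errors)} errors while downloading.")

For example, feeding it iter([{"status": "done"}, {"status": "error", "message": "404"}]) prints both records and then raises.
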
@@ -690,12 +699,10 @@
"%s - found no digests in hashlib for any of %s", path, str(digests)
)

# TODO: how do we discover the total size????
# TODO: do not do it in-place, but rather into some "hidden" file
resuming = False
attempt = 0
nattempts = 3 # number to do, could be incremented if we downloaded a little
while attempt <= nattempts:
attempts_allowed = 3 # number to do, could be incremented if we downloaded a little
while attempt <= attempts_allowed:
attempt += 1
try:
if digester:
Expand Down Expand Up @@ -724,7 +731,6 @@
downloaded_digest.update(block)
downloaded += len(block)
downloaded_in_attempt += len(block)
# TODO: yield progress etc
out: dict[str, Any] = {"done": downloaded}
if size:
if downloaded > size and not warned:
@@ -737,7 +743,6 @@
size,
)
out["done%"] = 100 * downloaded / size
# TODO: ETA etc
yield out
dldir.append(block)
break
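For orientation, the loop above streams progress as plain dicts: a running byte count under "done" and, when the expected size is known, a "done%" value, plus a warning if more bytes arrive than advertised. A rough sketch of that reporting shape (the block iterator and function name are hypothetical, not the module's API):

    from __future__ import annotations

    from typing import Any, Iterable, Iterator

    def report_progress(blocks: Iterable[bytes], size: int | None = None) -> Iterator[dict[str, Any]]:
        """Yield pyout-style progress dicts while consuming downloaded blocks."""
        downloaded = 0
        for block in blocks:
            downloaded += len(block)
            out: dict[str, Any] = {"done": downloaded}
            if size:
                # cap at 100% in case the server sends more bytes than it advertised
                out["done%"] = min(100.0, 100 * downloaded / size)
            yield out
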
@@ -749,87 +754,33 @@
# Catching RequestException lets us retry on timeout & connection
# errors (among others) in addition to HTTP status errors.
except requests.RequestException as exc:
sleep_amount = random.random() * 5 * attempt
if os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY"):
# in such a case if we downloaded a little more --
# consider it a successful attempt
if downloaded_in_attempt > 0:
lgr.debug(
"%s - download failed on attempt #%d: %s, "
"but did download %d bytes, so considering "
"it a success and incrementing number of allowed attempts.",
path,
attempt,
exc,
downloaded_in_attempt,
)
nattempts += 1
# TODO: actually we should probably retry only on selected codes
if exc.response is not None:
if exc.response.status_code not in (
400, # Bad Request, but happened with girder:
# https://github.com/dandi/dandi-cli/issues/87
*RETRY_STATUSES,
):
lgr.debug(
"%s - download failed due to response %d: %s",
path,
exc.response.status_code,
exc,
)
yield {"status": "error", "message": str(exc)}
return
elif retry_after := exc.response.headers.get("Retry-After"):
# playing safe
if not str(retry_after).isdigit():
# our code is wrong; do not crash but issue a warning so
# we might get a report and fix it up
lgr.warning(
"%s - download failed due to response %d with non-integer"
" Retry-After=%r: %s",
path,
exc.response.status_code,
retry_after,
exc,
)
yield {"status": "error", "message": str(exc)}
return
sleep_amount = int(retry_after)
lgr.debug(
"%s - download failed due to response %d with "
"Retry-After=%d: %s, will sleep and retry",
path,
exc.response.status_code,
sleep_amount,
exc,
)
else:
lgr.debug("%s - download failed: %s", path, exc)
yield {"status": "error", "message": str(exc)}
return
elif attempt >= nattempts:
lgr.debug(
"%s - download failed after %d attempts: %s", path, attempt, exc
)
attempts_allowed = _check_if_more_attempts_allowed(

path=path,
exc=exc,
attempt=attempt,
attempts_allowed=attempts_allowed,
downloaded_in_attempt=downloaded_in_attempt,
)
if not attempts_allowed:

yield {"status": "error", "message": str(exc)}
return
# if is_access_denied(exc) or attempt >= 2:
# raise
# sleep a little and retry
else:
lgr.debug(
"%s - download failed on attempt #%d: %s, will sleep a bit and retry",
path,
attempt,
exc,
)
time.sleep(sleep_amount)
else:
lgr.warning("downloader logic: We should not be here!")

final_digest = None
if downloaded_digest and not resuming:
assert downloaded_digest is not None
final_digest = downloaded_digest.hexdigest() # we care only about hex
elif resuming:
if resuming:
lgr.debug("%s - resumed download. Need to check full checksum.", path)
else:
assert not downloaded_digest

lgr.debug(

"%s - no digest was checked online. Need to check full checksum", path
)
final_digest = get_digest(path, algo)
if final_digest:
if digest_callback is not None:
assert isinstance(algo, str)
digest_callback(algo, final_digest)
@@ -842,6 +793,7 @@
yield {"checksum": "ok"}
lgr.debug("%s - verified that has correct %s %s", path, algo, digest)
else:
lgr.debug("%s - no digests were provided", path)

# shouldn't happen with more recent metadata etc
yield {
"checksum": "-",
@@ -1085,6 +1037,86 @@
yield {"status": "done"}


def _check_if_more_attempts_allowed(
path: Path,
exc: requests.RequestException,
attempt: int,
attempts_allowed: int,
downloaded_in_attempt: int,
) -> int | None:
"""Check if we should retry the download, return potentially adjusted 'attempts_allowed'"""
sleep_amount = random.random() * 5 * attempt
if os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY"):

# in such a case if we downloaded a little more --
# consider it a successful attempt
if downloaded_in_attempt > 0:
lgr.debug(

"%s - download failed on attempt #%d: %s, "
"but did download %d bytes, so considering "
"it a success and incrementing number of allowed attempts.",
path,
attempt,
exc,
downloaded_in_attempt,
)
attempts_allowed += 1

# TODO: actually we should probably retry only on selected codes
if exc.response is not None:
if exc.response.status_code not in (

400, # Bad Request, but happened with girder:
# https://github.com/dandi/dandi-cli/issues/87
*RETRY_STATUSES,
):
lgr.debug(

"%s - download failed due to response %d: %s",
path,
exc.response.status_code,
exc,
)
return
elif retry_after := exc.response.headers.get("Retry-After"):

# playing safe
if not str(retry_after).isdigit():

# our code is wrong; do not crash but issue a warning so
# we might get a report and fix it up
lgr.warning(

"%s - download failed due to response %d with non-integer"
" Retry-After=%r: %s",
path,
exc.response.status_code,
retry_after,
exc,
)
return
sleep_amount = int(retry_after)
lgr.debug(

"%s - download failed due to response %d with "
"Retry-After=%d: %s, will sleep and retry",
path,
exc.response.status_code,
sleep_amount,
exc,
)
else:
lgr.debug("%s - download failed: %s", path, exc)
return
elif attempt >= attempts_allowed:
lgr.debug("%s - download failed after %d attempts: %s", path, attempt, exc)
return

# if is_access_denied(exc) or attempt >= 2:
# raise
# sleep a little and retry
else:
lgr.debug(

"%s - download failed on attempt #%d: %s, will sleep a bit and retry",
path,
attempt,
exc,
)
time.sleep(sleep_amount)
return attempts_allowed
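To make the refactored helper's contract concrete: it logs, possibly sleeps, and returns an adjusted attempt budget, or None when the error is not worth retrying, so the caller can bail out. A simplified, hypothetical caller under that assumption (the real caller is the streaming loop in _download_file above; do_one_attempt is a stand-in):

    from __future__ import annotations

    from pathlib import Path
    from typing import Any, Callable, Iterator

    import requests

    # helper added by this PR; import path assumes this branch of dandi-cli
    from dandi.download import _check_if_more_attempts_allowed

    def download_with_retries(path: Path, do_one_attempt: Callable[[], None]) -> Iterator[dict[str, Any]]:
        attempt = 0
        attempts_allowed = 3
        while attempt <= attempts_allowed:
            attempt += 1
            try:
                do_one_attempt()  # hypothetical stand-in for one streaming download pass
                yield {"status": "done"}
                return
            except requests.RequestException as exc:
                attempts_allowed = _check_if_more_attempts_allowed(
                    path=path,
                    exc=exc,
                    attempt=attempt,
                    attempts_allowed=attempts_allowed,
                    downloaded_in_attempt=0,
                )
                if not attempts_allowed:
                    yield {"status": "error", "message": str(exc)}
                    return

Here downloaded_in_attempt is fixed at 0 only to keep the sketch short; the real caller tracks bytes received during the current attempt so the aggressive-retry path can extend the budget.
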



def pairing(p: str, gen: Iterator[dict]) -> Iterator[tuple[str, dict]]:
for d in gen:
yield (p, d)
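One detail worth calling out from the helper: the sleep before a retry is either a randomized backoff that grows with the attempt number, or the server's integer Retry-After value when one is present. An isolated sketch of that choice (choose_sleep_seconds is a hypothetical name, not part of the PR):

    from __future__ import annotations

    import random

    def choose_sleep_seconds(attempt: int, retry_after: str | None) -> float:
        """Prefer an integer Retry-After header; otherwise back off randomly, scaled by attempt."""
        if retry_after is not None and str(retry_after).isdigit():
            return float(retry_after)
        # mirrors sleep_amount = random.random() * 5 * attempt in the helper above
        return random.random() * 5 * attempt

For example, with no header the second attempt sleeps somewhere in [0, 10) seconds, while Retry-After: 30 sleeps exactly 30.
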
24 changes: 19 additions & 5 deletions dandi/tests/test_download.py
@@ -1,6 +1,7 @@
from __future__ import annotations

from collections.abc import Callable
from contextlib import nullcontext
from glob import glob
import json
import logging
@@ -70,8 +71,9 @@ def test_download_000027(
with pytest.raises(FileExistsError):
download(url, tmp_path, format=DownloadFormat.DEBUG)
assert "FileExistsError" not in capsys.readouterr().out
# but no exception is raised, and rather it gets output to pyout otherwise
download(url, tmp_path)
# Generic error should be raised if we are using pyout/parallelization
with pytest.raises(RuntimeError):
download(url, tmp_path)
assert "FileExistsError" in capsys.readouterr().out

# TODO: somehow get that status report about what was downloaded and what not
@@ -122,8 +124,9 @@ def test_download_000027_assets_only(url: str, tmp_path: Path) -> None:
@mark.skipif_no_network
@pytest.mark.parametrize("resizer", [lambda sz: 0, lambda sz: sz // 2, lambda sz: sz])
@pytest.mark.parametrize("version", ["0.210831.2033", DRAFT])
@pytest.mark.parametrize("break_download", [False, True])
def test_download_000027_resume(
tmp_path: Path, resizer: Callable[[int], int], version: str
tmp_path: Path, resizer: Callable[[int], int], version: str, break_download: bool
) -> None:
url = f"https://dandiarchive.org/dandiset/000027/{version}"
digester = Digester()
@@ -137,15 +140,26 @@ def test_download_000027_resume(
nwb.rename(dlfile)
size = dlfile.stat().st_size
os.truncate(dlfile, resizer(size))
if break_download:
bad_load = b"bad"
if resizer(size) == size: # no truncation
os.truncate(dlfile, size - len(bad_load))
with open(dlfile, "ab") as f:
f.write(bad_load)
with (dldir / "checksum").open("w") as fp:
json.dump(digests, fp)
download(url, tmp_path, get_metadata=False)

with pytest.raises(RuntimeError) if break_download else nullcontext():
download(url, tmp_path, get_metadata=False)
assert list_paths(dsdir, dirs=True) == [
dsdir / "sub-RAT123",
dsdir / "sub-RAT123" / "sub-RAT123.nwb",
]
assert nwb.stat().st_size == size
assert digester(str(nwb)) == digests
if break_download:
assert digester(str(nwb)) != digests
else:
assert digester(str(nwb)) == digests
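The new break_download parametrization relies on the pytest.raises(...) if cond else nullcontext() idiom so the same test body can expect either an exception or success. A toy illustration of that idiom (not part of this PR):

    from contextlib import nullcontext

    import pytest

    @pytest.mark.parametrize("should_fail", [False, True])
    def test_maybe_raises(should_fail: bool) -> None:
        # nullcontext() is a no-op expectation for the passing case,
        # so the with-block body stays identical for both parametrizations
        with pytest.raises(RuntimeError) if should_fail else nullcontext():
            if should_fail:
                raise RuntimeError("boom")
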


def test_download_newest_version(text_dandiset: SampleDandiset, tmp_path: Path) -> None: