Merge pull request #1509 from dandi/enh-log2
download: more consistent and exhaustive logging, new `DANDI_DOWNLOAD_AGGRESSIVE_RETRY` mode, respect (?) Retry-After
yarikoptic authored Nov 15, 2024
2 parents 06c10cc + 7f97394 commit 3671917
Showing 7 changed files with 240 additions and 28 deletions.
4 changes: 4 additions & 0 deletions DEVELOPMENT.md
@@ -85,6 +85,10 @@ development command line options.
function will patch `requests` to log the results of calls to
`requests.utils.super_len()`

- `DANDI_DOWNLOAD_AGGRESSIVE_RETRY` -- When set, makes `download()` retry
very aggressively: it keeps trying as long as at least some bytes are downloaded
on each attempt. Typically not needed; needing it could be a sign of network issues.
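For illustration, here is a minimal sketch of the retry-budget behavior this variable enables, simplified from the `dandi/download.py` changes in this commit (the loop body is elided):

```python
import os

# Simplified sketch: every failed attempt that still made progress
# extends the attempt budget by one.
attempt, nattempts = 0, 3
while attempt <= nattempts:
    attempt += 1
    downloaded_in_attempt = 0
    try:
        # ... download chunks, adding len(chunk) to downloaded_in_attempt ...
        break  # success
    except Exception:
        if (
            os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY")
            and downloaded_in_attempt > 0
        ):
            nattempts += 1  # partial progress: allow one more attempt
```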

## Sourcegraph

The [Sourcegraph](https://sourcegraph.com) browser extension can be used to
16 changes: 11 additions & 5 deletions dandi/cli/command.py
@@ -114,14 +114,20 @@ def main(ctx, log_level, pdb=False):
lambda r: r.name != "pyout" and not r.name.startswith("pyout.")
)
root.addHandler(handler)
exts = (
"dandischema",
"h5py",
"hdmf",
"pynwb",
"requests",
"urllib3",
)

lgr.info(
"dandi v%s, dandischema v%s, hdmf v%s, pynwb v%s, h5py v%s",
"python %s, dandi %s, "
+ ", ".join("%s %s" % (e, get_module_version(e)) for e in sorted(exts)),
sys.version.split()[0],
__version__,
get_module_version("dandischema"),
get_module_version("hdmf"),
get_module_version("pynwb"),
get_module_version("h5py"),
extra={"file_only": True},
)
lgr.info("sys.argv = %r", sys.argv, extra={"file_only": True})
8 changes: 6 additions & 2 deletions dandi/cli/tests/test_service_scripts.py
@@ -11,7 +11,6 @@
from click.testing import CliRunner
from dandischema.consts import DANDI_SCHEMA_VERSION
import pytest
import vcr

from dandi import __version__
from dandi.tests.fixtures import SampleDandiset
@@ -76,9 +75,14 @@ def test_update_dandiset_from_doi(
dandiset_id = new_dandiset.dandiset_id
repository = new_dandiset.api.instance.gui
monkeypatch.setenv("DANDI_API_KEY", new_dandiset.api.api_key)
if os.environ.get("DANDI_TESTS_NO_VCR", ""):
if os.environ.get("DANDI_TESTS_NO_VCR", "") or sys.version_info <= (3, 10):
# Older vcrpy has an issue with Python 3.9 and newer urllib3 >= 2,
# but we require newer urllib3 for more correct operation, and we
# do still support 3.9. Remove when 3.9 support is dropped.
ctx = nullcontext()
else:
import vcr

ctx = vcr.use_cassette(
str(DATA_DIR / "update_dandiset_from_doi" / f"{name}.vcr.yaml"),
before_record_request=record_only_doi_requests,
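In isolation, the fallback pattern used here looks like the following sketch: a no-op context manager replaces the cassette whenever VCR cannot be used (the cassette path below is illustrative):

```python
import os
import sys
from contextlib import nullcontext

if os.environ.get("DANDI_TESTS_NO_VCR", "") or sys.version_info < (3, 10):
    # VCR disabled explicitly, or Python 3.9 where older vcrpy
    # conflicts with urllib3 >= 2: run without a cassette.
    ctx = nullcontext()
else:
    import vcr

    ctx = vcr.use_cassette("example.vcr.yaml")  # cassette path illustrative

with ctx:
    pass  # test body: HTTP requests here would be recorded/replayed
```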
15 changes: 14 additions & 1 deletion dandi/dandiapi.py
@@ -1592,10 +1592,23 @@ def downloader(start_at: int = 0) -> Iterator[bytes]:
# TODO: apparently we might need retries here as well etc
# if result.status_code not in (200, 201):
result.raise_for_status()
nbytes, nchunks = 0, 0
for chunk in result.iter_content(chunk_size=chunk_size):
nchunks += 1
if chunk: # could be some "keep alive"?
nbytes += len(chunk)
yield chunk
lgr.info("Asset %s successfully downloaded", self.identifier)
else:
lgr.debug("'Empty' chunk downloaded for %s", url)
lgr.info(
"Asset %s (%d bytes in %d chunks starting from %d) successfully "
"downloaded from %s",
self.identifier,
nbytes,
nchunks,
start_at,
url,
)

return downloader

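Because the factory returns a generator function parameterized by a start offset, resuming a download amounts to calling it with the number of bytes already on disk. A hypothetical consumer (not part of the dandi API) might look like:

```python
from __future__ import annotations

from collections.abc import Callable, Iterator
from pathlib import Path


def consume(downloader: Callable[[int], Iterator[bytes]], out_path: Path) -> None:
    # Resume from whatever is already on disk; append mode preserves it.
    start_at = out_path.stat().st_size if out_path.exists() else 0
    with out_path.open("ab") as f:
        for chunk in downloader(start_at):
            f.write(chunk)
```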
131 changes: 112 additions & 19 deletions dandi/download.py
@@ -13,6 +13,7 @@
from pathlib import Path
import random
from shutil import rmtree
import sys
from threading import Lock
import time
from types import TracebackType
@@ -692,17 +693,26 @@ def _download_file(
# TODO: how do we discover the total size????
# TODO: do not do it in-place, but rather into some "hidden" file
resuming = False
for attempt in range(3):
attempt = 0
nattempts = 3  # number of attempts to make; may be incremented if we downloaded a little
while attempt <= nattempts:
attempt += 1
try:
if digester:
downloaded_digest = digester() # start empty
warned = False
# I wonder if we could make writing async with downloader
with DownloadDirectory(path, digests or {}) as dldir:
assert dldir.offset is not None
downloaded_in_attempt = 0
downloaded = dldir.offset
resuming = downloaded > 0
if size is not None and downloaded == size:
lgr.debug(
"%s - downloaded size matches target size of %d, exiting the loop",
path,
size,
)
# Exit early when downloaded == size, as making a Range
# request in such a case results in a 416 error from S3.
# Problems will result if `size` is None but we've already
@@ -713,6 +723,7 @@
assert downloaded_digest is not None
downloaded_digest.update(block)
downloaded += len(block)
downloaded_in_attempt += len(block)
# TODO: yield progress etc
out: dict[str, Any] = {"done": downloaded}
if size:
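For context on the early exit above: a resumed download is issued as an HTTP Range request, and S3 answers 416 (Range Not Satisfiable) when the requested start offset is at or past the end of the object. An illustrative request (placeholder URL):

```python
import requests

resp = requests.get(
    "https://example.com/asset",      # placeholder URL
    headers={"Range": "bytes=100-"},  # resume at offset 100
    stream=True,
    timeout=30,
)
# If the object is exactly 100 bytes long, resp.status_code == 416,
# which is why _download_file() skips the request entirely in that case.
```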
@@ -738,30 +749,83 @@ def _download_file(
# Catching RequestException lets us retry on timeout & connection
# errors (among others) in addition to HTTP status errors.
except requests.RequestException as exc:
sleep_amount = random.random() * 5 * attempt
if os.environ.get("DANDI_DOWNLOAD_AGGRESSIVE_RETRY"):
# in this mode, if at least some bytes were downloaded on
# this attempt, consider it a successful attempt
if downloaded_in_attempt > 0:
lgr.debug(
"%s - download failed on attempt #%d: %s, "
"but did download %d bytes, so considering "
"it a success and incrementing number of allowed attempts.",
path,
attempt,
exc,
downloaded_in_attempt,
)
nattempts += 1
# TODO: actually we should probably retry only on selected codes,
# and also respect Retry-After
if attempt >= 2 or (
exc.response is not None
and exc.response.status_code
not in (
if exc.response is not None:
if exc.response.status_code not in (
400, # Bad Request, but happened with girder:
# https://github.com/dandi/dandi-cli/issues/87
*RETRY_STATUSES,
):
lgr.debug(
"%s - download failed due to response %d: %s",
path,
exc.response.status_code,
exc,
)
yield {"status": "error", "message": str(exc)}
return
elif retry_after := exc.response.headers.get("Retry-After"):
# playing safe
if not str(retry_after).isdigit():
# our code is wrong; do not crash, but issue a warning so
# we might get a report and fix it up
lgr.warning(
"%s - download failed due to response %d with non-integer"
" Retry-After=%r: %s",
path,
exc.response.status_code,
retry_after,
exc,
)
yield {"status": "error", "message": str(exc)}
return
sleep_amount = int(retry_after)
lgr.debug(
"%s - download failed due to response %d with "
"Retry-After=%d: %s, will sleep and retry",
path,
exc.response.status_code,
sleep_amount,
exc,
)
else:
lgr.debug("%s - download failed: %s", path, exc)
yield {"status": "error", "message": str(exc)}
return
elif attempt >= nattempts:
lgr.debug(
"%s - download failed after %d attempts: %s", path, attempt, exc
)
):
lgr.debug("%s - download failed: %s", path, exc)
yield {"status": "error", "message": str(exc)}
return
# if is_access_denied(exc) or attempt >= 2:
# raise
# sleep a little and retry
lgr.debug(
"%s - failed to download on attempt #%d: %s, will sleep a bit and retry",
path,
attempt,
exc,
)
time.sleep(random.random() * 5)
else:
lgr.debug(
"%s - download failed on attempt #%d: %s, will sleep a bit and retry",
path,
attempt,
exc,
)
time.sleep(sleep_amount)
else:
lgr.warning("downloader logic: We should not be here!")

if downloaded_digest and not resuming:
assert downloaded_digest is not None
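The code above deliberately handles only the integer form of `Retry-After` and warns on anything else. The header may also carry an HTTP-date; a fuller parser, shown as a sketch rather than as dandi's actual helper, could look like:

```python
from __future__ import annotations

from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value: str) -> float | None:
    """Parse Retry-After as delay-seconds or an HTTP-date (sketch)."""
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None  # malformed header; caller decides what to do
    return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
```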
@@ -829,16 +893,22 @@ def __enter__(self) -> DownloadDirectory:
):
# Pick up where we left off, writing to the end of the file
lgr.debug(
"Download directory exists and has matching checksum; resuming download"
"%s - download directory exists and has matching checksum(s) %s; resuming download",
self.dirpath,
matching_algs,
)
self.fp = self.writefile.open("ab")
else:
# Delete the file (if it even exists) and start anew
if not chkpath.exists():
lgr.debug("Starting new download in new download directory")
lgr.debug(
"%s - no prior digests found; starting new download", self.dirpath
)
else:
lgr.debug(
"Download directory found, but digests do not match; starting new download"
"%s - download directory found, but digests do not match;"
" starting new download",
self.dirpath,
)
try:
self.writefile.unlink()
@@ -857,12 +927,35 @@ def __exit__(
exc_tb: TracebackType | None,
) -> None:
assert self.fp is not None
if exc_type is not None or exc_val is not None or exc_tb is not None:
lgr.debug(
"%s - entered __exit__ with position %d with exception: %s, %s",
self.dirpath,
self.fp.tell(),
exc_type,
exc_val,
)
else:
lgr.debug(
"%s - entered __exit__ with position %d without any exception",
self.dirpath,
self.fp.tell(),
)
self.fp.close()
try:
if exc_type is None:
try:
self.writefile.replace(self.filepath)
except IsADirectoryError:
except (IsADirectoryError, PermissionError) as exc:
if isinstance(exc, PermissionError):
if not (
sys.platform.startswith("win") and self.filepath.is_dir()
):
raise
lgr.debug(
"Destination path %s is a directory; removing it and retrying",
self.filepath,
)
rmtree(self.filepath)
self.writefile.replace(self.filepath)
finally:
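The widened `except` above reflects a platform difference in `Path.replace()`: with an existing directory at the destination, POSIX raises `IsADirectoryError` while Windows raises `PermissionError`, so the latter is only swallowed on Windows and only when the destination really is a directory. A condensed sketch of that finalize step (function name and paths are illustrative):

```python
import sys
from pathlib import Path
from shutil import rmtree


def finalize(writefile: Path, filepath: Path) -> None:
    try:
        writefile.replace(filepath)  # atomic move over the destination
    except (IsADirectoryError, PermissionError) as exc:
        # Windows reports a directory collision as PermissionError.
        if isinstance(exc, PermissionError) and not (
            sys.platform.startswith("win") and filepath.is_dir()
        ):
            raise
        rmtree(filepath)  # destination was a directory: remove and retry
        writefile.replace(filepath)
```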
90 changes: 90 additions & 0 deletions dandi/tests/test_download.py
@@ -1,7 +1,10 @@
from __future__ import annotations

from collections.abc import Callable
from glob import glob
import json
import logging
from multiprocessing import Manager, Process
import os
import os.path
from pathlib import Path
@@ -21,6 +24,7 @@
from ..consts import DRAFT, dandiset_metadata_file
from ..dandiarchive import DandisetURL
from ..download import (
DownloadDirectory,
Downloader,
DownloadExisting,
DownloadFormat,
@@ -1038,3 +1042,89 @@ def test_pyouthelper_time_remaining_1339():
assert len(done) == 2
else:
assert done[-1] == f"ETA: {10 - i} seconds<"


@mark.skipif_on_windows # https://github.com/pytest-dev/pytest/issues/12964
def test_DownloadDirectory_basic(tmp_path: Path) -> None:
with DownloadDirectory(tmp_path, digests={}) as dl:
assert dl.dirpath.exists()
assert dl.writefile.exists()
assert dl.writefile.stat().st_size == 0
assert dl.offset == 0

dl.append(b"123")
assert dl.fp is not None
dl.fp.flush() # appends are not flushed automatically
assert dl.writefile.stat().st_size == 3
assert dl.offset == 0 # doesn't change

dl.append(b"456")
inode_number = dl.writefile.stat().st_ino
assert inode_number != tmp_path.stat().st_ino

# but after we are done - should be a full file!
assert tmp_path.stat().st_size == 6
assert tmp_path.read_bytes() == b"123456"
# we moved the file, didn't copy (expensive)
assert inode_number == tmp_path.stat().st_ino

# no problem with overwriting with new content
with DownloadDirectory(tmp_path, digests={}) as dl:
dl.append(b"789")
assert tmp_path.read_bytes() == b"789"

# even if path is a directory which we "overwrite"
tmp_path.unlink()
tmp_path.mkdir()
(tmp_path / "somedata.dat").write_text("content")
with DownloadDirectory(tmp_path, digests={}) as dl:
assert set(glob(f"{tmp_path}*")) == {str(tmp_path), str(dl.dirpath)}
dl.append(b"123")
assert tmp_path.read_bytes() == b"123"

# no temp .dandidownload folder is left behind
assert set(glob(f"{tmp_path}*")) == {str(tmp_path)}

# test locking
with Manager() as manager:
results = manager.list()
with DownloadDirectory(tmp_path, digests={}) as dl:
dl.append(b"123")
p1 = Process(target=_download_directory_subproc, args=(tmp_path, results))
p1.start()
p1.join()
assert len(results) == 1
assert results[0] == f"Could not acquire download lock for {tmp_path}"
assert tmp_path.read_bytes() == b"123"


# needs to be a top-level function for pickling
def _download_directory_subproc(path, results):
try:
with DownloadDirectory(path, digests={}):
results.append("re-entered fine")
except Exception as exc:
results.append(str(exc))
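The locking test above relies on `DownloadDirectory` refusing to start while another process holds the lock for the same target. A sketch of that kind of advisory lock (the mechanism is an assumption here, POSIX-only, consistent with the test being skipped on Windows):

```python
import fcntl  # POSIX-only advisory file locking


def acquire_download_lock(lockfile: str):
    fp = open(lockfile, "w")
    try:
        fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        fp.close()
        raise RuntimeError(f"Could not acquire download lock for {lockfile}")
    return fp  # keep open; closing the file releases the lock
```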


def test_DownloadDirectory_exc(
tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
caplog.set_level(logging.DEBUG, logger="dandi")
# and now let's exit with exception
with pytest.raises(RuntimeError):
with DownloadDirectory(tmp_path, digests={}) as dl:
dl.append(b"456")
raise RuntimeError("Boom")
assert (
"dandi",
10,
f"{dl.dirpath} - entered __exit__ with position 3 with exception: "
"<class 'RuntimeError'>, Boom",
) == caplog.record_tuples[-1]
# and we left without cleanup but closed things up after ourselves
assert tmp_path.exists()
assert tmp_path.is_dir()
assert dl.dirpath.exists()
assert dl.fp is None
assert dl.writefile.read_bytes() == b"456"
