Skip to content

Commit 1cc9531

Browse files
committed
Implement resuming AHN downloads on retry
1 parent d695506 commit 1cc9531

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

packages/common/src/bag3d/common/utils/requests.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
from multiprocessing.dummy import Value
12
from datetime import datetime
23
from pathlib import Path
34
from time import sleep
45
from typing import Mapping, Union
56
from urllib.parse import urlparse, urljoin
7+
import os
68

79
import requests
810
from dagster import (
@@ -35,6 +37,7 @@ def download_file(
3537
chunk_size: int = 1024,
3638
parameters: Mapping = None,
3739
verify: bool = True,
40+
attempt_resume: bool = False,
3841
) -> Union[Path, None]:
3942
"""Download a large file and save it to disk.
4043
@@ -59,6 +62,42 @@ def download_file(
5962
session = requests.Session() # https://stackoverflow.com/a/63417213
6063

6164
try:
65+
head = session.head(url, allow_redirects=True)
66+
head.raise_for_status()
67+
remote_size = int(head.headers.get("content-length", 0))
68+
headers = {}
69+
if attempt_resume and os.path.exists(fpath):
70+
local_size = os.path.getsize(fpath)
71+
headers = {"Range": f"bytes={local_size}-"}
72+
logger.info(f"Resuming download at {local_size}/{remote_size} bytes")
73+
else:
74+
local_size = 0
75+
logger.info(f"Starting download of {remote_size} bytes")
76+
77+
with fpath.open("ab") as fd:
78+
with session.get(
79+
url, headers=headers, stream=True, verify=verify, params=parameters
80+
) as r:
81+
if local_size and r.status_code != 206:
82+
raise ValueError("Server does not support range requests")
83+
elif r.status_code not in (200, 206):
84+
r.raise_for_status()
85+
86+
bytes_written = local_size
87+
last_logged_percent = (
88+
int((bytes_written / remote_size) * 100) if remote_size else 0
89+
)
90+
for data in r.iter_content(chunk_size=chunk_size):
91+
fd.write(data)
92+
bytes_written += len(data)
93+
if remote_size:
94+
percent = int((bytes_written / remote_size) * 100)
95+
if percent > last_logged_percent and percent % 10 == 0:
96+
logger.info(
97+
f"Download progress: {percent}% ({bytes_written}/{remote_size} bytes)"
98+
)
99+
last_logged_percent = percent
100+
62101
r = session.get(url, params=parameters, stream=True, verify=verify)
63102
if r.ok:
64103
with fpath.open("wb") as fd:
@@ -69,9 +108,11 @@ def download_file(
69108
r.raise_for_status()
70109
r.close()
71110
except (
111+
requests.RequestException,
72112
requests.exceptions.BaseHTTPError,
73113
requests.exceptions.HTTPError,
74114
requests.exceptions.ChunkedEncodingError,
115+
ValueError,
75116
) as e: # pragma: no cover
76117
logger.exception(e)
77118
return None

packages/core/src/bag3d/core/assets/ahn/download.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class LazFilesConfig(Config):
158158
required_resource_keys={"file_store"},
159159
partitions_def=partition_definition_ahn,
160160
tags={"dagster/concurrency_key": "laz_download"},
161-
pool="laz_download"
161+
pool="laz_download",
162162
)
163163
def laz_files_ahn3(context, config: LazFilesConfig, md5_ahn3, tile_index_ahn):
164164
"""AHN3 LAZ files as they are downloaded from PDOK.
@@ -213,7 +213,7 @@ def laz_files_ahn3(context, config: LazFilesConfig, md5_ahn3, tile_index_ahn):
213213
required_resource_keys={"file_store"},
214214
partitions_def=partition_definition_ahn,
215215
tags={"dagster/concurrency_key": "laz_download"},
216-
pool="laz_download"
216+
pool="laz_download",
217217
)
218218
def laz_files_ahn4(context, config: LazFilesConfig, md5_ahn4, tile_index_ahn):
219219
"""AHN4 LAZ files as they are downloaded from PDOK.
@@ -271,7 +271,7 @@ def laz_files_ahn4(context, config: LazFilesConfig, md5_ahn4, tile_index_ahn):
271271
required_resource_keys={"file_store"},
272272
partitions_def=partition_definition_ahn,
273273
tags={"dagster/concurrency_key": "laz_download"},
274-
pool="laz_download"
274+
pool="laz_download",
275275
)
276276
def laz_files_ahn5(context, config: LazFilesConfig, sha256_ahn5, tile_index_ahn):
277277
"""AHN5 LAZ files as they are downloaded from PDOK.
@@ -421,6 +421,7 @@ def download_laz(
421421
target_path=fpath,
422422
chunk_size=1024 * 1024,
423423
verify=verify_ssl,
424+
attempt_resume=True,
424425
)
425426
if fpath_download is None:
426427
# Download failed

0 commit comments

Comments
 (0)