3
3
from time import sleep
4
4
from typing import Mapping , Union
5
5
from urllib .parse import urlparse , urljoin
6
+ import os
6
7
7
8
import requests
8
9
from dagster import (
@@ -35,6 +36,7 @@ def download_file(
35
36
chunk_size : int = 1024 ,
36
37
parameters : Mapping = None ,
37
38
verify : bool = True ,
39
+ attempt_resume : bool = False ,
38
40
) -> Union [Path , None ]:
39
41
"""Download a large file and save it to disk.
40
42
@@ -59,6 +61,42 @@ def download_file(
59
61
session = requests .Session () # https://stackoverflow.com/a/63417213
60
62
61
63
try :
64
+ head = session .head (url , allow_redirects = True )
65
+ head .raise_for_status ()
66
+ remote_size = int (head .headers .get ("content-length" , 0 ))
67
+ headers = {}
68
+ if attempt_resume and os .path .exists (fpath ):
69
+ local_size = os .path .getsize (fpath )
70
+ headers = {"Range" : f"bytes={ local_size } -" }
71
+ logger .info (f"Resuming download at { local_size } /{ remote_size } bytes" )
72
+ else :
73
+ local_size = 0
74
+ logger .info (f"Starting download of { remote_size } bytes" )
75
+
76
+ with fpath .open ("ab" ) as fd :
77
+ with session .get (
78
+ url , headers = headers , stream = True , verify = verify , params = parameters
79
+ ) as r :
80
+ if local_size and r .status_code != 206 :
81
+ raise ValueError ("Server does not support range requests" )
82
+ elif r .status_code not in (200 , 206 ):
83
+ r .raise_for_status ()
84
+
85
+ bytes_written = local_size
86
+ last_logged_percent = (
87
+ int ((bytes_written / remote_size ) * 100 ) if remote_size else 0
88
+ )
89
+ for data in r .iter_content (chunk_size = chunk_size ):
90
+ fd .write (data )
91
+ bytes_written += len (data )
92
+ if remote_size :
93
+ percent = int ((bytes_written / remote_size ) * 100 )
94
+ if percent > last_logged_percent and percent % 10 == 0 :
95
+ logger .info (
96
+ f"Download progress: { percent } % ({ bytes_written } /{ remote_size } bytes)"
97
+ )
98
+ last_logged_percent = percent
99
+
62
100
r = session .get (url , params = parameters , stream = True , verify = verify )
63
101
if r .ok :
64
102
with fpath .open ("wb" ) as fd :
@@ -69,9 +107,11 @@ def download_file(
69
107
r .raise_for_status ()
70
108
r .close ()
71
109
except (
110
+ requests .RequestException ,
72
111
requests .exceptions .BaseHTTPError ,
73
112
requests .exceptions .HTTPError ,
74
113
requests .exceptions .ChunkedEncodingError ,
114
+ ValueError ,
75
115
) as e : # pragma: no cover
76
116
logger .exception (e )
77
117
return None
0 commit comments