4
4
5
5
Documentation for DRMS: https://docs.sunpy.org/projects/drms/en/latest/
6
6
"""
7
-
7
+ from typing import Optional
8
8
import argparse
9
9
import logging
10
10
import multiprocessing
11
11
import os
12
12
from datetime import timedelta , datetime
13
13
from urllib import request
14
14
15
-
15
+ import tqdm
16
+ import warnings
17
+ import os
16
18
import drms
17
19
import numpy as np
18
20
import pandas as pd
19
21
from astropy .io import fits
20
22
from sunpy .io ._fits import header_to_fits
21
23
from sunpy .util import MetaDict
24
+ from helio_tools ._src .utils .time import check_datetime_format
25
+ import typer
26
+ from loguru import logger
22
27
23
28
# AIA EUV wavelengths (Angstrom) fetched when the caller does not specify any.
DEFAULT_WAVELENGTHS = [171, 193, 211, 304]
24
29
25
30
26
31
class SDODownloader :
27
- def __init__ (self , base_path : str = None ,
28
- email : str = None ,
29
- wavelengths : list [str | int | float ] = DEFAULT_WAVELENGTHS ,
30
- n_workers : int = 5 ) -> None :
32
+ def __init__ (
33
+ self ,
34
+ base_path : str = None ,
35
+ email : str = None ,
36
+ wavelengths : list [str | int | float ] = DEFAULT_WAVELENGTHS ,
37
+ n_workers : int = 5 ,
38
+ ) -> None :
31
39
"""The SDO Downloader is an efficent way to download data from the SDO database.
32
40
33
41
Args:
@@ -45,33 +53,38 @@ def __init__(self, base_path: str = None,
45
53
self .ds_path = base_path
46
54
self .wavelengths = [str (wl ) for wl in wavelengths ]
47
55
self .n_workers = n_workers
48
- [os .makedirs (os .path .join (base_path , wl ), exist_ok = True )
49
- for wl in self .wavelengths + ['6173' ]]
56
+ [
57
+ os .makedirs (os .path .join (base_path , wl ), exist_ok = True )
58
+ for wl in self .wavelengths + ["6173" ]
59
+ ]
50
60
51
61
self .drms_client = drms .Client (email = email )
52
62
53
63
def downloadDate (self , date : datetime ):
54
- """Download FITS data for a specific date.
55
- """
64
+ """Download FITS data for a specific date."""
56
65
id = date .isoformat ()
57
- logging .info (' Start download: %s' % id )
58
- time_param = ' %sZ' % date .isoformat ('_' , timespec = ' seconds' )
66
+ logging .info (" Start download: %s" % id )
67
+ time_param = " %sZ" % date .isoformat ("_" , timespec = " seconds" )
59
68
60
69
# query Magnetogram Instrument
61
- ds_hmi = ' hmi.M_720s[%s]{magnetogram}' % time_param
70
+ ds_hmi = " hmi.M_720s[%s]{magnetogram}" % time_param
62
71
keys_hmi = self .drms_client .keys (ds_hmi )
63
72
header_hmi , segment_hmi = self .drms_client .query (
64
- ds_hmi , key = ',' .join (keys_hmi ), seg = 'magnetogram' )
73
+ ds_hmi , key = "," .join (keys_hmi ), seg = "magnetogram"
74
+ )
65
75
if len (header_hmi ) != 1 or np .any (header_hmi .QUALITY != 0 ):
66
76
self .fetchDataFallback (date )
67
77
return
68
78
69
79
# query EUV Instrument
70
- ds_euv = 'aia.lev1_euv_12s[%s][%s]{image}' % (
71
- time_param , ',' .join (self .wavelengths ))
80
+ ds_euv = "aia.lev1_euv_12s[%s][%s]{image}" % (
81
+ time_param ,
82
+ "," .join (self .wavelengths ),
83
+ )
72
84
keys_euv = self .drms_client .keys (ds_euv )
73
85
header_euv , segment_euv = self .drms_client .query (
74
- ds_euv , key = ',' .join (keys_euv ), seg = 'image' )
86
+ ds_euv , key = "," .join (keys_euv ), seg = "image"
87
+ )
75
88
if len (header_euv ) != len (self .wavelengths ) or np .any (header_euv .QUALITY != 0 ):
76
89
self .fetchDataFallback (date )
77
90
return
@@ -84,91 +97,97 @@ def downloadDate(self, date: datetime):
84
97
85
98
with multiprocessing .Pool (self .n_workers ) as p :
86
99
p .map (self .download , queue )
87
- logging .info (' Finished: %s' % id )
100
+ logging .info (" Finished: %s" % id )
88
101
89
102
def download (self , sample : tuple [dict , str , datetime ]):
90
103
header , segment , t = sample
91
104
try :
92
- dir = os .path .join (self .ds_path , '%d' % header ['WAVELNTH' ])
93
- map_path = os .path .join (dir , '%s.fits' %
94
- t .isoformat ('T' , timespec = 'seconds' ))
105
+ dir = os .path .join (self .ds_path , "%d" % header ["WAVELNTH" ])
106
+ map_path = os .path .join (
107
+ dir , "%s.fits" % t .isoformat ("T" , timespec = "seconds" )
108
+ )
95
109
if os .path .exists (map_path ):
96
110
return map_path
97
111
# load map
98
- url = ' http://jsoc.stanford.edu' + segment
112
+ url = " http://jsoc.stanford.edu" + segment
99
113
request .urlretrieve (url , filename = map_path )
100
114
101
- header [' DATE_OBS' ] = header [' DATE__OBS' ]
115
+ header [" DATE_OBS" ] = header [" DATE__OBS" ]
102
116
header = header_to_fits (MetaDict (header ))
103
- with fits .open (map_path , ' update' ) as f :
117
+ with fits .open (map_path , " update" ) as f :
104
118
hdr = f [1 ].header
105
119
for k , v in header .items ():
106
120
if pd .isna (v ):
107
121
continue
108
122
hdr [k ] = v
109
- f .verify (' silentfix' )
123
+ f .verify (" silentfix" )
110
124
111
125
return map_path
112
126
except Exception as ex :
113
- logging .info (' Download failed: %s (requeue)' % header [' DATE__OBS' ])
127
+ logging .info (" Download failed: %s (requeue)" % header [" DATE__OBS" ])
114
128
logging .info (ex )
115
129
raise ex
116
130
117
131
def fetchDataFallback (self , date : datetime ):
118
132
id = date .isoformat ()
119
133
120
- logging .info (' Fallback download: %s' % id )
134
+ logging .info (" Fallback download: %s" % id )
121
135
# query Magnetogram
122
136
t = date - timedelta (hours = 24 )
123
- ds_hmi = 'hmi.M_720s[%sZ/12h@720s]{magnetogram}' % t .replace (
124
- tzinfo = None ).isoformat ('_' , timespec = 'seconds' )
137
+ ds_hmi = "hmi.M_720s[%sZ/12h@720s]{magnetogram}" % t .replace (
138
+ tzinfo = None
139
+ ).isoformat ("_" , timespec = "seconds" )
125
140
keys_hmi = self .drms_client .keys (ds_hmi )
126
141
header_tmp , segment_tmp = self .drms_client .query (
127
- ds_hmi , key = ',' .join (keys_hmi ), seg = 'magnetogram' )
128
- assert len (header_tmp ) != 0 , 'No data found!'
129
- date_str = header_tmp ['DATE__OBS' ].replace (
130
- 'MISSING' , '' ).str .replace ('60' , '59' ) # fix date format
131
- date_diff = np .abs (pd .to_datetime (
132
- date_str ).dt .tz_localize (None ) - date )
142
+ ds_hmi , key = "," .join (keys_hmi ), seg = "magnetogram"
143
+ )
144
+ assert len (header_tmp ) != 0 , "No data found!"
145
+ date_str = (
146
+ header_tmp ["DATE__OBS" ].replace ("MISSING" , "" ).str .replace ("60" , "59" )
147
+ ) # fix date format
148
+ date_diff = np .abs (pd .to_datetime (date_str ).dt .tz_localize (None ) - date )
133
149
# sort and filter
134
- header_tmp [' date_diff' ] = date_diff
135
- header_tmp .sort_values (' date_diff' )
136
- segment_tmp [' date_diff' ] = date_diff
137
- segment_tmp .sort_values (' date_diff' )
150
+ header_tmp [" date_diff" ] = date_diff
151
+ header_tmp .sort_values (" date_diff" )
152
+ segment_tmp [" date_diff" ] = date_diff
153
+ segment_tmp .sort_values (" date_diff" )
138
154
cond_tmp = header_tmp .QUALITY == 0
139
155
header_tmp = header_tmp [cond_tmp ]
140
156
segment_tmp = segment_tmp [cond_tmp ]
141
- assert len (header_tmp ) > 0 , ' No valid quality flag found'
157
+ assert len (header_tmp ) > 0 , " No valid quality flag found"
142
158
# replace invalid
143
- header_hmi = header_tmp .iloc [0 ].drop (' date_diff' )
144
- segment_hmi = segment_tmp .iloc [0 ].drop (' date_diff' )
159
+ header_hmi = header_tmp .iloc [0 ].drop (" date_diff" )
160
+ segment_hmi = segment_tmp .iloc [0 ].drop (" date_diff" )
145
161
############################################################
146
162
# query EUV
147
163
header_euv , segment_euv = [], []
148
164
t = date - timedelta (hours = 6 )
149
165
for wl in self .wavelengths :
150
- euv_ds = 'aia.lev1_euv_12s[%sZ/12h@12s][%s]{image}' % (
151
- t .replace (tzinfo = None ).isoformat ('_' , timespec = 'seconds' ), wl )
166
+ euv_ds = "aia.lev1_euv_12s[%sZ/12h@12s][%s]{image}" % (
167
+ t .replace (tzinfo = None ).isoformat ("_" , timespec = "seconds" ),
168
+ wl ,
169
+ )
152
170
keys_euv = self .drms_client .keys (euv_ds )
153
171
header_tmp , segment_tmp = self .drms_client .query (
154
- euv_ds , key = ',' .join (keys_euv ), seg = 'image' )
155
- assert len (header_tmp ) != 0 , 'No data found!'
156
- date_str = header_tmp ['DATE__OBS' ].replace (
157
- 'MISSING' , '' ).str .replace ('60' , '59' ) # fix date format
158
- date_diff = (pd .to_datetime (
159
- date_str ).dt .tz_localize (None ) - date ).abs ()
172
+ euv_ds , key = "," .join (keys_euv ), seg = "image"
173
+ )
174
+ assert len (header_tmp ) != 0 , "No data found!"
175
+ date_str = (
176
+ header_tmp ["DATE__OBS" ].replace ("MISSING" , "" ).str .replace ("60" , "59" )
177
+ ) # fix date format
178
+ date_diff = (pd .to_datetime (date_str ).dt .tz_localize (None ) - date ).abs ()
160
179
# sort and filter
161
- header_tmp [' date_diff' ] = date_diff
162
- header_tmp .sort_values (' date_diff' )
163
- segment_tmp [' date_diff' ] = date_diff
164
- segment_tmp .sort_values (' date_diff' )
180
+ header_tmp [" date_diff" ] = date_diff
181
+ header_tmp .sort_values (" date_diff" )
182
+ segment_tmp [" date_diff" ] = date_diff
183
+ segment_tmp .sort_values (" date_diff" )
165
184
cond_tmp = header_tmp .QUALITY == 0
166
185
header_tmp = header_tmp [cond_tmp ]
167
186
segment_tmp = segment_tmp [cond_tmp ]
168
- assert len (header_tmp ) > 0 , ' No valid quality flag found'
187
+ assert len (header_tmp ) > 0 , " No valid quality flag found"
169
188
# replace invalid
170
- header_euv .append (header_tmp .iloc [0 ].drop (' date_diff' ))
171
- segment_euv .append (segment_tmp .iloc [0 ].drop (' date_diff' ))
189
+ header_euv .append (header_tmp .iloc [0 ].drop (" date_diff" ))
190
+ segment_euv .append (segment_tmp .iloc [0 ].drop (" date_diff" ))
172
191
173
192
queue = []
174
193
queue += [(header_hmi .to_dict (), segment_hmi .magnetogram , date )]
@@ -178,24 +197,48 @@ def fetchDataFallback(self, date: datetime):
178
197
with multiprocessing .Pool (self .n_workers ) as p :
179
198
p .map (self .download , queue )
180
199
181
- logging .info ('Finished: %s' % id )
200
+ logging .info ("Finished: %s" % id )
201
+
182
202
203
def download_sdo_data(
    start_date: str = "2022-3-1",
    end_date: str = "2023-3-2",
    email: Optional[str] = None,
    base_path: Optional[str] = None,
    n_workers: int = 8,
):
    """Download SDO data for every 12-hour step in [start_date, end_date).

    Args:
        start_date: inclusive start date string, parsed by check_datetime_format.
        end_date: exclusive end date string, parsed by check_datetime_format.
        email: JSOC export email; falls back to the SDO_EMAIL env variable.
        base_path: download root; defaults to ~/sdo-path.
        n_workers: parallel workers passed to SDODownloader.
    """
    if base_path is None:
        base_path = os.path.join(os.path.expanduser("~"), "sdo-path")

    logger.info(f"BasePath: {base_path}")

    # check datetime object
    start_date: datetime = check_datetime_format(start_date, sensor="sodo")
    end_date: datetime = check_datetime_format(end_date, sensor="sodo")

    logger.info(f"Period: {start_date}-{end_date}")

    if email is None:
        email = os.getenv("SDO_EMAIL")
    logger.info(f"Email: {email}")

    downloader = SDODownloader(base_path=base_path, email=email, n_workers=n_workers)

    # One timestamp every 12 hours across the requested period.
    step = timedelta(hours=12)
    dates = [start_date + i * step for i in range((end_date - start_date) // step)]

    pbar = tqdm.tqdm(dates)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        for idate in pbar:
            pbar.set_description(f"Date: {idate}")
            downloader.downloadDate(idate)
198
241
199
242
200
# Expose the downloader as a CLI via typer; the comparison string must be
# exactly "__main__" (the extracted copy carried a stray leading space).
if __name__ == "__main__":
    typer.run(download_sdo_data)
0 commit comments