Skip to content

Commit

Permalink
#915: Minor code changes to support DISP-S1 end of frame historical processing
Browse files Browse the repository at this point in the history
  • Loading branch information
philipjyoon committed Sep 24, 2024
1 parent 95be8a9 commit ea23dd8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 20 deletions.
10 changes: 10 additions & 0 deletions data_subscriber/cslc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ def sensing_time_day_index(sensing_time: datetime, frame_number: int, frame_to_b
frame = frame_to_bursts[frame_number]
return (_calculate_sensing_time_day_index(sensing_time, frame.sensing_datetimes[0]))

def get_nearest_sensing_datetime(frame_sensing_datetimes, sensing_time):
    '''Return the nearest sensing datetime in the frame sensing datetime list that is not greater than the sensing time.

    It's a linear search in a sorted (ascending) list but no big deal because there will only ever be
    a few hundred elements.

    :param frame_sensing_datetimes: non-empty list of datetimes, sorted ascending
    :param sensing_time: the query datetime
    :return: the greatest element of frame_sensing_datetimes that is <= sensing_time
    :raises ValueError: if the list is empty or sensing_time precedes every element
        (previously this silently returned the last — i.e. largest — element via the
        i-1 == -1 wraparound, which violated the documented contract)
    '''
    if not frame_sensing_datetimes:
        raise ValueError("frame_sensing_datetimes must not be empty")
    if sensing_time < frame_sensing_datetimes[0]:
        raise ValueError(f"sensing_time {sensing_time} precedes the earliest frame sensing datetime "
                         f"{frame_sensing_datetimes[0]}")

    for i, dt in enumerate(frame_sensing_datetimes):
        if dt > sensing_time:
            # dt is the first element strictly greater, so the previous one is the answer
            return frame_sensing_datetimes[i-1]

    # sensing_time is at or beyond the last element
    return frame_sensing_datetimes[-1]

@cache
def process_disp_frame_burst_hist(file):
'''Process the disp frame burst map json file intended and return 3 dictionaries'''
Expand Down
10 changes: 9 additions & 1 deletion tests/data_subscriber/test_cslc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from data_subscriber.parser import create_parser
import dateutil
from datetime import datetime, timedelta
from data_subscriber.cmr import DateTimeRange, get_cmr_token
from util.conf_util import SettingsConf

hist_arguments = ["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=historical", "--start-date=2021-01-24T23:00:00Z",\
Expand Down Expand Up @@ -53,6 +52,15 @@ def test_burst_to_frame_map():
assert burst_to_frames["T004-006649-IW3"][0] == 831
assert burst_to_frames["T004-006649-IW3"][1] == 832

def test_nearest_sensing_datetime():
    """Exercise get_nearest_sensing_datetime on frame 8882 for an interior query time
    and for a query time past the end of the sensing datetime list."""
    sensing_times = disp_burst_map_hist[8882].sensing_datetimes

    # A query one second after an acquisition should snap back to that acquisition.
    result = cslc_utils.get_nearest_sensing_datetime(
        sensing_times, dateutil.parser.isoparse("2016-11-02T00:26:48"))
    assert result == dateutil.parser.isoparse("2016-11-02T00:26:47")

    # A query far beyond every acquisition should return the final sensing datetime.
    result = cslc_utils.get_nearest_sensing_datetime(
        sensing_times, dateutil.parser.isoparse("2027-11-02T00:26:48"))
    assert result == dateutil.parser.isoparse("2024-08-04T00:27:24")

#TODO: We may change the database json during production that could have different burst ids for the same frame
#TODO: So we may want to create different versions of this unit test, one for each version of the database json
def test_arg_expansion_hist():
Expand Down
49 changes: 30 additions & 19 deletions tools/run_disp_s1_historical_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,29 +156,40 @@ def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):
s_date = datetime.strptime("2000-01-01T00:00:00", ES_DATETIME_FORMAT)
logger.info(f"{frame_id=} reached end of historical processing. No reprocessing needed")

# If we are outside of the database sensing time range, we are done with this frame
# Submit reprocessing job for any remainder within this incomplete k-cycle
try:
e_date = frame_sensing_datetimes[sensing_time_position_zero_based + p.k - 1] + timedelta(minutes=30)
except IndexError:
# If we are outside of the database sensing time range, we are at the very last run for this frame. We have two choices:
# 1. If the number of sensing time is >= k/2, submit one more historical processing job with k = number of sensing times
# 2. If not, Submit reprocessing jobs for all remainder sensing times
end_sensing_date_index = sensing_time_position_zero_based + p.k - 1
if end_sensing_date_index < len(frame_sensing_datetimes)\
and frame_sensing_datetimes[end_sensing_date_index] + timedelta(minutes=30) < data_end_date:
e_date = frame_sensing_datetimes[end_sensing_date_index] + timedelta(minutes=30)
job_param_k = f"--k={p.k}"
else:
logger.info(f"This is the last run for frame {frame_id}")
finished = True
do_submit = False
e_date = datetime.strptime("2000-01-01T00:00:00", ES_DATETIME_FORMAT)
logger.info(f"{frame_id=} reached end of historical processing. The rest of sensing times will be submitted as reprocessing jobs.")

# Print out all the reprocessing job commands. This is temporary until it can be automated
# TODO: submit reprocessing jobs instead of just printing them
for i in range(sensing_time_position_zero_based, len(frame_sensing_datetimes)):
s_date = frame_sensing_datetimes[i] - timedelta(minutes=30)
e_date = frame_sensing_datetimes[i] + timedelta(minutes=30)
logger.info(f"python ~/mozart/ops/opera-pcm/data_subscriber/daac_data_subscriber.py query -c {CSLC_COLLECTION} \
--chunk-size=1 --k={p.k} --m={p.m} --job-queue={p.download_job_queue} --processing-mode=reprocessing --grace-mins=0 \
--start-date={convert_datetime(s_date)} --end-date={convert_datetime(e_date)} --frame-id={frame_id} ")
remaining_sensing_time_count = len(frame_sensing_datetimes) - sensing_time_position_zero_based

if remaining_sensing_time_count >= p.k/2:
do_submit = True
e_date = frame_sensing_datetimes[-1] + timedelta(minutes=30)
job_param_k = f"--k={remaining_sensing_time_count}"
else:
do_submit = False
e_date = datetime.strptime("2000-01-01T00:00:00", ES_DATETIME_FORMAT)
logger.info(f"{frame_id=} reached end of historical processing. The rest of sensing times will be submitted as reprocessing jobs.")

# Print out all the reprocessing job commands. This is temporary until it can be automated
# TODO: submit reprocessing jobs instead of just printing them
for i in range(sensing_time_position_zero_based, len(frame_sensing_datetimes)):
s_date = frame_sensing_datetimes[i] - timedelta(minutes=30)
e_date = frame_sensing_datetimes[i] + timedelta(minutes=30)
logger.info(f"python ~/mozart/ops/opera-pcm/data_subscriber/daac_data_subscriber.py query -c {CSLC_COLLECTION} \
--chunk-size=1 --k={p.k} --m={p.m} --job-queue={p.download_job_queue} --processing-mode=reprocessing --grace-mins=0 \
--start-date={convert_datetime(s_date)} --end-date={convert_datetime(e_date)} --frame-id={frame_id} ")

if s_date < data_start_date:
do_submit = False
if e_date > data_end_date:
do_submit = False
finished = True

'''Query GRQ ES for the previous sensing time day index compressed cslc. If this doesn't exist, we can't process
Expand Down Expand Up @@ -224,7 +235,7 @@ def form_job_params(p, frame_id, sensing_time_position_zero_based, args, eu):
if len(excludes.strip()) > 0:
job_params["exclude_regions"] = f'--exclude-regions={excludes}'

job_params["k"] = f"--k={p.k}"
job_params["k"] = job_param_k

# We need to adjust the m parameter early in the sensing time series
# For example, if this is the very first k-set, there won't be compressed cslc and therefore m should be 1
Expand Down

0 comments on commit ea23dd8

Please sign in to comment.