From 1dbdd6bd041e58f80b5b404fdcba3b7d7e37a358 Mon Sep 17 00:00:00 2001
From: Rushikesh Todkar <98420315+RushiT0122@users.noreply.github.com>
Date: Tue, 9 Apr 2024 17:18:57 +0530
Subject: [PATCH] Update loan_accounts bookmarking strategy

Sort loans:search results by lastModifiedDate instead of id and lower
max_threads for the loan accounts generators from 5 to 3.

While date windowing, store the start of the window being requested in
state (set_last_sync_window_start) and, when that key is present, start
from it instead of from the bookmark minus one window. Each generator
uses its own state key: "last_sync_windows_start" in the base class,
"..._lmg" and "..._ad" for the loan accounts generators.

The processor writes the bookmark every 1000 records and whenever
generators[0].state_changed is set.

Review fix: remove_last_sync_window_start now checks the same state key
it deletes (the LM and base implementations previously tested the "_ad"
key, which could raise KeyError or leave the real key in state).

---
 tap_mambu/tap_generators/generator.py         |  1 +
 .../tap_generators/loan_accounts_generator.py | 29 +++++++++++++--
 .../multithreaded_offset_generator.py         | 35 +++++++++++++++----
 tap_mambu/tap_processors/processor.py         |  9 +++--
 4 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/tap_mambu/tap_generators/generator.py b/tap_mambu/tap_generators/generator.py
index d15459c..73c0bb2 100644
--- a/tap_mambu/tap_generators/generator.py
+++ b/tap_mambu/tap_generators/generator.py
@@ -16,6 +16,7 @@ def __init__(self, stream_name, client, config, state, sub_type):
         self.client = client
         self.config = config
         self.state = state
+        self.state_changed = True
         self.sub_type = sub_type
         self.date_windowing = False
         self.date_window_size = 5
diff --git a/tap_mambu/tap_generators/loan_accounts_generator.py b/tap_mambu/tap_generators/loan_accounts_generator.py
index ce001d4..4b50fce 100644
--- a/tap_mambu/tap_generators/loan_accounts_generator.py
+++ b/tap_mambu/tap_generators/loan_accounts_generator.py
@@ -5,23 +5,46 @@
 class LoanAccountsLMGenerator(MultithreadedBookmarkGenerator):
     def __init__(self, stream_name, client, config, state, sub_type):
         super(LoanAccountsLMGenerator, self).__init__(stream_name, client, config, state, sub_type)
-        self.max_threads = 5
+        self.max_threads = 3
 
     def _init_endpoint_config(self):
         super(LoanAccountsLMGenerator, self)._init_endpoint_config()
         self.endpoint_path = "loans:search"
         self.endpoint_bookmark_field = "lastModifiedDate"
         self.endpoint_sorting_criteria = {
-            "field": "id",
+            "field": "lastModifiedDate",
             "order": "ASC"
         }
 
     def prepare_batch_params(self):
         super(LoanAccountsLMGenerator, self).prepare_batch_params()
-        self.endpoint_filter_criteria[0]["value"] = datetime_to_utc_str(self.endpoint_intermediary_bookmark_value)
+        self.endpoint_filter_criteria[0]["value"] = datetime_to_utc_str(
+            self.endpoint_intermediary_bookmark_value)
+
+    def set_last_sync_window_start(self, start):
+        self.state["last_sync_windows_start_lmg"] = start
+        self.state_changed = True
+
+    def get_last_sync_window_start(self):
+        return self.state.get("last_sync_windows_start_lmg")
+
+    def remove_last_sync_window_start(self):
+        if "last_sync_windows_start_lmg" in self.state:
+            del self.state["last_sync_windows_start_lmg"]
 
 
 class LoanAccountsADGenerator(LoanAccountsLMGenerator):
     def _init_endpoint_config(self):
         super(LoanAccountsADGenerator, self)._init_endpoint_config()
         self.endpoint_bookmark_field = "lastAccountAppraisalDate"
+
+    def set_last_sync_window_start(self, start):
+        self.state["last_sync_windows_start_ad"] = start
+        self.state_changed = True
+
+    def get_last_sync_window_start(self):
+        return self.state.get("last_sync_windows_start_ad")
+
+    def remove_last_sync_window_start(self):
+        if "last_sync_windows_start_ad" in self.state:
+            del self.state["last_sync_windows_start_ad"]
diff --git a/tap_mambu/tap_generators/multithreaded_offset_generator.py b/tap_mambu/tap_generators/multithreaded_offset_generator.py
index 7ee02ff..0a69e33 100644
--- a/tap_mambu/tap_generators/multithreaded_offset_generator.py
+++ b/tap_mambu/tap_generators/multithreaded_offset_generator.py
@@ -124,28 +124,49 @@ def preprocess_batches(self, final_buffer):
             self.preprocess_record(raw_record)
         self.last_batch_size = len(self.last_batch_set)
 
+    def set_last_sync_window_start(self, start):
+        self.state["last_sync_windows_start"] = start
+        self.state_changed = True
+
+    def get_last_sync_window_start(self):
+        return self.state.get("last_sync_windows_start")
+
+    def remove_last_sync_window_start(self):
+        if "last_sync_windows_start" in self.state:
+            del self.state["last_sync_windows_start"]
+
     @backoff.on_exception(backoff.expo, RuntimeError, max_tries=5)
     def _all_fetch_batch_steps(self):
         if self.date_windowing:
+            last_sync_window_start = self.get_last_sync_window_start()
             start_datetime = datetime_to_utc_str(str_to_localized_datetime(
-                get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))[:10]
-            end_datetime = datetime_to_utc_str(utc_now() + timedelta(days=1))[:10]
-            start = datetime.strptime(start_datetime, '%Y-%m-%d').date()
-            end = datetime.strptime(end_datetime, '%Y-%m-%d').date()
+                get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)) - timedelta(days=self.date_window_size))
+
+            if last_sync_window_start:
+                start = str_to_localized_datetime(last_sync_window_start)
+            else:
+                start = str_to_localized_datetime(start_datetime)
+
+            end_datetime = datetime_to_utc_str(utc_now() + timedelta(days=1))
+            end = str_to_localized_datetime(end_datetime)
             temp = start + timedelta(days=self.date_window_size)
             stop_iteration = True
             final_buffer = []
             while start < end:
+                self.set_last_sync_window_start(datetime_to_utc_str(start))
                 # Limit the buffer size by holding generators from creating new batches
-                while len(self.buffer) > self.max_buffer_size:
-                    time.sleep(1)
+                if len(self.buffer) > self.max_buffer_size:
+                    while len(self.buffer):
+                        time.sleep(1)
                 self.modify_request_params(start, temp)
-                final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
+                final_buffer, stop_iteration = self.collect_batches(
+                    self.queue_batches())
                 self.preprocess_batches(final_buffer)
                 if not final_buffer or stop_iteration:
                     self.offset = 0
                     start = temp
                     temp = start + timedelta(days=self.date_window_size)
+            self.remove_last_sync_window_start()
         else:
             final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
             self.preprocess_batches(final_buffer)
diff --git a/tap_mambu/tap_processors/processor.py b/tap_mambu/tap_processors/processor.py
index 1b657af..20f4fdd 100644
--- a/tap_mambu/tap_processors/processor.py
+++ b/tap_mambu/tap_processors/processor.py
@@ -1,6 +1,7 @@
 from abc import ABC
 
 from singer import write_record, metadata, write_schema, get_logger, metrics, utils
+from copy import deepcopy
 
 from ..helpers import convert, get_bookmark, write_bookmark
 from ..helpers.transformer import Transformer
@@ -66,10 +67,14 @@ def process_records(self):
                     record_count += 1
                     self._process_child_records(record)
                     counter.increment()
-                    
+
                     # Write bookmark after hundred records
-                    if record_count%100 == 0:
+                    if record_count % 1000 == 0:
+                        self.write_bookmark()
+
+                    if self.generators[0].state_changed:
                         self.write_bookmark()
+                        self.generators[0].state_changed = False
 
         return record_count
 