Skip to content

Commit

Permalink
Update loan_accounts bookmarking strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
RushiT0122 committed Apr 9, 2024
1 parent ec3bc76 commit 1dbdd6b
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 12 deletions.
1 change: 1 addition & 0 deletions tap_mambu/tap_generators/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self, stream_name, client, config, state, sub_type):
self.client = client
self.config = config
self.state = state
self.state_changed = True
self.sub_type = sub_type
self.date_windowing = False
self.date_window_size = 5
Expand Down
29 changes: 26 additions & 3 deletions tap_mambu/tap_generators/loan_accounts_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,46 @@
class LoanAccountsLMGenerator(MultithreadedBookmarkGenerator):
def __init__(self, stream_name, client, config, state, sub_type):
super(LoanAccountsLMGenerator, self).__init__(stream_name, client, config, state, sub_type)
self.max_threads = 5
self.max_threads = 3

def _init_endpoint_config(self):
super(LoanAccountsLMGenerator, self)._init_endpoint_config()
self.endpoint_path = "loans:search"
self.endpoint_bookmark_field = "lastModifiedDate"
self.endpoint_sorting_criteria = {
"field": "id",
"field": "lastModifiedDate",
"order": "ASC"
}

def prepare_batch_params(self):
super(LoanAccountsLMGenerator, self).prepare_batch_params()
self.endpoint_filter_criteria[0]["value"] = datetime_to_utc_str(self.endpoint_intermediary_bookmark_value)
self.endpoint_filter_criteria[0]["value"] = datetime_to_utc_str(
self.endpoint_intermediary_bookmark_value)

def set_last_sync_window_start(self, start):
self.state["last_sync_windows_start_lmg"] = start
self.state_changed = True

def get_last_sync_window_start(self):
return self.state.get("last_sync_windows_start_lmg")

def remove_last_sync_window_start(self):
if "last_sync_windows_start_ad" in self.state:
del self.state["last_sync_windows_start_lmg"]


class LoanAccountsADGenerator(LoanAccountsLMGenerator):
def _init_endpoint_config(self):
super(LoanAccountsADGenerator, self)._init_endpoint_config()
self.endpoint_bookmark_field = "lastAccountAppraisalDate"

def set_last_sync_window_start(self, start):
self.state["last_sync_windows_start_ad"] = start
self.state_changed = True

def get_last_sync_window_start(self):
return self.state.get("last_sync_windows_start_ad")

def remove_last_sync_window_start(self):
if "last_sync_windows_start_ad" in self.state:
del self.state["last_sync_windows_start_ad"]
35 changes: 28 additions & 7 deletions tap_mambu/tap_generators/multithreaded_offset_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,49 @@ def preprocess_batches(self, final_buffer):
self.preprocess_record(raw_record)
self.last_batch_size = len(self.last_batch_set)

def set_last_sync_window_start(self, start):
self.state["last_sync_windows_start"] = start
self.state_changed = True

def get_last_sync_window_start(self):
return self.state.get("last_sync_windows_start")

def remove_last_sync_window_start(self):
if "last_sync_windows_start_ad" in self.state:
del self.state["last_sync_windows_start_ad"]

@backoff.on_exception(backoff.expo, RuntimeError, max_tries=5)
def _all_fetch_batch_steps(self):
if self.date_windowing:
last_sync_window_start = self.get_last_sync_window_start()
start_datetime = datetime_to_utc_str(str_to_localized_datetime(
get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))[:10]
end_datetime = datetime_to_utc_str(utc_now() + timedelta(days=1))[:10]
start = datetime.strptime(start_datetime, '%Y-%m-%d').date()
end = datetime.strptime(end_datetime, '%Y-%m-%d').date()
get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)) - timedelta(days=self.date_window_size))

if last_sync_window_start:
start = str_to_localized_datetime(last_sync_window_start)
else:
start = str_to_localized_datetime(start_datetime)

end_datetime = datetime_to_utc_str(utc_now() + timedelta(days=1))
end = str_to_localized_datetime(end_datetime)
temp = start + timedelta(days=self.date_window_size)
stop_iteration = True
final_buffer = []
while start < end:
self.set_last_sync_window_start(datetime_to_utc_str(start))
# Limit the buffer size by holding generators from creating new batches
while len(self.buffer) > self.max_buffer_size:
time.sleep(1)
if len(self.buffer) > self.max_buffer_size:
while len(self.buffer):
time.sleep(1)
self.modify_request_params(start, temp)
final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
final_buffer, stop_iteration = self.collect_batches(
self.queue_batches())
self.preprocess_batches(final_buffer)
if not final_buffer or stop_iteration:
self.offset = 0
start = temp
temp = start + timedelta(days=self.date_window_size)
self.remove_last_sync_window_start()
else:
final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
self.preprocess_batches(final_buffer)
Expand Down
9 changes: 7 additions & 2 deletions tap_mambu/tap_processors/processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC

from singer import write_record, metadata, write_schema, get_logger, metrics, utils
from copy import deepcopy

from ..helpers import convert, get_bookmark, write_bookmark
from ..helpers.transformer import Transformer
Expand Down Expand Up @@ -66,10 +67,14 @@ def process_records(self):
record_count += 1
self._process_child_records(record)
counter.increment()

# Write bookmark after hundred records
if record_count%100 == 0:
if record_count % 1000 == 0:
self.write_bookmark()

if self.generators[0].state_changed:
self.write_bookmark()
self.generators[0].state_changed = False

return record_count

Expand Down

0 comments on commit 1dbdd6b

Please sign in to comment.