diff --git a/scripts/fresh_indices/TooMuchToCatchUpException.py b/scripts/fresh_indices/TooMuchToCatchUpException.py
new file mode 100644
index 00000000..7f7beb5a
--- /dev/null
+++ b/scripts/fresh_indices/TooMuchToCatchUpException.py
@@ -0,0 +1,6 @@
+class TooMuchToCatchUpException(Exception):
+    """Exception raised when the number of UUIDs to catch-up exceeds the configured maximum."""
+
+    def __init__(self, message="The number of UUIDs to catch-up exceeds the configured maximum."):
+        self.message = message
+        super().__init__(self.message)
\ No newline at end of file
diff --git a/scripts/fresh_indices/es_manager.py b/scripts/fresh_indices/es_manager.py
index bf1a1533..3d81c5c6 100644
--- a/scripts/fresh_indices/es_manager.py
+++ b/scripts/fresh_indices/es_manager.py
@@ -3,6 +3,7 @@ import logging
 
 from IndexBlockType import IndexBlockType
 from AggQueryType import AggQueryType
+from TooMuchToCatchUpException import TooMuchToCatchUpException
 
 # Set logging format and level (default is warning)
 # All the API logging is forwarded to the uWSGI server and gets written into the log file `uwsgo-entity-api.log`
@@ -79,7 +80,7 @@ def get_index_document_count(self, index_name) -> int:
             # Log the full stack trace, prepend a line with our message
             logger.exception(msg)
 
-    def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:list ) -> list:
+    def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:list, expected_max_hits:int ) -> list:
         replacement_str=','.join(timestamp_data_list)
         query_json='''{
                         "query": {
@@ -90,9 +91,12 @@ def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:l
                             "minimum_should_match": 1
                           }
                         },
+                        "size": MAX_HITS_GO_HERE,
                         "_source": false
                       }'''
         query_json=query_json.replace('TIME_STAMP_RANGES_GO_HERE', replacement_str)
+        query_json = query_json.replace('MAX_HITS_GO_HERE', str(expected_max_hits))
+        logger.debug(f"request query payload is query_json={query_json}")
         headers = {'Content-Type': 'application/json'}
         try:
             rspn = requests.post(f"{self.elasticsearch_url}/{index_name}/_search"
@@ -102,6 +106,18 @@
 
                 post_create_revised_uuids = []
                 rspn_json=rspn.json()
+                if 'hits' in rspn_json and \
+                        'total' in rspn_json['hits'] and \
+                        'value' in rspn_json['hits']['total']:
+                    entity_count_to_catchup=int(rspn_json['hits']['total']['value'])
+                    if entity_count_to_catchup > expected_max_hits:
+                        logger.critical(f"This script is designed to work with {expected_max_hits} or fewer"
+                                        f" UUIDs to 'catch-up'. Found {entity_count_to_catchup}, so"
+                                        f" data would be lost.")
+                        logger.critical(f"You can adjust the MAX_EXPECTED_CATCH_UP_UUID_COUNT parameter of"
+                                        f" fresh_indices.ini up to Elasticsearch's index.max_result_window (default 10000).")
+                        raise TooMuchToCatchUpException(f"Cannot catch-up {entity_count_to_catchup} UUIDs."
+                                                        f" Maximum expected is {expected_max_hits}.")
                 if 'hits' in rspn_json and 'hits' in rspn_json['hits']:
                     for hit in rspn_json['hits']['hits']:
                         post_create_revised_uuids.append(hit['_id'])
@@ -112,6 +128,8 @@
             else:
                 logger.error(f"Search of {index_name} for post-create revised documents failed:")
                 logger.error(f"Error Message: {rspn.text}")
+        except TooMuchToCatchUpException:
+            raise
         except Exception as e:
             msg = f"Exception encountered executing ESManager.get_document_uuids_by_timestamps()" \
                   f" with query_json '{query_json}':"
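Reviewer note on the size-cap guard above: `"size"` caps how many hits Elasticsearch returns, while `hits.total.value` still reports the full match count, and comparing the two is what makes silent truncation detectable. A minimal standalone sketch of that round trip, assuming a local cluster; the URL, index name, and timestamp field below are hypothetical placeholders, not values from this repository:

    # Sketch: render the same query shape, then check hits.total.value before
    # trusting hits.hits, mirroring the guard in get_document_uuids_by_timestamps().
    import json
    import requests

    ES_URL = "http://localhost:9200"    # hypothetical
    INDEX = "some_index"                # hypothetical
    EXPECTED_MAX_HITS = 20

    payload = {
        "query": {
            "bool": {
                "should": [{"range": {"last_modified_timestamp": {"gt": 1700000000000}}}],
                "minimum_should_match": 1
            }
        },
        "size": EXPECTED_MAX_HITS,      # caps hits.hits, not hits.total
        "_source": False
    }
    rspn = requests.post(f"{ES_URL}/{INDEX}/_search",
                         headers={"Content-Type": "application/json"},
                         data=json.dumps(payload))
    total = int(rspn.json()["hits"]["total"]["value"])
    if total > EXPECTED_MAX_HITS:
        raise SystemExit(f"{total} matches; only {EXPECTED_MAX_HITS} would be returned.")
    uuids = [hit["_id"] for hit in rspn.json()["hits"]["hits"]]

One caveat worth keeping in mind: on Elasticsearch 7+, `hits.total.value` is itself capped at 10000 by default (`track_total_hits`), so the comparison stays exact only while the configured maximum sits well below that.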
+ f" Maximum expected is {expected_max_hits}.") if 'hits' in rspn_json and 'hits' in rspn_json['hits']: for hit in rspn_json['hits']['hits']: post_create_revised_uuids.append(hit['_id']) @@ -112,6 +128,8 @@ def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:l else: logger.error(f"Search of {index_name} for post-create revised documents failed:") logger.error(f"Error Message: {rspn.text}") + except TooMuchToCatchUpException as tmtcue: + raise tmtcue except Exception as e: msg = f"Exception encountered executing ESManager.get_document_uuids_by_timestamps()" \ f" with query_json '{query_json}':" diff --git a/scripts/fresh_indices/fresh_indices.ini.example b/scripts/fresh_indices/fresh_indices.ini.example index ed30b58d..62935ac2 100644 --- a/scripts/fresh_indices/fresh_indices.ini.example +++ b/scripts/fresh_indices/fresh_indices.ini.example @@ -1,5 +1,6 @@ [LocalServerSettings] EXEC_INFO_DIR=./exec_info +MAX_EXPECTED_CATCH_UP_UUID_COUNT=20 [FullReindexSettings] ; from FillStrategyType.py FillStrategyType enum diff --git a/scripts/fresh_indices/fresh_indices.py b/scripts/fresh_indices/fresh_indices.py index 26c44538..65da72be 100644 --- a/scripts/fresh_indices/fresh_indices.py +++ b/scripts/fresh_indices/fresh_indices.py @@ -24,12 +24,14 @@ from AggQueryType import AggQueryType from IndexBlockType import IndexBlockType from es_manager import ESManager +from TooMuchToCatchUpException import TooMuchToCatchUpException def init(): global logger global fill_strategy global appcfg global EXEC_INFO_DIR + global MAX_EXPECTED_CATCH_UP_UUID_COUNT # # Read configuration from the INI file and set global constants @@ -39,6 +41,8 @@ def init(): config_file_name = 'fresh_indices.ini' Config.read(config_file_name) EXEC_INFO_DIR = Config.get('LocalServerSettings', 'EXEC_INFO_DIR') + str_MAX_EXPECTED_CATCH_UP_UUID_COUNT = Config.get('LocalServerSettings', 'MAX_EXPECTED_CATCH_UP_UUID_COUNT') + MAX_EXPECTED_CATCH_UP_UUID_COUNT=int(str_MAX_EXPECTED_CATCH_UP_UUID_COUNT) FILL_STRATEGY_ENUM = Config.get('FullReindexSettings', 'FILL_STRATEGY_ENUM') except Exception as e: logger.error(f"Reading {config_file_name}, got error'{str(e)}'.") @@ -437,9 +441,6 @@ def catch_up_new_index(es_mgr:ESManager,op_data_key:int)->None: f" timestamp_field_name={timestamp_field_name}," f" looking for documents with timestamp_value greater than {timestamp_value}") timestamp_range_json_list.append(f'{{"range": {{"{timestamp_field_name}": {{"gt": {timestamp_value}}}}}}}') - timestamp_data = { 'timestamp_field': timestamp_field_name - , 'timestamp_op': 'gt' - , 'timestamp_value': op_data[previous_op_data_seq]['index_info'][source_index][AggQueryType.MAX.value][timestamp_field_name]} # Query documents with timestamps subsequent to the last command. modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=source_index , timestamp_data_list=timestamp_range_json_list ) @@ -502,6 +503,7 @@ def catch_up_new_index(es_mgr:ESManager,op_data_key:int)->None: def catch_up_live_index(es_mgr:ESManager)->None: global op_data global op_data_supplement + global MAX_EXPECTED_CATCH_UP_UUID_COUNT # Need to re-identify the indices which are now live for the service. 
diff --git a/scripts/fresh_indices/fresh_indices.py b/scripts/fresh_indices/fresh_indices.py
index 26c44538..65da72be 100644
--- a/scripts/fresh_indices/fresh_indices.py
+++ b/scripts/fresh_indices/fresh_indices.py
@@ -24,12 +24,14 @@
 from AggQueryType import AggQueryType
 from IndexBlockType import IndexBlockType
 from es_manager import ESManager
+from TooMuchToCatchUpException import TooMuchToCatchUpException
 
 def init():
     global logger
     global fill_strategy
     global appcfg
     global EXEC_INFO_DIR
+    global MAX_EXPECTED_CATCH_UP_UUID_COUNT
 
     #
     # Read configuration from the INI file and set global constants
@@ -39,6 +41,8 @@ def init():
         config_file_name = 'fresh_indices.ini'
         Config.read(config_file_name)
         EXEC_INFO_DIR = Config.get('LocalServerSettings', 'EXEC_INFO_DIR')
+        str_MAX_EXPECTED_CATCH_UP_UUID_COUNT = Config.get('LocalServerSettings', 'MAX_EXPECTED_CATCH_UP_UUID_COUNT')
+        MAX_EXPECTED_CATCH_UP_UUID_COUNT=int(str_MAX_EXPECTED_CATCH_UP_UUID_COUNT)
         FILL_STRATEGY_ENUM = Config.get('FullReindexSettings', 'FILL_STRATEGY_ENUM')
     except Exception as e:
         logger.error(f"Reading {config_file_name}, got error'{str(e)}'.")
@@ -437,9 +441,7 @@ def catch_up_new_index(es_mgr:ESManager,op_data_key:int)->None:
                          f" timestamp_field_name={timestamp_field_name},"
                          f" looking for documents with timestamp_value greater than {timestamp_value}")
             timestamp_range_json_list.append(f'{{"range": {{"{timestamp_field_name}": {{"gt": {timestamp_value}}}}}}}')
-            timestamp_data = { 'timestamp_field': timestamp_field_name
-                               , 'timestamp_op': 'gt'
-                               , 'timestamp_value': op_data[previous_op_data_seq]['index_info'][source_index][AggQueryType.MAX.value][timestamp_field_name]}
         # Query documents with timestamps subsequent to the last command.
-        modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=source_index
-                                                                        , timestamp_data_list=timestamp_range_json_list )
+        modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=source_index
+                                                                        , timestamp_data_list=timestamp_range_json_list
+                                                                        , expected_max_hits=MAX_EXPECTED_CATCH_UP_UUID_COUNT)
@@ -502,6 +504,7 @@
 def catch_up_live_index(es_mgr:ESManager)->None:
     global op_data
     global op_data_supplement
+    global MAX_EXPECTED_CATCH_UP_UUID_COUNT
 
     # Need to re-identify the indices which are now live for the service. Rather than extract
     # op_data['0']['index_info'].keys() and parse each key to attempt to match an entry in
@@ -538,16 +541,21 @@
         timestamp_range_json_list=[]
         for timestamp_field_name in op_data['0']['index_info'][active_index][AggQueryType.MAX.value].keys():
             timestamp_value=op_data['0']['index_info'][active_index][AggQueryType.MAX.value][timestamp_field_name]
-            logger.debug(f"For active_index={active_index},"
-                         f" timestamp_field_name={timestamp_field_name},"
-                         f" looking for documents with timestamp_value greater than {timestamp_value}")
+            logger.debug(f"Looking for documents in {flush_index}"
+                         f" with timestamp_value greater than {timestamp_value},"
+                         f" captured from timestamp_field_name={timestamp_field_name}"
+                         f" of active_index={active_index}"
+                         f" during the last operation.")
             timestamp_range_json_list.append(f'{{"range": {{"{timestamp_field_name}": {{"gt": {timestamp_value}}}}}}}')
-            timestamp_data = { 'timestamp_field': timestamp_field_name
-                               , 'timestamp_op': 'gt'
-                               , 'timestamp_value': op_data['0']['index_info'][active_index][AggQueryType.MAX.value][timestamp_field_name]}
         # Query documents with timestamps subsequent to the 'create' command which made into the original (now 'flush') index.
-        modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=flush_index
-                                                                        , timestamp_data_list=timestamp_range_json_list )
+        try:
+            modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=flush_index
+                                                                            , timestamp_data_list=timestamp_range_json_list
+                                                                            , expected_max_hits=MAX_EXPECTED_CATCH_UP_UUID_COUNT)
+        except TooMuchToCatchUpException as tmtcu:
+            msg=f"catch-up command failed due to: {tmtcu.message}"
+            logger.critical(msg)
+            sys.exit(f"{msg} See logs.")
         logger.debug(f"For flush_index={flush_index} the touched UUIDs list is {modified_index_uuids}")
 
         catch_up_uuids |= set(modified_index_uuids)
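Reviewer note on the failure path as a whole: `es_manager.py` detects the overflow, logs it at CRITICAL, and re-raises; only `catch_up_live_index()` converts the exception into a clean `sys.exit()`, while from `catch_up_new_index()` it now propagates to the caller. A minimal sketch of that split between detection and policy; the `run_catch_up()` wrapper is hypothetical, purely to illustrate the shape both call sites share:

    # Sketch: detection raises TooMuchToCatchUpException; the command layer
    # decides whether that means aborting the whole process.
    import sys
    from TooMuchToCatchUpException import TooMuchToCatchUpException

    def run_catch_up(esmanager, index_name, timestamp_range_json_list, max_hits):
        try:
            return esmanager.get_document_uuids_by_timestamps(
                index_name=index_name,
                timestamp_data_list=timestamp_range_json_list,
                expected_max_hits=max_hits)
        except TooMuchToCatchUpException as tmtcu:
            msg = f"catch-up command failed due to: {tmtcu.message}"
            sys.exit(f"{msg} See logs.")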