Introduce 'size' field to QDSL query so all UUIDs to be re-indexed are returned during the 'catch-up' command.
kburke committed Oct 7, 2024
1 parent c41b666 commit 65383ff
Showing 4 changed files with 44 additions and 12 deletions.
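For context, the sketch below shows roughly what the search payload from es_manager.py looks like once both placeholders are substituted; the timestamp field name and values are illustrative, not taken from the repository. The point of the commit: without an explicit "size", Elasticsearch returns at most 10 hits per search, so a catch-up touching more than 10 documents would silently drop UUIDs.

# Illustrative rendered payload (hypothetical field name and values):
rendered_query = '''{
    "query": {
        "bool": {
            "should": [
                {"range": {"last_modified_timestamp": {"gt": 1728316800000}}}
            ],
            "minimum_should_match": 1
        }
    },
    "size": 20,
    "_source": false
}'''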
6 changes: 6 additions & 0 deletions scripts/fresh_indices/TooMuchToCatchUpException.py
@@ -0,0 +1,6 @@
class TooMuchToCatchUpException(Exception):
"""Exception raised when the number of UUIDs to catch-up exceeds the configured value."""

def __init__(self, message="The number of UUIDs to catch-up exceeds the configured maximum."):
self.message = message
super().__init__(self.message)
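A quick usage sketch of the new exception class; the counts are illustrative:

from TooMuchToCatchUpException import TooMuchToCatchUpException

# Default message:
e1 = TooMuchToCatchUpException()
print(e1)  # The number of UUIDs to catch-up exceeds the configured maximum.

# Custom message, in the style es_manager.py raises it:
e2 = TooMuchToCatchUpException("Cannot catch-up 35 UUIDs. Maximum expected is 20.")
print(e2.message)  # also available via str(e2)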
20 changes: 19 additions & 1 deletion scripts/fresh_indices/es_manager.py
@@ -3,6 +3,7 @@
import logging
from IndexBlockType import IndexBlockType
from AggQueryType import AggQueryType
from TooMuchToCatchUpException import TooMuchToCatchUpException

# Set logging format and level (default is warning)
# All the API logging is forwarded to the uWSGI server and gets written into the log file `uwsgi-entity-api.log`
@@ -79,7 +80,7 @@ def get_index_document_count(self, index_name) -> int:
# Log the full stack trace, prepend a line with our message
logger.exception(msg)

def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:list ) -> list:
def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:list, expected_max_hits:int ) -> list:
replacement_str=','.join(timestamp_data_list)
query_json='''{
"query": {
@@ -90,9 +91,12 @@ def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:l
"minimum_should_match": 1
}
},
"size": MAX_HITS_GO_HERE,
"_source": false
}'''
query_json=query_json.replace('TIME_STAMP_RANGES_GO_HERE', replacement_str)
query_json = query_json.replace('MAX_HITS_GO_HERE', str(expected_max_hits))
logger.debug(f"request query payload is query_json={query_json}")
headers = {'Content-Type': 'application/json'}
try:
rspn = requests.post(f"{self.elasticsearch_url}/{index_name}/_search"
@@ -102,6 +106,18 @@ def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:l
post_create_revised_uuids = []
rspn_json=rspn.json()

if 'hits' in rspn_json and \
'total' in rspn_json['hits'] and \
'value' in rspn_json['hits']['total']:
entity_count_to_catchup=int(rspn_json['hits']['total']['value'])
if entity_count_to_catchup > expected_max_hits:
logger.critical(f"This script is designed to work with {expected_max_hits} or fewer"
f" UUIDs to 'catch-up'. Found {entity_count_to_catchup}, so"
f" data would be lost.")
logger.critical(f"You can adjust the MAX_EXPECTED_CATCH_UP_UUID_COUNT parameter of"
f" fresh_indices.ini up to the ElasticSearch product limit.")
raise TooMuchToCatchUpException(f"Cannot catch-up {entity_count_to_catchup} UUIDs."
f" Maximum expected is {expected_max_hits}.")
if 'hits' in rspn_json and 'hits' in rspn_json['hits']:
for hit in rspn_json['hits']['hits']:
post_create_revised_uuids.append(hit['_id'])
@@ -112,6 +128,8 @@ def get_document_uuids_by_timestamps(self, index_name:str, timestamp_data_list:l
else:
logger.error(f"Search of {index_name} for post-create revised documents failed:")
logger.error(f"Error Message: {rspn.text}")
except TooMuchToCatchUpException as tmtcue:
raise tmtcue
except Exception as e:
msg = f"Exception encountered executing ESManager.get_document_uuids_by_timestamps()" \
f" with query_json '{query_json}':"
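A hedged sketch of a caller using the updated method; the ESManager constructor arguments, index name, and range clause are assumptions for illustration, not the repository's actual wiring:

from es_manager import ESManager
from TooMuchToCatchUpException import TooMuchToCatchUpException

# Hypothetical setup: the constructor signature is assumed.
es_mgr = ESManager(elasticsearch_url="http://localhost:9200")
ranges = ['{"range": {"last_modified_timestamp": {"gt": 1728316800000}}}']
try:
    uuids = es_mgr.get_document_uuids_by_timestamps(index_name="entities_flush",
                                                    timestamp_data_list=ranges,
                                                    expected_max_hits=20)
except TooMuchToCatchUpException as e:
    print(e.message)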
1 change: 1 addition & 0 deletions scripts/fresh_indices/fresh_indices.ini.example
@@ -1,5 +1,6 @@
[LocalServerSettings]
EXEC_INFO_DIR=./exec_info
MAX_EXPECTED_CATCH_UP_UUID_COUNT=20

[FullReindexSettings]
; from FillStrategyType.py FillStrategyType enum
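As the next file shows, fresh_indices.py reads the new setting with configparser. A minimal standalone sketch follows; the 10,000 ceiling reflects Elasticsearch's default index.max_result_window, presumably the "ElasticSearch product limit" the es_manager.py log message refers to:

import configparser

config = configparser.ConfigParser()
config.read('fresh_indices.ini')
max_catch_up = int(config.get('LocalServerSettings', 'MAX_EXPECTED_CATCH_UP_UUID_COUNT'))

# Elasticsearch rejects from+size paging beyond index.max_result_window
# (default 10,000); larger catch-ups would need search_after or scroll.
assert max_catch_up <= 10000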
29 changes: 18 additions & 11 deletions scripts/fresh_indices/fresh_indices.py
@@ -24,12 +24,14 @@
from AggQueryType import AggQueryType
from IndexBlockType import IndexBlockType
from es_manager import ESManager
from TooMuchToCatchUpException import TooMuchToCatchUpException

def init():
global logger
global fill_strategy
global appcfg
global EXEC_INFO_DIR
global MAX_EXPECTED_CATCH_UP_UUID_COUNT

#
# Read configuration from the INI file and set global constants
@@ -39,6 +41,8 @@ def init():
config_file_name = 'fresh_indices.ini'
Config.read(config_file_name)
EXEC_INFO_DIR = Config.get('LocalServerSettings', 'EXEC_INFO_DIR')
str_MAX_EXPECTED_CATCH_UP_UUID_COUNT = Config.get('LocalServerSettings', 'MAX_EXPECTED_CATCH_UP_UUID_COUNT')
MAX_EXPECTED_CATCH_UP_UUID_COUNT=int(str_MAX_EXPECTED_CATCH_UP_UUID_COUNT)
FILL_STRATEGY_ENUM = Config.get('FullReindexSettings', 'FILL_STRATEGY_ENUM')
except Exception as e:
logger.error(f"Reading {config_file_name}, got error'{str(e)}'.")
@@ -437,9 +441,6 @@ def catch_up_new_index(es_mgr:ESManager,op_data_key:int)->None:
f" timestamp_field_name={timestamp_field_name},"
f" looking for documents with timestamp_value greater than {timestamp_value}")
timestamp_range_json_list.append(f'{{"range": {{"{timestamp_field_name}": {{"gt": {timestamp_value}}}}}}}')
timestamp_data = { 'timestamp_field': timestamp_field_name
, 'timestamp_op': 'gt'
, 'timestamp_value': op_data[previous_op_data_seq]['index_info'][source_index][AggQueryType.MAX.value][timestamp_field_name]}
# Query documents with timestamps subsequent to the last command.
modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=source_index
, timestamp_data_list=timestamp_range_json_list )
@@ -502,6 +503,7 @@ def catch_up_new_index(es_mgr:ESManager,op_data_key:int)->None:
def catch_up_live_index(es_mgr:ESManager)->None:
global op_data
global op_data_supplement
global MAX_EXPECTED_CATCH_UP_UUID_COUNT

# Need to re-identify the indices which are now live for the service. Rather than extract
# op_data['0']['index_info'].keys() and parse each key to attempt to match an entry in
@@ -538,16 +540,21 @@ def catch_up_live_index(es_mgr:ESManager)->None:
timestamp_range_json_list=[]
for timestamp_field_name in op_data['0']['index_info'][active_index][AggQueryType.MAX.value].keys():
timestamp_value=op_data['0']['index_info'][active_index][AggQueryType.MAX.value][timestamp_field_name]
logger.debug(f"For active_index={active_index},"
f" timestamp_field_name={timestamp_field_name},"
f" looking for documents with timestamp_value greater than {timestamp_value}")
logger.debug(f"Looking for documents in {flush_index}"
f" with timestamp_value greater than {timestamp_value},"
f" Captured from timestamp_field_name={timestamp_field_name}"
f" of active_index={active_index}"
f" during the last operation.")
timestamp_range_json_list.append(f'{{"range": {{"{timestamp_field_name}": {{"gt": {timestamp_value}}}}}}}')
timestamp_data = { 'timestamp_field': timestamp_field_name
, 'timestamp_op': 'gt'
, 'timestamp_value': op_data['0']['index_info'][active_index][AggQueryType.MAX.value][timestamp_field_name]}
# Query documents with timestamps subsequent to the 'create' command which made it into the original (now 'flush') index.
modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=flush_index
, timestamp_data_list=timestamp_range_json_list )
try:
modified_index_uuids=esmanager.get_document_uuids_by_timestamps(index_name=flush_index
, timestamp_data_list=timestamp_range_json_list
, expected_max_hits=MAX_EXPECTED_CATCH_UP_UUID_COUNT)
except TooMuchToCatchUpException as tmtcu:
msg=f"catch-up command failed due to: {tmtcu.message}"
logger.critical(msg)
sys.exit(f"{msg} See logs.")
logger.debug(f"For flush_index={flush_index} the touched UUIDs list is {modified_index_uuids}")
catch_up_uuids |= set(modified_index_uuids)

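To see the new guard in isolation, a sketch that replays the es_manager.py check against a canned search response; the counts are illustrative:

from TooMuchToCatchUpException import TooMuchToCatchUpException

# Canned response: 35 matching documents, but only size=20 were requested.
rspn_json = {"hits": {"total": {"value": 35}, "hits": []}}
expected_max_hits = 20

entity_count_to_catchup = int(rspn_json['hits']['total']['value'])
if entity_count_to_catchup > expected_max_hits:
    raise TooMuchToCatchUpException(f"Cannot catch-up {entity_count_to_catchup} UUIDs."
                                    f" Maximum expected is {expected_max_hits}.")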
