From e890c797ec5378d479d8c192cf8c6f7688521d2c Mon Sep 17 00:00:00 2001 From: Paul Weidner Date: Tue, 13 Feb 2024 07:32:01 -0800 Subject: [PATCH] chore: only index votes for specified chain_num --- index_votes.py | 190 +++++++++++++++++++++++-------------------------- 1 file changed, 89 insertions(+), 101 deletions(-) diff --git a/index_votes.py b/index_votes.py index 3cff202..c5fe4d2 100644 --- a/index_votes.py +++ b/index_votes.py @@ -39,108 +39,96 @@ def gen_records(cur, query): def _index_votes(pg_conn, _client, _chain_num): with pg_conn.cursor() as cur: - all_chain_nums = [ - record[0] for record in gen_records(cur, "select num from chain;") - ] - max_block_heights = { - chain_num: max_block_height - for chain_num, max_block_height in gen_records( - cur, - "select chain_num, MAX(block_height) from votes group by chain_num;", + cur.execute( + "SELECT MAX(block_height) FROM votes WHERE chain_num = %s", (_chain_num,) + ) + res = cur.fetchone() + max_block_height = 0 if res[0] is None else res[0] + logger.debug(f"{_chain_num=} {max_block_height=}") + for event in new_events_to_process( + cur, "votes", _chain_num, max_block_height + ): + ( + type, + block_height, + tx_idx, + msg_idx, + _, + _, + chain_num, + timestamp, + tx_hash, + ) = event[0] + normalize = {} + normalize["type"] = type + normalize["block_height"] = block_height + normalize["tx_idx"] = tx_idx + normalize["msg_idx"] = msg_idx + normalize["chain_num"] = chain_num + normalize["timestamp"] = timestamp + normalize["tx_hash"] = tx_hash + for entry in event: + (_, _, _, _, key, value, _, _, _) = entry + value = value.strip('"') + normalize[key] = value + logger.debug(normalize) + votes = fetch_votes_by_proposal( + normalize["block_height"], normalize["proposal_id"] ) - } - logger.debug(f"{all_chain_nums=}") - logger.debug(f"{max_block_heights=}") - for chain_num in all_chain_nums: - if chain_num not in max_block_heights.keys(): - max_block_heights[chain_num] = 0 - logger.debug(f"{max_block_heights=}") - for chain_num, max_block_height in max_block_heights.items(): - logger.debug(f"{chain_num=} {max_block_height=}") - for event in new_events_to_process( - cur, "votes", chain_num, max_block_height - ): - ( - type, - block_height, - tx_idx, - msg_idx, - _, - _, - chain_num, - timestamp, - tx_hash, - ) = event[0] - normalize = {} - normalize["type"] = type - normalize["block_height"] = block_height - normalize["tx_idx"] = tx_idx - normalize["msg_idx"] = msg_idx - normalize["chain_num"] = chain_num - normalize["timestamp"] = timestamp - normalize["tx_hash"] = tx_hash - for entry in event: - (_, _, _, _, key, value, _, _, _) = entry - value = value.strip('"') - normalize[key] = value - logger.debug(normalize) - votes = fetch_votes_by_proposal( - normalize["block_height"], normalize["proposal_id"] - ) - logger.debug(f"{votes=}") - insert_text = textwrap.dedent( - """ - INSERT INTO votes ( - type, - block_height, - tx_idx, - msg_idx, - chain_num, - timestamp, - tx_hash, - proposal_id, - voter, - option, - metadata, - submit_time - ) VALUES ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - ) ON CONFLICT DO NOTHING;""" - ).strip("\n") - with pg_conn.cursor() as _cur: - for vote in votes: - row = ( - normalize["type"], - normalize["block_height"], - normalize["tx_idx"], - normalize["msg_idx"], - normalize["chain_num"], - normalize["timestamp"], - normalize["tx_hash"], - normalize["proposal_id"], - vote["voter"], - vote["option"], - vote["metadata"], - vote["submit_time"], - ) - _cur.execute( - insert_text, - row, - ) - logger.debug(_cur.statusmessage) - pg_conn.commit() - logger.info("vote inserted..") + logger.debug(f"{votes=}") + insert_text = textwrap.dedent( + """ + INSERT INTO votes ( + type, + block_height, + tx_idx, + msg_idx, + chain_num, + timestamp, + tx_hash, + proposal_id, + voter, + option, + metadata, + submit_time + ) VALUES ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + ) ON CONFLICT DO NOTHING;""" + ).strip("\n") + with pg_conn.cursor() as _cur: + for vote in votes: + row = ( + normalize["type"], + normalize["block_height"], + normalize["tx_idx"], + normalize["msg_idx"], + normalize["chain_num"], + normalize["timestamp"], + normalize["tx_hash"], + normalize["proposal_id"], + vote["voter"], + vote["option"], + vote["metadata"], + vote["submit_time"], + ) + _cur.execute( + insert_text, + row, + ) + logger.debug(_cur.statusmessage) + pg_conn.commit() + logger.info("vote inserted..") def index_votes():