From 927ddb3ef32082b419eb01a5acf6d18a882411fd Mon Sep 17 00:00:00 2001
From: Kyle Lawlor-Bagcal
Date: Tue, 12 Sep 2023 16:59:35 -0400
Subject: [PATCH] feat: index historical votes (#38)

---
Review notes (free-form text in this position is skipped when applying):
 * sql/V1_12__votes.sql: DROP CONSTRAINT uses IF EXISTS so that the first run,
   where the constraint does not exist yet, does not raise an error.
 * utils.py: new_events_to_process orders rows by block_height, tx_idx,
   msg_idx and key, because itertools.groupby only merges adjacent rows;
   chain_num and max_block_height are bound as query parameters.
 * index_votes.py: the builtin name "type" is no longer shadowed; docstrings
   were added.
 * Hunk lengths and the diffstat were updated for the edited files; the
   post-image blob ids on their "index" lines were not regenerated.

 .env-example              |   1 +
 index_class_issuers.py    |   8 +-
 index_votes.py            | 166 ++++++++++++++++++++++++++++++++++++++
 main.py                   |   2 +
 sql/V1_12__votes.sql      |  26 ++++++
 sql/run_all_migrations.sh |   1 +
 utils.py                  |  65 +++++++++++++++
 7 files changed, 267 insertions(+), 2 deletions(-)
 create mode 100644 index_votes.py
 create mode 100644 sql/V1_12__votes.sql

diff --git a/.env-example b/.env-example
index 0270fd6..8ebbc8f 100644
--- a/.env-example
+++ b/.env-example
@@ -3,3 +3,4 @@ REGEN_RPC=
 REGEN_API=
 # START_BLOCK_OVERRIDE=
 # ONLY_INDEX_SPECIFIC_BLOCKS=
+# REGEN_IS_ARCHIVE_NODE=false
diff --git a/index_class_issuers.py b/index_class_issuers.py
index 2c635cd..97b9e67 100644
--- a/index_class_issuers.py
+++ b/index_class_issuers.py
@@ -2,15 +2,19 @@
 import os
 import textwrap
 import requests
-from utils import PollingProcess, events_to_process
+from utils import is_archive_node, PollingProcess, events_to_process
 
 logger = logging.getLogger(__name__)
 
 
 def fetch_class_issuers(height, class_id):
+    if is_archive_node():
+        headers = {"x-cosmos-block-height": str(height)}
+    else:
+        headers = None
     resp = requests.get(
         f"{os.environ['REGEN_API']}/regen/ecocredit/v1/classes/{class_id}/issuers",
-        headers={"x-cosmos-block-height": str(height)},
+        headers=headers,
     )
     resp.raise_for_status()
     return resp.json()["issuers"]
diff --git a/index_votes.py b/index_votes.py
new file mode 100644
index 0000000..3cff202
--- /dev/null
+++ b/index_votes.py
@@ -0,0 +1,166 @@
+import logging
+import os
+import textwrap
+from psycopg2.errors import ForeignKeyViolation
+import requests
+from utils import (
+    is_archive_node,
+    PollingProcess,
+    events_to_process,
+    new_events_to_process,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def fetch_votes_by_proposal(height, proposal_id):
+    """Return the votes reported by REGEN_API for the given proposal.
+
+    The x-cosmos-block-height header is sent only when is_archive_node()
+    is true; otherwise the request carries no height header.
+    """
+    if is_archive_node():
+        headers = {"x-cosmos-block-height": str(height)}
+    else:
+        headers = None
+    # Currently EventVote only contains proposal id
+    # Eventually EventVote may contain proposal id and voter address
+    # At which point we could get the vote with this endpoint:
+    # /cosmos/group/v1/vote_by_proposal_voter/{proposal_id}/{voter}
+    # Ref: https://github.com/regen-network/indexer/pull/38#discussion_r1310958235
+    resp = requests.get(
+        f"{os.environ['REGEN_API']}/cosmos/group/v1/votes_by_proposal/{proposal_id}",
+        headers=headers,
+    )
+    resp.raise_for_status()
+    return resp.json()["votes"]
+
+
+def gen_records(cur, query):
+    """Execute query on cur and yield every row of the result."""
+    cur.execute(query)
+    yield from cur
+
+
+def _index_votes(pg_conn, _client, _chain_num):
+    """Insert a votes row for every vote behind a not-yet-indexed EventVote.
+
+    Per chain, only events above the highest block height already stored in
+    the votes table are considered; rows that conflict with existing ones
+    are skipped by the insert statement.
+    """
+    with pg_conn.cursor() as cur:
+        all_chain_nums = [
+            record[0] for record in gen_records(cur, "select num from chain;")
+        ]
+        max_block_heights = {
+            chain_num: max_block_height
+            for chain_num, max_block_height in gen_records(
+                cur,
+                "select chain_num, MAX(block_height) from votes group by chain_num;",
+            )
+        }
+        logger.debug(f"{all_chain_nums=}")
+        logger.debug(f"{max_block_heights=}")
+        # chains with no indexed votes yet start from block height 0
+        for chain_num in all_chain_nums:
+            max_block_heights.setdefault(chain_num, 0)
+        logger.debug(f"{max_block_heights=}")
+        for chain_num, max_block_height in max_block_heights.items():
+            logger.debug(f"{chain_num=} {max_block_height=}")
+            for event in new_events_to_process(
+                cur, "votes", chain_num, max_block_height
+            ):
+                # every row of an event shares these columns; only key and
+                # value differ, so the first row is enough
+                (
+                    event_type,
+                    block_height,
+                    tx_idx,
+                    msg_idx,
+                    _,
+                    _,
+                    chain_num,
+                    timestamp,
+                    tx_hash,
+                ) = event[0]
+                normalize = {
+                    "type": event_type,
+                    "block_height": block_height,
+                    "tx_idx": tx_idx,
+                    "msg_idx": msg_idx,
+                    "chain_num": chain_num,
+                    "timestamp": timestamp,
+                    "tx_hash": tx_hash,
+                }
+                for entry in event:
+                    (_, _, _, _, key, value, _, _, _) = entry
+                    value = value.strip('"')
+                    normalize[key] = value
+                logger.debug(normalize)
+                votes = fetch_votes_by_proposal(
+                    normalize["block_height"], normalize["proposal_id"]
+                )
+                logger.debug(f"{votes=}")
+                insert_text = textwrap.dedent(
+                    """
+                INSERT INTO votes (
+                    type,
+                    block_height,
+                    tx_idx,
+                    msg_idx,
+                    chain_num,
+                    timestamp,
+                    tx_hash,
+                    proposal_id,
+                    voter,
+                    option,
+                    metadata,
+                    submit_time
+                ) VALUES (
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s,
+                    %s
+                ) ON CONFLICT DO NOTHING;"""
+                ).strip("\n")
+                with pg_conn.cursor() as _cur:
+                    for vote in votes:
+                        row = (
+                            normalize["type"],
+                            normalize["block_height"],
+                            normalize["tx_idx"],
+                            normalize["msg_idx"],
+                            normalize["chain_num"],
+                            normalize["timestamp"],
+                            normalize["tx_hash"],
+                            normalize["proposal_id"],
+                            vote["voter"],
+                            vote["option"],
+                            vote["metadata"],
+                            vote["submit_time"],
+                        )
+                        _cur.execute(
+                            insert_text,
+                            row,
+                        )
+                        logger.debug(_cur.statusmessage)
+                        pg_conn.commit()
+                        logger.info("vote inserted..")
+
+
+def index_votes():
+    """Start a PollingProcess that runs _index_votes."""
+    p = PollingProcess(
+        target=_index_votes,
+        sleep_secs=1,
+    )
+    p.start()
diff --git a/main.py b/main.py
index f8af0d1..b0637ec 100644
--- a/main.py
+++ b/main.py
@@ -7,6 +7,7 @@
 from index_retires import index_retires
 from index_proposals import index_proposals
 from index_class_issuers import index_class_issuers
+from index_votes import index_votes
 
 load_dotenv()
 
@@ -36,3 +37,4 @@
     index_retires()
     index_proposals()
     index_class_issuers()
+    index_votes()
diff --git a/sql/V1_12__votes.sql b/sql/V1_12__votes.sql
new file mode 100644
index 0000000..72ea4ba
--- /dev/null
+++ b/sql/V1_12__votes.sql
@@ -0,0 +1,26 @@
+CREATE TABLE
+  IF NOT EXISTS votes (
+    type TEXT NOT NULL,
+    block_height BIGINT NOT NULL,
+    tx_idx SMALLINT NOT NULL,
+    msg_idx SMALLINT NOT NULL,
+    chain_num SMALLINT NOT NULL,
+    timestamp timestamptz,
+    tx_hash TEXT NOT NULL,
+    proposal_id BIGINT NOT NULL,
+    voter TEXT NOT NULL,
+    option TEXT NOT NULL,
+    metadata TEXT NOT NULL,
+    submit_time timestamptz NOT NULL,
+    PRIMARY KEY (chain_num, block_height, tx_idx, msg_idx),
+    FOREIGN KEY (chain_num, block_height, tx_idx, msg_idx, type) REFERENCES msg_event
+  );
+
+CREATE INDEX IF NOT EXISTS votes_proposal_id_chain_num_idx ON votes (proposal_id, chain_num);
+
+-- Drop before re-adding so this migration can be re-run; IF EXISTS keeps the
+-- first run (when the constraint is not there yet) from raising an error.
+ALTER TABLE IF EXISTS votes
+DROP CONSTRAINT IF EXISTS votes_chain_num_proposal_id_voter_ux;
+
+ALTER TABLE IF EXISTS votes ADD CONSTRAINT votes_chain_num_proposal_id_voter_ux UNIQUE (chain_num, proposal_id, voter);
diff --git a/sql/run_all_migrations.sh b/sql/run_all_migrations.sh
index c0665ca..ab9d2a5 100755
--- a/sql/run_all_migrations.sh
+++ b/sql/run_all_migrations.sh
@@ -15,3 +15,4 @@ psql -c "\i V1_8__index_proposal_id.sql" $DATABASE_URL
 psql -c "\i V1_9__all_ecocredit_txes.sql" $DATABASE_URL
 psql -c "\i V1_10__class_issuers.sql" $DATABASE_URL
 psql -c "\i V1_11__class_issuers_indexes.sql" $DATABASE_URL
+psql -c "\i V1_12__votes.sql" $DATABASE_URL
diff --git a/utils.py b/utils.py
index 35009df..781b29f 100644
--- a/utils.py
+++ b/utils.py
@@ -108,6 +108,7 @@ def run(self):
         "regen.ecocredit.v1.EventCreateClass",
         "regen.ecocredit.v1.EventUpdateClassIssuers",
     ],
+    "votes": ["cosmos.group.v1.EventVote"],
 }
 
 
@@ -145,3 +146,67 @@ def events_to_process(cur, index_table_name):
     # this is how key and value are put into their own column
     for _, g in groupby(cur, lambda x: f"{x[1]}-{x[2]}-{x[3]}"):
         yield list(g)
+
+
+def new_events_to_process(cur, index_table_name, chain_num, max_block_height):
+    """Yield not-yet-indexed events of one chain above a block height.
+
+    Each yielded item is the list of msg_event_attr rows (one per attribute
+    key) that belong to a single event, i.e. that share block_height, tx_idx
+    and msg_idx.
+    """
+    event_names = TABLE_EVENT_NAMES_MAP[index_table_name]
+    formatted_event_names = [f"'{x}'" for x in event_names]
+    formatted_event_names_set = f"({','.join(formatted_event_names)})"
+    # index_table_name and the event names come from TABLE_EVENT_NAMES_MAP and
+    # are interpolated; chain_num and max_block_height are caller-supplied
+    # values, so they are bound as query parameters instead.
+    sql = textwrap.dedent(
+        f"""
+    SELECT mea.type,
+           mea.block_height,
+           mea.tx_idx,
+           mea.msg_idx,
+           mea.key,
+           mea.value,
+           mea.chain_num,
+           TRIM(BOTH '"' FROM (tx.data -> 'tx_response' -> 'timestamp')::text) AS timestamp,
+           encode(tx.hash, 'hex') as tx_hash
+    FROM msg_event_attr AS mea
+    NATURAL LEFT JOIN {index_table_name} AS e
+    NATURAL LEFT JOIN tx
+    WHERE mea.type IN {formatted_event_names_set}
+      AND (e.block_height IS NULL
+           AND e.type IS NULL
+           AND e.tx_idx IS NULL
+           AND e.msg_idx IS NULL)
+      AND mea.chain_num = %s
+      AND mea.block_height > %s
+    ORDER BY block_height ASC,
+             tx_idx ASC,
+             msg_idx ASC,
+             KEY ASC;
+    """
+    )
+    cur.execute(sql, (chain_num, max_block_height))
+
+    # group together results from the query above
+    # the group by done based on the block_height, tx_idx, and msg_idx
+    # this is how key and value are put into their own column
+    # groupby only merges adjacent rows, hence the ORDER BY on the same columns
+    for _, g in groupby(cur, lambda x: f"{x[1]}-{x[2]}-{x[3]}"):
+        yield list(g)
+
+
+def is_archive_node():
+    """Return whether REGEN_IS_ARCHIVE_NODE marks the node as an archive node.
+
+    Raises ValueError when the variable is set to anything other than
+    true or false (case-insensitive).
+    """
+    # since the indexer is intended to run against archive nodes,
+    # assume that by default the node is an archive node.
+    value = os.environ.get("REGEN_IS_ARCHIVE_NODE", "true").lower()
+    if value not in ["true", "false"]:
+        raise ValueError("REGEN_IS_ARCHIVE_NODE must be true or false")
+    return value == "true"