feat: index historical votes (#38)
wgwz authored Sep 12, 2023
1 parent 8d3635a commit 927ddb3
Showing 7 changed files with 233 additions and 2 deletions.
1 change: 1 addition & 0 deletions .env-example
@@ -3,3 +3,4 @@ REGEN_RPC=
REGEN_API=
# START_BLOCK_OVERRIDE=
# ONLY_INDEX_SPECIFIC_BLOCKS=
# REGEN_IS_ARCHIVE_NODE=false
8 changes: 6 additions & 2 deletions index_class_issuers.py
@@ -2,15 +2,19 @@
 import os
 import textwrap
 import requests
-from utils import PollingProcess, events_to_process
+from utils import is_archive_node, PollingProcess, events_to_process
 
 logger = logging.getLogger(__name__)
 
 
 def fetch_class_issuers(height, class_id):
+    if is_archive_node():
+        headers = {"x-cosmos-block-height": str(height)}
+    else:
+        headers = None
     resp = requests.get(
         f"{os.environ['REGEN_API']}/regen/ecocredit/v1/classes/{class_id}/issuers",
-        headers={"x-cosmos-block-height": str(height)},
+        headers=headers,
     )
     resp.raise_for_status()
     return resp.json()["issuers"]
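Note on the change above: the x-cosmos-block-height header pins the REST query to the block currently being indexed, which only works against an archive node; when is_archive_node() is false the header is omitted and the endpoint returns latest state. A minimal sketch of the header usage (not part of this commit; the height and class id are made-up example values):

import os
import requests

height = 10_000_000  # hypothetical block height
class_id = "C01"     # hypothetical credit class id

resp = requests.get(
    f"{os.environ['REGEN_API']}/regen/ecocredit/v1/classes/{class_id}/issuers",
    # pins the query to historical state; omit on non-archive (pruning) nodes
    headers={"x-cosmos-block-height": str(height)},
)
resp.raise_for_status()
print(resp.json()["issuers"])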
151 changes: 151 additions & 0 deletions index_votes.py
@@ -0,0 +1,151 @@
import logging
import os
import textwrap
from psycopg2.errors import ForeignKeyViolation
import requests
from utils import (
    is_archive_node,
    PollingProcess,
    events_to_process,
    new_events_to_process,
)

logger = logging.getLogger(__name__)


def fetch_votes_by_proposal(height, proposal_id):
    if is_archive_node():
        headers = {"x-cosmos-block-height": str(height)}
    else:
        headers = None
    # Currently EventVote only contains proposal id
    # Eventually EventVote may contain proposal id and voter address
    # At which point we could get the vote with this endpoint:
    # /cosmos/group/v1/vote_by_proposal_voter/{proposal_id}/{voter}
    # Ref: https://github.com/regen-network/indexer/pull/38#discussion_r1310958235
    resp = requests.get(
        f"{os.environ['REGEN_API']}/cosmos/group/v1/votes_by_proposal/{proposal_id}",
        headers=headers,
    )
    resp.raise_for_status()
    return resp.json()["votes"]


def gen_records(cur, query):
    cur.execute(query)
    for record in cur:
        yield record


def _index_votes(pg_conn, _client, _chain_num):
    with pg_conn.cursor() as cur:
        all_chain_nums = [
            record[0] for record in gen_records(cur, "select num from chain;")
        ]
        max_block_heights = {
            chain_num: max_block_height
            for chain_num, max_block_height in gen_records(
                cur,
                "select chain_num, MAX(block_height) from votes group by chain_num;",
            )
        }
        logger.debug(f"{all_chain_nums=}")
        logger.debug(f"{max_block_heights=}")
        # chains with no indexed votes yet start from block height 0
        for chain_num in all_chain_nums:
            if chain_num not in max_block_heights.keys():
                max_block_heights[chain_num] = 0
        logger.debug(f"{max_block_heights=}")
        for chain_num, max_block_height in max_block_heights.items():
            logger.debug(f"{chain_num=} {max_block_height=}")
            for event in new_events_to_process(
                cur, "votes", chain_num, max_block_height
            ):
                # the first attr row carries the event-level columns shared by
                # every row in the group
                (
                    type,
                    block_height,
                    tx_idx,
                    msg_idx,
                    _,
                    _,
                    chain_num,
                    timestamp,
                    tx_hash,
                ) = event[0]
                normalize = {}
                normalize["type"] = type
                normalize["block_height"] = block_height
                normalize["tx_idx"] = tx_idx
                normalize["msg_idx"] = msg_idx
                normalize["chain_num"] = chain_num
                normalize["timestamp"] = timestamp
                normalize["tx_hash"] = tx_hash
                # fold each event attribute (e.g. proposal_id) into the
                # normalized record
                for entry in event:
                    (_, _, _, _, key, value, _, _, _) = entry
                    value = value.strip('"')
                    normalize[key] = value
                logger.debug(normalize)
                votes = fetch_votes_by_proposal(
                    normalize["block_height"], normalize["proposal_id"]
                )
                logger.debug(f"{votes=}")
                insert_text = textwrap.dedent(
                    """
                    INSERT INTO votes (
                        type,
                        block_height,
                        tx_idx,
                        msg_idx,
                        chain_num,
                        timestamp,
                        tx_hash,
                        proposal_id,
                        voter,
                        option,
                        metadata,
                        submit_time
                    ) VALUES (
                        %s,
                        %s,
                        %s,
                        %s,
                        %s,
                        %s,
                        %s,
                        %s,
                        %s,
                        %s,
                        %s,
                        %s
                    ) ON CONFLICT DO NOTHING;"""
                ).strip("\n")
                # one row per vote returned by the group module; duplicates are
                # skipped via ON CONFLICT
                with pg_conn.cursor() as _cur:
                    for vote in votes:
                        row = (
                            normalize["type"],
                            normalize["block_height"],
                            normalize["tx_idx"],
                            normalize["msg_idx"],
                            normalize["chain_num"],
                            normalize["timestamp"],
                            normalize["tx_hash"],
                            normalize["proposal_id"],
                            vote["voter"],
                            vote["option"],
                            vote["metadata"],
                            vote["submit_time"],
                        )
                        _cur.execute(
                            insert_text,
                            row,
                        )
                        logger.debug(_cur.statusmessage)
                        pg_conn.commit()
                        logger.info("vote inserted..")


def index_votes():
    p = PollingProcess(
        target=_index_votes,
        sleep_secs=1,
    )
    p.start()
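Usage sketch for the new module (not part of the commit): fetch_votes_by_proposal returns the list under the "votes" key of the group module's votes_by_proposal response, and _index_votes reads the voter, option, metadata, and submit_time fields from each entry. The height and proposal id below are made-up example values, and REGEN_API must point at a Regen API endpoint:

from index_votes import fetch_votes_by_proposal

votes = fetch_votes_by_proposal(height=10_000_000, proposal_id=42)
for vote in votes:
    # same keys _index_votes uses when building each row for the votes table
    print(vote["voter"], vote["option"], vote["metadata"], vote["submit_time"])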
2 changes: 2 additions & 0 deletions main.py
@@ -7,6 +7,7 @@
from index_retires import index_retires
from index_proposals import index_proposals
from index_class_issuers import index_class_issuers
from index_votes import index_votes

load_dotenv()

@@ -36,3 +37,4 @@
index_retires()
index_proposals()
index_class_issuers()
index_votes()
24 changes: 24 additions & 0 deletions sql/V1_12__votes.sql
@@ -0,0 +1,24 @@
CREATE TABLE
  IF NOT EXISTS votes (
    type TEXT NOT NULL,
    block_height BIGINT NOT NULL,
    tx_idx SMALLINT NOT NULL,
    msg_idx SMALLINT NOT NULL,
    chain_num SMALLINT NOT NULL,
    timestamp timestamptz,
    tx_hash TEXT NOT NULL,
    proposal_id BIGINT NOT NULL,
    voter TEXT NOT NULL,
    option TEXT NOT NULL,
    metadata TEXT NOT NULL,
    submit_time timestamptz NOT NULL,
    PRIMARY KEY (chain_num, block_height, tx_idx, msg_idx),
    FOREIGN KEY (chain_num, block_height, tx_idx, msg_idx, type) REFERENCES msg_event
  );

CREATE INDEX IF NOT EXISTS votes_proposal_id_chain_num_idx ON votes (proposal_id, chain_num);

ALTER TABLE IF EXISTS votes
DROP CONSTRAINT votes_chain_num_proposal_id_voter_ux;

ALTER TABLE IF EXISTS votes ADD CONSTRAINT votes_chain_num_proposal_id_voter_ux UNIQUE (chain_num, proposal_id, voter);
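The votes_proposal_id_chain_num_idx index above covers lookups of a proposal's votes scoped to a chain. A sketch of such a query via psycopg2 (not part of the commit; the DATABASE_URL environment variable, proposal id, and chain number are illustrative assumptions):

import os
import psycopg2

conn = psycopg2.connect(os.environ["DATABASE_URL"])
with conn.cursor() as cur:
    cur.execute(
        "SELECT voter, option, submit_time FROM votes"
        " WHERE proposal_id = %s AND chain_num = %s;",
        (42, 1),  # hypothetical proposal id and chain number
    )
    for voter, option, submit_time in cur:
        print(voter, option, submit_time)
conn.close()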
1 change: 1 addition & 0 deletions sql/run_all_migrations.sh
@@ -15,3 +15,4 @@ psql -c "\i V1_8__index_proposal_id.sql" $DATABASE_URL
psql -c "\i V1_9__all_ecocredit_txes.sql" $DATABASE_URL
psql -c "\i V1_10__class_issuers.sql" $DATABASE_URL
psql -c "\i V1_11__class_issuers_indexes.sql" $DATABASE_URL
psql -c "\i V1_12__votes.sql" $DATABASE_URL
48 changes: 48 additions & 0 deletions utils.py
@@ -108,6 +108,7 @@ def run(self):
        "regen.ecocredit.v1.EventCreateClass",
        "regen.ecocredit.v1.EventUpdateClassIssuers",
    ],
    "votes": ["cosmos.group.v1.EventVote"],
}


@@ -145,3 +146,50 @@ def events_to_process(cur, index_table_name):
    # this is how key and value are put into their own column
    for _, g in groupby(cur, lambda x: f"{x[1]}-{x[2]}-{x[3]}"):
        yield list(g)


def new_events_to_process(cur, index_table_name, chain_num, max_block_height):
    event_names = TABLE_EVENT_NAMES_MAP[index_table_name]
    formatted_event_names = [f"'{x}'" for x in event_names]
    formatted_event_names_set = f"({','.join(formatted_event_names)})"
    sql = textwrap.dedent(
        f"""
        SELECT mea.type,
               mea.block_height,
               mea.tx_idx,
               mea.msg_idx,
               mea.key,
               mea.value,
               mea.chain_num,
               TRIM(BOTH '"' FROM (tx.data -> 'tx_response' -> 'timestamp')::text) AS timestamp,
               encode(tx.hash, 'hex') as tx_hash
        FROM msg_event_attr AS mea
        NATURAL LEFT JOIN {index_table_name} AS e
        NATURAL LEFT JOIN tx
        WHERE mea.type IN {formatted_event_names_set}
          AND (e.block_height IS NULL
               AND e.type IS NULL
               AND e.tx_idx IS NULL
               AND e.msg_idx IS NULL)
          AND mea.chain_num = {chain_num}
          AND mea.block_height > {max_block_height}
        ORDER BY block_height ASC,
                 KEY ASC;
        """
    )
    cur.execute(sql)

    # group together results from the query above
    # the group by done based on the block_height, tx_idx, and msg_idx
    # this is how key and value are put into their own column
    for _, g in groupby(cur, lambda x: f"{x[1]}-{x[2]}-{x[3]}"):
        yield list(g)
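new_events_to_process yields one list per event: the query orders msg_event_attr rows by block height and key, and groupby clusters them on the "{block_height}-{tx_idx}-{msg_idx}" string, which is what lets _index_votes unpack event[0] for the shared columns and fold the remaining rows into attributes. A standalone sketch of that grouping with made-up rows (column order matches the SELECT above; the "voter" attribute is invented here only to show multi-row grouping):

from itertools import groupby

rows = [
    # (type, block_height, tx_idx, msg_idx, key, value, chain_num, timestamp, tx_hash)
    ("cosmos.group.v1.EventVote", 100, 0, 0, "proposal_id", '"7"', 1, "2023-09-12T00:00:00Z", "aa11"),
    ("cosmos.group.v1.EventVote", 100, 0, 0, "voter", '"regen1example"', 1, "2023-09-12T00:00:00Z", "aa11"),
    ("cosmos.group.v1.EventVote", 100, 0, 1, "proposal_id", '"8"', 1, "2023-09-12T00:00:00Z", "aa11"),
]
for _, group in groupby(rows, lambda x: f"{x[1]}-{x[2]}-{x[3]}"):
    event = list(group)  # all attr rows for one (block_height, tx_idx, msg_idx)
    print(event[0][1], event[0][3], {r[4]: r[5].strip('"') for r in event})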


def is_archive_node():
    # since the indexer is intended to run against archive nodes,
    # assume that by default the node is an archive node.
    value = os.environ.get("REGEN_IS_ARCHIVE_NODE", "true").lower()
    if value not in ["true", "false"]:
        raise ValueError("REGEN_IS_ARCHIVE_NODE must be true or false")
    return value == "true"
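A quick behavior check for the helper above (not part of the commit; assumes it runs inside the indexer repo so that utils is importable):

import os
from utils import is_archive_node

os.environ["REGEN_IS_ARCHIVE_NODE"] = "false"
print(is_archive_node())  # False

os.environ.pop("REGEN_IS_ARCHIVE_NODE", None)
print(is_archive_node())  # True, archive node is the default

os.environ["REGEN_IS_ARCHIVE_NODE"] = "yes"
try:
    is_archive_node()
except ValueError as err:
    print(err)  # REGEN_IS_ARCHIVE_NODE must be true or false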
