Skip to content

Commit

Permalink
adjust for schema updates
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Nov 26, 2021
1 parent ac6ef33 commit 6c285d6
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 86 deletions.
15 changes: 8 additions & 7 deletions statediff/indexer/database/dump/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode)

var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}

headerID := header.Hash().String()
mod := models.HeaderModel{
CID: headerNode.Cid().String(),
Expand All @@ -205,7 +199,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
}
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err
Expand Down Expand Up @@ -268,6 +262,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// index tx
trx := args.txs[i]
trxID := trx.Hash().String()

var val string
if trx.Value() != nil {
val = trx.Value().String()
}

// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
Expand All @@ -283,6 +283,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion statediff/indexer/database/file/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
})
return headerID
}
Expand Down Expand Up @@ -269,6 +269,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
// index tx
trx := args.txs[i]
txID := trx.Hash().String()

var val string
if trx.Value() != nil {
val = trx.Value().String()
}

// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
Expand All @@ -284,6 +290,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
sdi.fileWriter.upsertTransactionCID(txModel)

Expand Down
8 changes: 4 additions & 4 deletions statediff/indexer/database/file/indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,16 @@ func TestFileIndexerLegacy(t *testing.T) {
setupLegacy(t)
dumpData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
Expand All @@ -126,7 +126,7 @@ func TestFileIndexerLegacy(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockBlock.Coinbase().String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}
8 changes: 4 additions & 4 deletions statediff/indexer/database/file/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,16 @@ func TestFileIndexer(t *testing.T) {
setup(t)
dumpData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = sqlxdb.QueryRowx(pgStr, mocks.BlockNumber.Uint64()).StructScan(header)
Expand All @@ -197,7 +197,7 @@ func TestFileIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
Expand Down
40 changes: 17 additions & 23 deletions statediff/indexer/database/file/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,14 @@ const (
ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n"

headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, %s);\n"

headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " +
"reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, NULL);\n"
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, '%s');\n"

uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s');\n"

txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " +
"VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d);\n"
txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " +
"value) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n"

alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n"

Expand Down Expand Up @@ -191,42 +187,40 @@ func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, strin
}

func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
var stmt string
if header.BaseFee == nil {
stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1)
} else {
stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, *header.BaseFee)
}
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase)
sqw.stmts <- []byte(stmt)
indexerMetrics.blocks.Inc(1)
}

func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey))
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey))
}

func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type))
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value))
indexerMetrics.transactions.Inc(1)
}

func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys)))
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
formatPostgresStringArray(accessListElement.StorageKeys)))
indexerMetrics.accessListEntries.Inc(1)
}

func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot))
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot))
indexerMetrics.receipts.Inc(1)
}

func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data))
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data))
indexerMetrics.logs.Inc(1)
}
}
Expand Down
9 changes: 8 additions & 1 deletion statediff/indexer/database/sql/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
})
}

Expand Down Expand Up @@ -316,6 +316,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// index tx
trx := args.txs[i]
txID := trx.Hash().String()

var val string
if trx.Value() != nil {
val = trx.Value().String()
}

// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
Expand All @@ -331,6 +337,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions statediff/indexer/database/sql/pgx_indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPGXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacyPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
Expand All @@ -72,18 +72,18 @@ func TestPGXIndexerLegacy(t *testing.T) {
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *int64 `db:"base_fee"`
Coinbase string `db:"coinbase"`
}
header := new(res)

err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).Scan(
&header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.BaseFee)
&header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.Coinbase)
require.NoError(t, err)

test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockHeader.Coinbase.String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}
10 changes: 5 additions & 5 deletions statediff/indexer/database/sql/pgx_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,31 +150,31 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, cast(base_fee AS TEXT)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan(
&header.CID,
&header.TD,
&header.Reward,
&header.BlockHash,
&header.BaseFee)
&header.Coinbase)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions statediff/indexer/database/sql/postgres/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) {
var DefaultConfig = Config{
Hostname: "localhost",
Port: 5432,
DatabaseName: "vulcanize_testing",
DatabaseName: "vulcanize_test",
Username: "postgres",
Password: "password",
Password: "",
}

// Config holds params for a Postgres db
Expand Down
6 changes: 3 additions & 3 deletions statediff/indexer/database/sql/postgres/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type DB struct {

// InsertHeaderStm satisfies the sql.Statements interface
func (db *DB) InsertHeaderStm() string {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
}

// InsertUncleStm satisfies the sql.Statements interface
Expand All @@ -50,7 +50,7 @@ func (db *DB) InsertUncleStm() string {

// InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string {
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (tx_hash) DO NOTHING`
}

Expand Down
6 changes: 3 additions & 3 deletions statediff/indexer/database/sql/sqlx_indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestSQLXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacySQLX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
Expand All @@ -82,7 +82,7 @@ func TestSQLXIndexerLegacy(t *testing.T) {
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *int64 `db:"base_fee"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
Expand All @@ -91,7 +91,7 @@ func TestSQLXIndexerLegacy(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockHeader.Coinbase.String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}
8 changes: 4 additions & 4 deletions statediff/indexer/database/sql/sqlx_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,16 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
Expand All @@ -196,7 +196,7 @@ func TestSQLXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 6c285d6

Please sign in to comment.