Skip to content

Commit

Permalink
Merge pull request #33 from MinaFoundation/denormalize-blocks-in-cass…
Browse files Browse the repository at this point in the history
…andra2

PM-834 Denormalize blocks in cassandra
  • Loading branch information
piotr-iohk authored Dec 20, 2023
2 parents 249650c + 6ce8953 commit 03fa150
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 63 deletions.
1 change: 1 addition & 0 deletions database/migrations/1_create_submissions_table.up.cql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CREATE TABLE IF NOT EXISTS submissions (
submitter TEXT,
created_at TIMESTAMP,
block_hash TEXT,
raw_block BLOB,
remote_addr TEXT,
peer_id TEXT,
snark_work BLOB,
Expand Down
4 changes: 0 additions & 4 deletions database/migrations/2_create_blocks_table.up.cql

This file was deleted.

1 change: 0 additions & 1 deletion database/migrations/2_drop_blocks_table.down.cql

This file was deleted.

73 changes: 43 additions & 30 deletions src/delegation_backend/aws_keyspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Submission struct {
RemoteAddr string `json:"remote_addr"`
PeerId string `json:"peer_id"`
Submitter string `json:"submitter"` // is base58check-encoded submitter's public key
RawBlock []byte `json:"raw_block,omitempty"`
SnarkWork []byte `json:"snark_work,omitempty"`
GraphqlControlPort int `json:"graphql_control_port,omitempty"`
BuiltWithCommitSha string `json:"built_with_commit_sha,omitempty"`
Expand Down Expand Up @@ -122,59 +123,71 @@ type KeyspaceContext struct {
// Insert a submission into the Keyspaces database
func (kc *KeyspaceContext) insertSubmission(submission *Submission) error {
return ExponentialBackoff(func() error {
return kc.Session.Query(
"INSERT INTO "+kc.Keyspace+".submissions (submitted_at_date, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
submission.SubmittedAtDate,
submission.SubmittedAt,
submission.Submitter,
submission.RemoteAddr,
submission.PeerId,
submission.SnarkWork,
submission.BlockHash,
submission.CreatedAt,
submission.GraphqlControlPort,
submission.BuiltWithCommitSha,
).Exec()
if err := kc.tryInsertSubmission(submission, true); err != nil {
if isRowSizeError(err) {
kc.Log.Warnf("KeyspaceSave: Block too large, inserting without raw_block")
return kc.tryInsertSubmission(submission, false)
}
return err
}
return nil
}, maxRetries, initialBackoff)
}

// Insert a block into the Keyspaces database
func (kc *KeyspaceContext) insertBlock(block *Block) error {
return ExponentialBackoff(func() error {
return kc.Session.Query(
"INSERT INTO "+kc.Keyspace+".blocks (block_hash, raw_block) VALUES (?, ?)",
block.BlockHash,
block.RawBlock,
).Exec()
}, maxRetries, initialBackoff)
func (kc *KeyspaceContext) tryInsertSubmission(submission *Submission, includeRawBlock bool) error {
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha"
values := []interface{}{submission.SubmittedAtDate, submission.SubmittedAt, submission.Submitter, submission.RemoteAddr, submission.PeerId, submission.SnarkWork, submission.BlockHash, submission.CreatedAt, submission.GraphqlControlPort, submission.BuiltWithCommitSha}
if includeRawBlock {
query += ", raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
values = append(values, submission.RawBlock)
} else {
query += ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
}
return kc.Session.Query(query, values...).Exec()
}

func isRowSizeError(err error) bool {
// Replace with more robust error checking if possible
return strings.Contains(err.Error(), "The update would cause the row to exceed the maximum allowed size")
}

// KeyspaceSave saves the provided objects into Amazon Keyspaces.
func (kc *KeyspaceContext) KeyspaceSave(objs ObjectsToSave) {
var submissionToSave *Submission = &Submission{}
for path, bs := range objs {
if strings.HasPrefix(path, "submissions/") {
submission, err := kc.parseSubmissionBytes(bs, path)
kc.Log.Debugf("KeyspaceSave: Saving submission for block: %v, submitter: %v, submitted_at: %v", submission.BlockHash, submission.Submitter, submission.SubmittedAt)
if err != nil {
kc.Log.Warnf("KeyspaceSave: Error parsing submission JSON: %v", err)
continue
}
if err := kc.insertSubmission(submission); err != nil {
kc.Log.Warnf("KeyspaceSave: Error saving submission to Keyspaces: %v", err)
}
submissionToSave.BlockHash = submission.BlockHash
submissionToSave.CreatedAt = submission.CreatedAt
submissionToSave.GraphqlControlPort = submission.GraphqlControlPort
submissionToSave.PeerId = submission.PeerId
submissionToSave.RemoteAddr = submission.RemoteAddr
submissionToSave.SnarkWork = submission.SnarkWork
submissionToSave.SubmittedAt = submission.SubmittedAt
submissionToSave.SubmittedAtDate = submission.SubmittedAtDate
submissionToSave.Submitter = submission.Submitter
submissionToSave.BuiltWithCommitSha = submission.BuiltWithCommitSha

} else if strings.HasPrefix(path, "blocks/") {
block, err := kc.parseBlockBytes(bs, path)
kc.Log.Debugf("KeyspaceSave: Saving block: %v", block.BlockHash)
if err != nil {
kc.Log.Warnf("KeyspaceSave: Error parsing block file: %v", err)
continue
}
if err := kc.insertBlock(block); err != nil {
kc.Log.Warnf("KeyspaceSave: Error saving block to Keyspaces: %v", err)
}
submissionToSave.RawBlock = block.RawBlock
submissionToSave.BlockHash = block.BlockHash
} else {
kc.Log.Warnf("KeyspaceSave: Unknown path format: %s", path)
}

}
kc.Log.Debugf("KeyspaceSave: Saving submission for block: %v, submitter: %v, submitted_at: %v", submissionToSave.BlockHash, submissionToSave.Submitter, submissionToSave.SubmittedAt)
if err := kc.insertSubmission(submissionToSave); err != nil {
kc.Log.Warnf("KeyspaceSave: Error saving submission to Keyspaces: %v", err)
}
}

Expand Down
44 changes: 17 additions & 27 deletions src/integration_tests/aws_keyspaces_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,29 @@ import (
"github.com/gocql/gocql"
)

func checkForBlocks(session *gocql.Session, keyspace string) (bool, error) {
var blockHash string
query := fmt.Sprintf("SELECT block_hash FROM %s.blocks LIMIT 1", keyspace)
if err := session.Query(query).Scan(&blockHash); err != nil {
func checkForSubmissions(session *gocql.Session, keyspace, date string) (bool, error) {
var submitter, blockHash, rawBlock string

query := fmt.Sprintf("SELECT submitter, block_hash, raw_block FROM %s.submissions WHERE submitted_at_date='%s' LIMIT 1", keyspace, date)

if err := session.Query(query).Scan(&submitter, &blockHash, &rawBlock); err != nil {
if err == gocql.ErrNotFound {
return false, nil // No blocks found
return false, nil
}
return false, err // An error occurred
return false, err
}
log.Printf("Found block: %s\n", blockHash)
return true, nil // At least one block found
}

func checkForSubmissions(session *gocql.Session, keyspace, date string) (bool, error) {
var submitter string
query := fmt.Sprintf("SELECT submitter FROM %s.submissions WHERE submitted_at_date='%s' LIMIT 1", keyspace, date)
if err := session.Query(query).Scan(&submitter); err != nil {
if err == gocql.ErrNotFound {
return false, nil // No submissions found for today
}
return false, err // An error occurred
if submitter == "" || blockHash == "" || rawBlock == "" {
log.Printf("Found submission for today with empty required fields\n")
return false, nil // Found a row but required fields are empty
}
log.Printf("Found submission for today: %s\n", submitter)
return true, nil // At least one submission found for today

log.Printf("Found valid submission for today: submitter=%s, block_hash=%s\n", submitter, blockHash)
return true, nil // Valid submission found with all required fields
}

func waitUntilKeyspacesHasBlocksAndSubmissions(config dg.AppConfig) error {
log.Printf("Waiting for blocks and submissions to appear in Keyspaces")
log.Printf("Waiting for submissions to appear in Keyspaces")

sess, err := dg.InitializeKeyspaceSession(config.AwsKeyspaces)
if err != nil {
Expand All @@ -53,19 +48,14 @@ func waitUntilKeyspacesHasBlocksAndSubmissions(config dg.AppConfig) error {
case <-timeout:
return fmt.Errorf("timeout reached while waiting for Keyspaces contents")
case <-tick:
hasBlocks, err := checkForBlocks(sess, config.AwsKeyspaces.Keyspace)
if err != nil {
return err
}

hasSubmissionsForToday, err := checkForSubmissions(sess, config.AwsKeyspaces.Keyspace, currentDate)
if err != nil {
return err
}

// If both blocks and submissions for today are found, return
if hasBlocks && hasSubmissionsForToday {
log.Printf("Found blocks and submissions for today in Keyspaces")
if hasSubmissionsForToday {
log.Printf("Found submissions for today in Keyspaces")
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/integration_tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func init() {
if err != nil {
log.Fatalf("Failed to migrate up: %v", err)
}
tables := []string{"schema_migrations", "submissions", "blocks"}
tables := []string{"schema_migrations", "submissions"}
err = WaitForTablesActive(config.AwsKeyspaces, tables)
if err != nil {
log.Fatalf("Failed to wait for tables to be active: %v", err)
Expand Down

0 comments on commit 03fa150

Please sign in to comment.