Skip to content

Commit

Permalink
Merge pull request #47 from MinaFoundation/sharding+cassandra-creds
Browse files Browse the repository at this point in the history
PM-1284 Sharding with 600 per 24h and use of ExponentialBackoffRetryPolicy
  • Loading branch information
piotr-iohk authored Mar 14, 2024
2 parents d47a6c8 + 2a30606 commit 2c329d8
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
3 changes: 2 additions & 1 deletion database/migrations/1_create_submissions_table.up.cql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS submissions (
// filled by uptime_service_backend
submitted_at_date TEXT,
shard INT,
submitted_at TIMESTAMP,
submitter TEXT,
created_at TIMESTAMP,
Expand All @@ -19,5 +20,5 @@ CREATE TABLE IF NOT EXISTS submissions (
validation_error TEXT,
// was it verified by zk-validator
verified BOOLEAN,
PRIMARY KEY (submitted_at_date, submitted_at, submitter)
PRIMARY KEY ((submitted_at_date, shard), submitted_at, submitter)
);
19 changes: 15 additions & 4 deletions src/delegation_backend/aws_keyspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func InitializeKeyspaceSession(config *AwsKeyspacesConfig) (*gocql.Session, erro

cluster.Consistency = gocql.LocalQuorum
cluster.DisableInitialHostLookup = false
cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{NumRetries: 10, Min: 100 * time.Millisecond, Max: 10 * time.Second}

session, err := cluster.CreateSession()
if err != nil {
Expand Down Expand Up @@ -200,14 +201,24 @@ func (kc *KeyspaceContext) insertSubmission(submission *Submission) error {
}, maxRetries, initialBackoff)
}

// calculateShard returns the shard number for a given submission time.
// 0-599 are the possible shard numbers, each representing a 144-second interval within 24h.
// shard = (3600 * hour + 60 * minute + second) // 144
func calculateShard(submittedAt time.Time) int {
hour := submittedAt.Hour()
minute := submittedAt.Minute()
second := submittedAt.Second()
return (3600*hour + 60*minute + second) / 144
}

func (kc *KeyspaceContext) tryInsertSubmission(submission *Submission, includeRawBlock bool) error {
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha"
values := []interface{}{submission.SubmittedAtDate, submission.SubmittedAt, submission.Submitter, submission.RemoteAddr, submission.PeerId, submission.SnarkWork, submission.BlockHash, submission.CreatedAt, submission.GraphqlControlPort, submission.BuiltWithCommitSha}
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha"
values := []interface{}{submission.SubmittedAtDate, calculateShard(submission.SubmittedAt), submission.SubmittedAt, submission.Submitter, submission.RemoteAddr, submission.PeerId, submission.SnarkWork, submission.BlockHash, submission.CreatedAt, submission.GraphqlControlPort, submission.BuiltWithCommitSha}
if includeRawBlock {
query += ", raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
query += ", raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
values = append(values, submission.RawBlock)
} else {
query += ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
query += ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
}
return kc.Session.Query(query, values...).Exec()
}
Expand Down
2 changes: 1 addition & 1 deletion src/integration_tests/aws_keyspaces_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func checkForSubmissions(session *gocql.Session, keyspace, date string) (bool, error) {
var submitter, blockHash, rawBlock string

query := fmt.Sprintf("SELECT submitter, block_hash, raw_block FROM %s.submissions WHERE submitted_at_date='%s' LIMIT 1", keyspace, date)
query := fmt.Sprintf("SELECT submitter, block_hash, raw_block FROM %s.submissions WHERE submitted_at_date='%s' LIMIT 1 ALLOW FILTERING", keyspace, date)

if err := session.Query(query).Scan(&submitter, &blockHash, &rawBlock); err != nil {
if err == gocql.ErrNotFound {
Expand Down

0 comments on commit 2c329d8

Please sign in to comment.