Skip to content

PM-1284 Sharding with 600 per 24h and use of ExponentialBackoffRetryPolicy #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion database/migrations/1_create_submissions_table.up.cql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS submissions (
// filled by uptime_service_backend
submitted_at_date TEXT,
shard INT,
submitted_at TIMESTAMP,
submitter TEXT,
created_at TIMESTAMP,
Expand All @@ -19,5 +20,5 @@ CREATE TABLE IF NOT EXISTS submissions (
validation_error TEXT,
// was it verified by zk-validator
verified BOOLEAN,
PRIMARY KEY (submitted_at_date, submitted_at, submitter)
PRIMARY KEY ((submitted_at_date, shard), submitted_at, submitter)
);
19 changes: 15 additions & 4 deletions src/delegation_backend/aws_keyspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func InitializeKeyspaceSession(config *AwsKeyspacesConfig) (*gocql.Session, erro

cluster.Consistency = gocql.LocalQuorum
cluster.DisableInitialHostLookup = false
cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{NumRetries: 10, Min: 100 * time.Millisecond, Max: 10 * time.Second}

session, err := cluster.CreateSession()
if err != nil {
Expand Down Expand Up @@ -200,14 +201,24 @@ func (kc *KeyspaceContext) insertSubmission(submission *Submission) error {
}, maxRetries, initialBackoff)
}

// calculateShard returns the shard number for a given submission time.
// 0-599 are the possible shard numbers, each representing a 144-second interval within 24h.
// shard = (3600 * hour + 60 * minute + second) // 144
func calculateShard(submittedAt time.Time) int {
hour := submittedAt.Hour()
minute := submittedAt.Minute()
second := submittedAt.Second()
return (3600*hour + 60*minute + second) / 144
}

func (kc *KeyspaceContext) tryInsertSubmission(submission *Submission, includeRawBlock bool) error {
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha"
values := []interface{}{submission.SubmittedAtDate, submission.SubmittedAt, submission.Submitter, submission.RemoteAddr, submission.PeerId, submission.SnarkWork, submission.BlockHash, submission.CreatedAt, submission.GraphqlControlPort, submission.BuiltWithCommitSha}
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha"
values := []interface{}{submission.SubmittedAtDate, calculateShard(submission.SubmittedAt), submission.SubmittedAt, submission.Submitter, submission.RemoteAddr, submission.PeerId, submission.SnarkWork, submission.BlockHash, submission.CreatedAt, submission.GraphqlControlPort, submission.BuiltWithCommitSha}
if includeRawBlock {
query += ", raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
query += ", raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
values = append(values, submission.RawBlock)
} else {
query += ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
query += ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
}
return kc.Session.Query(query, values...).Exec()
}
Expand Down
2 changes: 1 addition & 1 deletion src/integration_tests/aws_keyspaces_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func checkForSubmissions(session *gocql.Session, keyspace, date string) (bool, error) {
var submitter, blockHash, rawBlock string

query := fmt.Sprintf("SELECT submitter, block_hash, raw_block FROM %s.submissions WHERE submitted_at_date='%s' LIMIT 1", keyspace, date)
query := fmt.Sprintf("SELECT submitter, block_hash, raw_block FROM %s.submissions WHERE submitted_at_date='%s' LIMIT 1 ALLOW FILTERING", keyspace, date)

if err := session.Query(query).Scan(&submitter, &blockHash, &rawBlock); err != nil {
if err == gocql.ErrNotFound {
Expand Down
Loading