diff --git a/shell.nix b/shell.nix index cd04ac3..832918e 100644 --- a/shell.nix +++ b/shell.nix @@ -1,11 +1,11 @@ -with import { }; +with import (fetchTarball "https://nixos.org/channels/nixos-unstable/nixexprs.tar.xz") { }; let minaSigner = import ./external/c-reference-signer; in { devEnv = stdenv.mkDerivation { name = "dev"; - buildInputs = [ stdenv go_1_20 glibc minaSigner ]; + buildInputs = [ stdenv go_1_21 glibc minaSigner ]; shellHook = '' export LIB_MINA_SIGNER=${minaSigner}/lib/libmina_signer.so return diff --git a/src/delegation_backend/aws_keyspaces.go b/src/delegation_backend/aws_keyspaces.go index 6e4e911..6deedf9 100644 --- a/src/delegation_backend/aws_keyspaces.go +++ b/src/delegation_backend/aws_keyspaces.go @@ -187,20 +187,6 @@ type KeyspaceContext struct { Log *logging.ZapEventLogger } -// Insert a submission into the Keyspaces database -func (kc *KeyspaceContext) insertSubmission(submission *Submission) error { - return ExponentialBackoff(func() error { - if err := kc.tryInsertSubmission(submission, true); err != nil { - if isRowSizeError(err) { - kc.Log.Warnf("KeyspaceSave: Block too large, inserting without raw_block") - return kc.tryInsertSubmission(submission, false) - } - return err - } - return nil - }, maxRetries, initialBackoff) -} - // calculateShard returns the shard number for a given submission time. // 0-599 are the possible shard numbers, each representing a 144-second interval within 24h. // shard = (3600 * hour + 60 * minute + second) // 144 @@ -211,21 +197,71 @@ func calculateShard(submittedAt time.Time) int { return (3600*hour + 60*minute + second) / 144 } -func (kc *KeyspaceContext) tryInsertSubmission(submission *Submission, includeRawBlock bool) error { - query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha" - values := []interface{}{submission.SubmittedAtDate, calculateShard(submission.SubmittedAt), submission.SubmittedAt, submission.Submitter, submission.RemoteAddr, submission.PeerId, submission.SnarkWork, submission.BlockHash, submission.CreatedAt, submission.GraphqlControlPort, submission.BuiltWithCommitSha} - if includeRawBlock { - query += ", raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" - values = append(values, submission.RawBlock) - } else { - query += ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +// Estimate the size of the raw block in bytes. +// In Go, len() returns the number of bytes in a slice, which should suffice for a rough estimation. +func calculateBlockSize(rawBlock []byte) int { + return len(rawBlock) +} + +// Insert a submission into the Keyspaces database +func (kc *KeyspaceContext) insertSubmission(submission *Submission) error { + return ExponentialBackoff(func() error { + if submission.RawBlock == nil { + kc.Log.Error("KeyspaceSave: Block is missing in the submission, which is not expected, but inserting without raw_block") + if err := kc.insertSubmissionWithoutRawBlock(submission); err != nil { + return err + } + } else if calculateBlockSize(submission.RawBlock) > MAX_BLOCK_SIZE { + kc.Log.Infof("KeyspaceSave: Block too large (%d bytes), inserting without raw_block", calculateBlockSize(submission.RawBlock)) + if err := kc.insertSubmissionWithoutRawBlock(submission); err != nil { + return err + } + } else { + if err := kc.insertSubmissionWithRawBlock(submission); err != nil { + return err + } + + } + + return nil + }, maxRetries, initialBackoff) +} + +func (kc *KeyspaceContext) insertSubmissionWithoutRawBlock(submission *Submission) error { + query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + values := []interface{}{ + submission.SubmittedAtDate, + calculateShard(submission.SubmittedAt), + submission.SubmittedAt, + submission.Submitter, + submission.RemoteAddr, + submission.PeerId, + submission.SnarkWork, + submission.BlockHash, + submission.CreatedAt, + submission.GraphqlControlPort, + submission.BuiltWithCommitSha, } return kc.Session.Query(query, values...).Exec() } -func isRowSizeError(err error) bool { - // Replace with more robust error checking if possible - return strings.Contains(err.Error(), "The update would cause the row to exceed the maximum allowed size") +func (kc *KeyspaceContext) insertSubmissionWithRawBlock(submission *Submission) error { + query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha, raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + values := []interface{}{ + submission.SubmittedAtDate, + calculateShard(submission.SubmittedAt), + submission.SubmittedAt, + submission.Submitter, + submission.RemoteAddr, + submission.PeerId, + submission.SnarkWork, + submission.BlockHash, + submission.CreatedAt, + submission.GraphqlControlPort, + submission.BuiltWithCommitSha, + submission.RawBlock, + } + return kc.Session.Query(query, values...).Exec() } // KeyspaceSave saves the provided objects into Amazon Keyspaces. @@ -235,7 +271,7 @@ func (kc *KeyspaceContext) KeyspaceSave(objs ObjectsToSave) { if strings.HasPrefix(path, "submissions/") { submission, err := kc.parseSubmissionBytes(bs, path) if err != nil { - kc.Log.Warnf("KeyspaceSave: Error parsing submission JSON: %v", err) + kc.Log.Errorf("KeyspaceSave: Error parsing submission JSON: %v", err) continue } submissionToSave.BlockHash = submission.BlockHash @@ -252,19 +288,19 @@ func (kc *KeyspaceContext) KeyspaceSave(objs ObjectsToSave) { } else if strings.HasPrefix(path, "blocks/") { block, err := kc.parseBlockBytes(bs, path) if err != nil { - kc.Log.Warnf("KeyspaceSave: Error parsing block file: %v", err) + kc.Log.Errorf("KeyspaceSave: Error parsing block file: %v", err) continue } submissionToSave.RawBlock = block.RawBlock submissionToSave.BlockHash = block.BlockHash } else { - kc.Log.Warnf("KeyspaceSave: Unknown path format: %s", path) + kc.Log.Errorf("KeyspaceSave: Unknown path format: %s", path) } } - kc.Log.Debugf("KeyspaceSave: Saving submission for block: %v, submitter: %v, submitted_at: %v", submissionToSave.BlockHash, submissionToSave.Submitter, submissionToSave.SubmittedAt) + kc.Log.Infof("KeyspaceSave: Saving submission for block: %v, submitter: %v, submitted_at: %v", submissionToSave.BlockHash, submissionToSave.Submitter, submissionToSave.SubmittedAt) if err := kc.insertSubmission(submissionToSave); err != nil { - kc.Log.Warnf("KeyspaceSave: Error saving submission to Keyspaces: %v", err) + kc.Log.Errorf("KeyspaceSave: Error saving submission to Keyspaces: %v", err) } } diff --git a/src/delegation_backend/constants.go b/src/delegation_backend/constants.go index 42c9194..a35c09b 100644 --- a/src/delegation_backend/constants.go +++ b/src/delegation_backend/constants.go @@ -16,6 +16,7 @@ const WHITELIST_REFRESH_INTERVAL = 10 * 60 * 1000000000 // 10m var PK_PREFIX = [...]byte{1, 1} var SIG_PREFIX = [...]byte{1} var BLOCK_HASH_PREFIX = [...]byte{1} +var MAX_BLOCK_SIZE = 1000000 // (1MB) max block size in bytes for Cassandra, blocks larger than this size will be stored in S3 only func NetworkId() uint8 { if os.Getenv("NETWORK") == "mainnet" { diff --git a/src/delegation_backend/submit.go b/src/delegation_backend/submit.go index df94614..e38358e 100644 --- a/src/delegation_backend/submit.go +++ b/src/delegation_backend/submit.go @@ -51,7 +51,7 @@ func (ctx *AwsContext) S3Save(objs ObjectsToSave) { } } - ctx.Log.Debugf("S3Save: saving %s", path) + ctx.Log.Infof("S3Save: saving %s", path) _, err := ctx.Client.PutObject(ctx.Context, &s3.PutObjectInput{ Bucket: ctx.BucketName, Key: fullKey, @@ -76,10 +76,10 @@ func LocalFileSystemSave(objs ObjectsToSave, directory string, log logging.Stand err := os.MkdirAll(filepath.Dir(fullPath), os.ModePerm) if err != nil { - log.Warnf("LocalFileSystemSave: Error creating directories for %s: %v", fullPath, err) + log.Errorf("LocalFileSystemSave: Error creating directories for %s: %v", fullPath, err) continue // skip to the next object } - log.Debugf("LocalFileSystemSave: saving %s", fullPath) + log.Infof("LocalFileSystemSave: saving %s", fullPath) err = os.WriteFile(fullPath, bs, 0644) if err != nil { log.Warnf("Error writing to file %s: %v", fullPath, err)