Skip to content

Commit

Permalink
Merge pull request #49 from MinaFoundation/caclulate-block-size
Browse files Browse the repository at this point in the history
PM-1343 Caclulate block size
  • Loading branch information
piotr-iohk authored Mar 27, 2024
2 parents eec11cc + a0ba732 commit 5fba917
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 35 deletions.
4 changes: 2 additions & 2 deletions shell.nix
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
with import <nixpkgs> { };
with import (fetchTarball "https://nixos.org/channels/nixos-unstable/nixexprs.tar.xz") { };
let
minaSigner = import ./external/c-reference-signer;
in
{
devEnv = stdenv.mkDerivation {
name = "dev";
buildInputs = [ stdenv go_1_20 glibc minaSigner ];
buildInputs = [ stdenv go_1_21 glibc minaSigner ];
shellHook = ''
export LIB_MINA_SIGNER=${minaSigner}/lib/libmina_signer.so
return
Expand Down
96 changes: 66 additions & 30 deletions src/delegation_backend/aws_keyspaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,6 @@ type KeyspaceContext struct {
Log *logging.ZapEventLogger
}

// Insert a submission into the Keyspaces database
func (kc *KeyspaceContext) insertSubmission(submission *Submission) error {
return ExponentialBackoff(func() error {
if err := kc.tryInsertSubmission(submission, true); err != nil {
if isRowSizeError(err) {
kc.Log.Warnf("KeyspaceSave: Block too large, inserting without raw_block")
return kc.tryInsertSubmission(submission, false)
}
return err
}
return nil
}, maxRetries, initialBackoff)
}

// calculateShard returns the shard number for a given submission time.
// 0-599 are the possible shard numbers, each representing a 144-second interval within 24h.
// shard = (3600 * hour + 60 * minute + second) // 144
Expand All @@ -211,21 +197,71 @@ func calculateShard(submittedAt time.Time) int {
return (3600*hour + 60*minute + second) / 144
}

func (kc *KeyspaceContext) tryInsertSubmission(submission *Submission, includeRawBlock bool) error {
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha"
values := []interface{}{submission.SubmittedAtDate, calculateShard(submission.SubmittedAt), submission.SubmittedAt, submission.Submitter, submission.RemoteAddr, submission.PeerId, submission.SnarkWork, submission.BlockHash, submission.CreatedAt, submission.GraphqlControlPort, submission.BuiltWithCommitSha}
if includeRawBlock {
query += ", raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
values = append(values, submission.RawBlock)
} else {
query += ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
// Estimate the size of the raw block in bytes.
// In Go, len() returns the number of bytes in a slice, which should suffice for a rough estimation.
func calculateBlockSize(rawBlock []byte) int {
return len(rawBlock)
}

// Insert a submission into the Keyspaces database
func (kc *KeyspaceContext) insertSubmission(submission *Submission) error {
return ExponentialBackoff(func() error {
if submission.RawBlock == nil {
kc.Log.Error("KeyspaceSave: Block is missing in the submission, which is not expected, but inserting without raw_block")
if err := kc.insertSubmissionWithoutRawBlock(submission); err != nil {
return err
}
} else if calculateBlockSize(submission.RawBlock) > MAX_BLOCK_SIZE {
kc.Log.Infof("KeyspaceSave: Block too large (%d bytes), inserting without raw_block", calculateBlockSize(submission.RawBlock))
if err := kc.insertSubmissionWithoutRawBlock(submission); err != nil {
return err
}
} else {
if err := kc.insertSubmissionWithRawBlock(submission); err != nil {
return err
}

}

return nil
}, maxRetries, initialBackoff)
}

func (kc *KeyspaceContext) insertSubmissionWithoutRawBlock(submission *Submission) error {
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
values := []interface{}{
submission.SubmittedAtDate,
calculateShard(submission.SubmittedAt),
submission.SubmittedAt,
submission.Submitter,
submission.RemoteAddr,
submission.PeerId,
submission.SnarkWork,
submission.BlockHash,
submission.CreatedAt,
submission.GraphqlControlPort,
submission.BuiltWithCommitSha,
}
return kc.Session.Query(query, values...).Exec()
}

func isRowSizeError(err error) bool {
// Replace with more robust error checking if possible
return strings.Contains(err.Error(), "The update would cause the row to exceed the maximum allowed size")
func (kc *KeyspaceContext) insertSubmissionWithRawBlock(submission *Submission) error {
query := "INSERT INTO " + kc.Keyspace + ".submissions (submitted_at_date, shard, submitted_at, submitter, remote_addr, peer_id, snark_work, block_hash, created_at, graphql_control_port, built_with_commit_sha, raw_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
values := []interface{}{
submission.SubmittedAtDate,
calculateShard(submission.SubmittedAt),
submission.SubmittedAt,
submission.Submitter,
submission.RemoteAddr,
submission.PeerId,
submission.SnarkWork,
submission.BlockHash,
submission.CreatedAt,
submission.GraphqlControlPort,
submission.BuiltWithCommitSha,
submission.RawBlock,
}
return kc.Session.Query(query, values...).Exec()
}

// KeyspaceSave saves the provided objects into Amazon Keyspaces.
Expand All @@ -235,7 +271,7 @@ func (kc *KeyspaceContext) KeyspaceSave(objs ObjectsToSave) {
if strings.HasPrefix(path, "submissions/") {
submission, err := kc.parseSubmissionBytes(bs, path)
if err != nil {
kc.Log.Warnf("KeyspaceSave: Error parsing submission JSON: %v", err)
kc.Log.Errorf("KeyspaceSave: Error parsing submission JSON: %v", err)
continue
}
submissionToSave.BlockHash = submission.BlockHash
Expand All @@ -252,19 +288,19 @@ func (kc *KeyspaceContext) KeyspaceSave(objs ObjectsToSave) {
} else if strings.HasPrefix(path, "blocks/") {
block, err := kc.parseBlockBytes(bs, path)
if err != nil {
kc.Log.Warnf("KeyspaceSave: Error parsing block file: %v", err)
kc.Log.Errorf("KeyspaceSave: Error parsing block file: %v", err)
continue
}
submissionToSave.RawBlock = block.RawBlock
submissionToSave.BlockHash = block.BlockHash
} else {
kc.Log.Warnf("KeyspaceSave: Unknown path format: %s", path)
kc.Log.Errorf("KeyspaceSave: Unknown path format: %s", path)
}

}
kc.Log.Debugf("KeyspaceSave: Saving submission for block: %v, submitter: %v, submitted_at: %v", submissionToSave.BlockHash, submissionToSave.Submitter, submissionToSave.SubmittedAt)
kc.Log.Infof("KeyspaceSave: Saving submission for block: %v, submitter: %v, submitted_at: %v", submissionToSave.BlockHash, submissionToSave.Submitter, submissionToSave.SubmittedAt)
if err := kc.insertSubmission(submissionToSave); err != nil {
kc.Log.Warnf("KeyspaceSave: Error saving submission to Keyspaces: %v", err)
kc.Log.Errorf("KeyspaceSave: Error saving submission to Keyspaces: %v", err)
}
}

Expand Down
1 change: 1 addition & 0 deletions src/delegation_backend/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const WHITELIST_REFRESH_INTERVAL = 10 * 60 * 1000000000 // 10m
var PK_PREFIX = [...]byte{1, 1}
var SIG_PREFIX = [...]byte{1}
var BLOCK_HASH_PREFIX = [...]byte{1}
var MAX_BLOCK_SIZE = 1000000 // (1MB) max block size in bytes for Cassandra, blocks larger than this size will be stored in S3 only

func NetworkId() uint8 {
if os.Getenv("NETWORK") == "mainnet" {
Expand Down
6 changes: 3 additions & 3 deletions src/delegation_backend/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (ctx *AwsContext) S3Save(objs ObjectsToSave) {
}
}

ctx.Log.Debugf("S3Save: saving %s", path)
ctx.Log.Infof("S3Save: saving %s", path)
_, err := ctx.Client.PutObject(ctx.Context, &s3.PutObjectInput{
Bucket: ctx.BucketName,
Key: fullKey,
Expand All @@ -76,10 +76,10 @@ func LocalFileSystemSave(objs ObjectsToSave, directory string, log logging.Stand

err := os.MkdirAll(filepath.Dir(fullPath), os.ModePerm)
if err != nil {
log.Warnf("LocalFileSystemSave: Error creating directories for %s: %v", fullPath, err)
log.Errorf("LocalFileSystemSave: Error creating directories for %s: %v", fullPath, err)
continue // skip to the next object
}
log.Debugf("LocalFileSystemSave: saving %s", fullPath)
log.Infof("LocalFileSystemSave: saving %s", fullPath)
err = os.WriteFile(fullPath, bs, 0644)
if err != nil {
log.Warnf("Error writing to file %s: %v", fullPath, err)
Expand Down

0 comments on commit 5fba917

Please sign in to comment.