Skip to content

Commit

Permalink
fix some carps
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp committed Mar 14, 2024
1 parent acdbcfb commit 6c0d98f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 51 deletions.
73 changes: 22 additions & 51 deletions packages/blobstorage/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexer
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -18,12 +17,14 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/ethclient"
blobstorage "github.com/taikoxyz/taiko-mono/packages/blobstorage"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/bindings/taikol1"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/repo"
"github.com/urfave/cli/v2"
"golang.org/x/exp/slog"
"golang.org/x/sync/errgroup"

"github.com/taikoxyz/taiko-mono/packages/blobstorage"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/bindings/taikol1"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/repo"
"github.com/taikoxyz/taiko-mono/packages/blobstorage/pkg/utils"
)

type Response struct {
Expand Down Expand Up @@ -95,12 +96,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {

func (i *Indexer) Start() error {
i.wg.Add(1)

go func() {
if err := i.eventLoop(i.ctx, i.latestIndexedBlockNumber); err != nil {
slog.Error("error in event loop", "error", err)
}
}()
go i.eventLoop(i.ctx, i.latestIndexedBlockNumber)

return nil
}
Expand All @@ -109,37 +105,21 @@ func (i *Indexer) Start() error {
// the latest processed block, and the latest header, and filter every block in between
// for BlockProposed events, if we are not already filtering. This lets us avoid
// unreliable subscription issues.
func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) error {
defer func() {
i.wg.Done()
}()
func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) {
defer i.wg.Done()

t := time.NewTicker(10 * time.Second)
defer t.Stop()

var filtering bool = false

for {
select {
case <-ctx.Done():
slog.Info("event loop context done")
return nil
return
case <-t.C:
func() {
defer func() {
filtering = false
}()
}()

if filtering {
continue
}

filtering = true

if err := i.filter(ctx); err != nil {
slog.Error("error filtering", "error", err)
return err
return
}
}
}
Expand All @@ -155,7 +135,10 @@ func (i *Indexer) withRetry(f func() error) error {
}
return f()
},
backoff.WithMaxRetries(backoff.NewConstantBackOff(i.cfg.BackOffRetryInterval), i.cfg.BackOffMaxRetries),
backoff.WithContext(
backoff.WithMaxRetries(backoff.NewConstantBackOff(i.cfg.BackOffRetryInterval), i.cfg.BackOffMaxRetries),
i.ctx,
),
)
}

Expand Down Expand Up @@ -185,12 +168,9 @@ func (i *Indexer) filter(ctx context.Context) error {
)

for j := i.latestIndexedBlockNumber + 1; j <= endBlockID; j += defaultBlockBatchSize {
end := j + uint64(defaultBlockBatchSize)
// if the end of the batch is greater than the latest block number, set end
// to the latest block number
if end > endBlockID {
end = endBlockID
}
end := utils.Min(j+defaultBlockBatchSize, endBlockID)

slog.Info("block batch", "start", j, "end", end)

Expand Down Expand Up @@ -219,11 +199,7 @@ func (i *Indexer) filter(ctx context.Context) error {
}

group.Go(func() error {
if err := i.withRetry(func() error { return i.storeBlob(ctx, event) }); err != nil {
return err
}

return nil
return i.withRetry(func() error { return i.storeBlob(ctx, event) })
})
}

Expand Down Expand Up @@ -255,7 +231,7 @@ func (i *Indexer) getBlockTimestamp(rpcURL string, blockNumber *big.Int) (uint64
return block.Time(), nil
}

func calculateBlobHash(commitmentStr string) string {
func calculateBlobHash(commitmentStr string) common.Hash {
// As per: https://eips.ethereum.org/EIPS/eip-4844
c := common.FromHex(commitmentStr)

Expand All @@ -270,9 +246,7 @@ func calculateBlobHash(commitmentStr string) string {
&commitment,
)

blobHashString := hex.EncodeToString(blobHash[:])

return blobHashString
return common.Hash(blobHash[:])
}

func (i *Indexer) checkReorg(ctx context.Context, event *taikol1.TaikoL1BlockProposed) error {
Expand Down Expand Up @@ -316,12 +290,9 @@ func (i *Indexer) storeBlob(ctx context.Context, event *taikol1.TaikoL1BlockProp
}

for _, data := range responseData.Data {
data.KzgCommitmentHex, err = hex.DecodeString(data.KzgCommitment[2:])
if err != nil {
return err
}
data.KzgCommitmentHex = common.FromHex(data.KzgCommitment)

metaBlobHash := hex.EncodeToString(event.Meta.BlobHash[:])
metaBlobHash := common.Hash(event.Meta.BlobHash[:])
// Comparing the hex strings of meta.blobHash (blobHash)
if calculateBlobHash(data.KzgCommitment) == metaBlobHash {
blockTs, err := i.getBlockTimestamp(i.cfg.RPCURL, new(big.Int).SetUint64(blockID))
Expand All @@ -330,9 +301,9 @@ func (i *Indexer) storeBlob(ctx context.Context, event *taikol1.TaikoL1BlockProp
return err
}

slog.Info("blockHash", "blobHash", fmt.Sprintf("%v%v", "0x", metaBlobHash))
slog.Info("blockHash", "blobHash", metaBlobHash.String())

err = i.storeBlobInDB(fmt.Sprintf("%v%v", "0x", metaBlobHash), data.KzgCommitment, data.Blob, blockTs, event.BlockId.Uint64(), event.Raw.BlockNumber)
err = i.storeBlobInDB(metaBlobHash.String(), data.KzgCommitment, data.Blob, blockTs, event.BlockId.Uint64(), event.Raw.BlockNumber)
if err != nil {
slog.Error("Error storing blob in DB", "error", err)
return err
Expand Down
19 changes: 19 additions & 0 deletions packages/blobstorage/pkg/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package utils

import "golang.org/x/exp/constraints"

// Min return the minimum value of two integers.
func Min[T constraints.Integer](a, b T) T {
if a < b {
return a
}
return b
}

// Max return the maximum value of two integers.
func Max[T constraints.Integer](a, b T) T {
if a > b {
return a
}
return b
}

0 comments on commit 6c0d98f

Please sign in to comment.