Skip to content

Commit

Permalink
feat(blobstorage): isolating tables for no blob data duplication (#16702
Browse files Browse the repository at this point in the history
)

Co-authored-by: jeff <[email protected]>
  • Loading branch information
k-kaddal and cyberhorsey authored Apr 10, 2024
1 parent e25ad9f commit 55426ef
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 50 deletions.
17 changes: 6 additions & 11 deletions packages/blobstorage/blob_hash.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package blobstorage

type BlobHash struct {
BlobHash string
KzgCommitment string
BlobData string
BlockID uint64
EmittedBlockID uint64
BlobHash string
KzgCommitment string
BlobData string
}

type SaveBlobHashOpts struct {
BlobHash string
KzgCommitment string
BlobData string
BlockID uint64
EmittedBlockID uint64
BlobHash string
KzgCommitment string
BlobData string
}

type BlobHashRepository interface {
Save(opts SaveBlobHashOpts) error
FirstByBlobHash(blobHash string) (*BlobHash, error)
FindLatestBlockID() (uint64, error)
DeleteAllAfterBlockID(blockID uint64) error
}
19 changes: 19 additions & 0 deletions packages/blobstorage/block_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package blobstorage

type BlockMeta struct {
BlobHash string
BlockID uint64
EmittedBlockID uint64
}

type SaveBlockMetaOpts struct {
BlobHash string
BlockID uint64
EmittedBlockID uint64
}

type BlockMetaRepository interface {
Save(opts SaveBlockMetaOpts) error
FindLatestBlockID() (uint64, error)
DeleteAllAfterBlockID(blockID uint64) error
}
40 changes: 21 additions & 19 deletions packages/blobstorage/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Indexer struct {
startHeight *uint64
taikoL1 *taikol1.TaikoL1
db DB
blobHashRepo blobstorage.BlobHashRepository
repositories *repo.Repositories
cfg *Config
wg *sync.WaitGroup
ctx context.Context
Expand All @@ -53,7 +53,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {
return err
}

blobHashRepo, err := repo.NewBlobHashRepository(db)
repositories, err := repo.NewRepositories(db)
if err != nil {
return err
}
Expand All @@ -73,7 +73,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {
return err
}

i.blobHashRepo = blobHashRepo
i.repositories = repositories
i.ethClient = client
i.taikoL1 = taikoL1
i.startHeight = cfg.StartingBlockID
Expand Down Expand Up @@ -103,7 +103,9 @@ func (i *Indexer) setInitialIndexingBlock(
ctx context.Context,
) error {
// get most recently processed block height from the DB
latest, err := i.blobHashRepo.FindLatestBlockID()
// latest, err := i.blobHashRepo.FindLatestBlockID()
latest, err := i.repositories.BlockMetaRepo.FindLatestBlockID()
// latest, err := i.blockMetaRepo.FindLatestBlockID()
if err != nil {
return err
}
Expand Down Expand Up @@ -269,15 +271,16 @@ func calculateBlobHash(commitmentStr string) common.Hash {
}

func (i *Indexer) checkReorg(ctx context.Context, event *taikol1.TaikoL1BlockProposed) error {
n, err := i.blobHashRepo.FindLatestBlockID()
// n, err := i.blockMetaRepo.FindLatestBlockID()
n, err := i.repositories.BlockMetaRepo.FindLatestBlockID()
if err != nil {
return err
}

if n >= event.Raw.BlockNumber {
slog.Info("reorg detected", "event emitted in", event.Raw.BlockNumber, "latest emitted block id from db", n)
// reorg detected, we have seen a higher block number than this already.
return i.blobHashRepo.DeleteAllAfterBlockID(event.Raw.BlockNumber)
return i.repositories.DeleteAllAfterBlockID(ctx, event.Raw.BlockNumber)
}

return nil
Expand Down Expand Up @@ -311,11 +314,20 @@ func (i *Indexer) storeBlob(ctx context.Context, event *taikol1.TaikoL1BlockProp
metaBlobHash := common.BytesToHash(event.Meta.BlobHash[:])
// Comparing the hex strings of meta.blobHash (blobHash)
if calculateBlobHash(data.KzgCommitment) == metaBlobHash {
slog.Info("storing blobHash in db", "blobHash", metaBlobHash.String())
saveBlockMetaOpts := &blobstorage.SaveBlockMetaOpts{
BlobHash: metaBlobHash.String(),
BlockID: event.BlockId.Uint64(),
EmittedBlockID: event.Raw.BlockNumber,
}
saveBlobHashOpts := &blobstorage.SaveBlobHashOpts{
BlobHash: metaBlobHash.String(),
KzgCommitment: data.KzgCommitment,
BlobData: data.Blob,
}

err = i.storeBlobInDB(metaBlobHash.String(), data.KzgCommitment, data.Blob, event.BlockId.Uint64(), event.Raw.BlockNumber)
err := i.repositories.SaveBlobAndBlockMeta(ctx, saveBlockMetaOpts, saveBlobHashOpts)
if err != nil {
slog.Error("Error storing blob in DB", "error", err)
slog.Error("Error storing Blob and BlockMeta in DB", "error", err)
return err
}

Expand All @@ -325,13 +337,3 @@ func (i *Indexer) storeBlob(ctx context.Context, event *taikol1.TaikoL1BlockProp

return errors.New("BLOB not found")
}

func (i *Indexer) storeBlobInDB(blobHashInMeta, kzgCommitment, blob string, blockID uint64, emittedBlockID uint64) error {
return i.blobHashRepo.Save(blobstorage.SaveBlobHashOpts{
BlobHash: blobHashInMeta,
KzgCommitment: kzgCommitment,
BlockID: blockID,
BlobData: blob,
EmittedBlockID: emittedBlockID,
})
}
27 changes: 27 additions & 0 deletions packages/blobstorage/migrations/16670_create_blocks_meta_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- +goose Up
-- create blocks_meta table
CREATE TABLE IF NOT EXISTS blocks_meta (
block_id BIGINT NOT NULL PRIMARY KEY,
blob_hash VARCHAR(100) NOT NULL,
emitted_block_id BIGINT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (blob_hash) REFERENCES blob_hashes(blob_hash)
);

-- migrate data into blocks_meta
INSERT INTO blocks_meta (block_id, blob_hash, emitted_block_id, created_at, updated_at)
SELECT block_id, blob_hash, emitted_block_id, created_at, updated_at
FROM blob_hashes;

-- update indexes
DROP INDEX block_id_index on blob_hashes;
ALTER TABLE blocks_meta ADD INDEX `block_id_index` (`block_id`);

-- +goose Down
-- reverse indexes
DROP INDEX block_id_index on blocks_meta;
ALTER TABLE blob_hashes ADD INDEX `block_id_index` (`block_id`);

-- drop blocks_meta table
DROP TABLE IF EXISTS blocks_meta;
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- +goose Up
-- create a blob_hashes_temp with constraint blob_hash
CREATE TABLE IF NOT EXISTS blob_hashes_temp (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
blob_hash VARCHAR(100) NOT NULL UNIQUE,
kzg_commitment LONGTEXT NOT NULL,
blob_data LONGTEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- migrate data to blob_hashes_temp
INSERT IGNORE INTO blob_hashes_temp (id, blob_hash, kzg_commitment, blob_data, created_at, updated_at)
SELECT id, blob_hash, kzg_commitment, blob_data, created_at, updated_at
FROM blob_hashes;

-- Update blob_hash references and indexes
UPDATE blocks_meta bm
JOIN blob_hashes_temp bh ON bm.blob_hash = bh.blob_hash
SET bm.blob_hash = bh.blob_hash;

ALTER TABLE blocks_meta DROP FOREIGN KEY blocks_meta_ibfk_1;

ALTER TABLE `blob_hashes_temp` ADD INDEX `blob_hash_index` (`blob_hash`);

-- make blob_hashes_temp the new blob_hashes
DROP TABLE IF EXISTS blob_hashes;
ALTER TABLE blob_hashes_temp RENAME TO blob_hashes;

-- +goose Down
-- create a blob_hashes_temp as original
CREATE TABLE IF NOT EXISTS blob_hashes_temp (
id int NOT NULL PRIMARY KEY AUTO_INCREMENT,
block_id BIGINT NOT NULL,
emitted_block_id BIGINT NOT NULL,
blob_hash VARCHAR(100) NOT NULL,
kzg_commitment LONGTEXT NOT NULL,
blob_data LONGTEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- migrate data to blob_hashes_temp
INSERT INTO blob_hashes_temp (block_id, emitted_block_id, blob_hash, kzg_commitment, blob_data, created_at, updated_at)
SELECT
bm.block_id,
bm.emitted_block_id,
bh.blob_hash,
bh.kzg_commitment,
bh.blob_data,
bh.created_at,
bh.updated_at
FROM
blocks_meta bm
JOIN
blob_hashes bh ON bm.blob_hash = bh.blob_hash;

-- Update blob_hash references and indexes
UPDATE blocks_meta bm
JOIN blob_hashes_temp bh ON bm.blob_hash = bh.blob_hash
SET bm.blob_hash = bh.blob_hash;

-- create blob_hash_index
ALTER TABLE `blob_hashes_temp` ADD INDEX `blob_hash_index` (`blob_hash`);

-- make blob_hashes_temp the new blob_hashes
DROP TABLE IF EXISTS blob_hashes;
ALTER TABLE blob_hashes_temp RENAME TO blob_hashes;
30 changes: 10 additions & 20 deletions packages/blobstorage/pkg/repo/blob_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ func (r *BlobHashRepository) startQuery() *gorm.DB {

func (r *BlobHashRepository) Save(opts blobstorage.SaveBlobHashOpts) error {
b := &blobstorage.BlobHash{
BlobHash: opts.BlobHash,
KzgCommitment: opts.KzgCommitment,
BlobData: opts.BlobData,
BlockID: opts.BlockID,
EmittedBlockID: opts.EmittedBlockID,
BlobHash: opts.BlobHash,
KzgCommitment: opts.KzgCommitment,
BlobData: opts.BlobData,
}
if err := r.startQuery().Create(b).Error; err != nil {
return err
Expand All @@ -38,19 +36,6 @@ func (r *BlobHashRepository) Save(opts blobstorage.SaveBlobHashOpts) error {
return nil
}

func (r *BlobHashRepository) FindLatestBlockID() (uint64, error) {
q := `SELECT COALESCE(MAX(emitted_block_id), 0)
FROM blob_hashes`

var b uint64

if err := r.startQuery().Raw(q).Scan(&b).Error; err != nil {
return 0, err
}

return b, nil
}

func (r *BlobHashRepository) FirstByBlobHash(blobHash string) (*blobstorage.BlobHash, error) {
var b blobstorage.BlobHash

Expand All @@ -64,8 +49,13 @@ func (r *BlobHashRepository) FirstByBlobHash(blobHash string) (*blobstorage.Blob
// DeleteAllAfterBlockID is used when a reorg is detected
func (r *BlobHashRepository) DeleteAllAfterBlockID(blockID uint64) error {
query := `
DELETE FROM blob_hashes
WHERE block_id >= ?`
DELETE FROM blob_hashes
WHERE blob_hash IN (
SELECT blob_hashes.blob_hash
FROM blob_hashes
INNER JOIN block_meta ON blob_hashes.blob_hash = block_meta.blob_hash
WHERE block_meta.block_id >= ?
)`

if err := r.startQuery().Exec(query, blockID).Error; err != nil {
return err
Expand Down
63 changes: 63 additions & 0 deletions packages/blobstorage/pkg/repo/block_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package repo

import (
blobstorage "github.com/taikoxyz/taiko-mono/packages/blobstorage"
"gorm.io/gorm"
)

type BlockMetaRepository struct {
db DB
}

func NewBlockMetaRepository(db DB) (*BlockMetaRepository, error) {
if db == nil {
return nil, ErrNoDB
}

return &BlockMetaRepository{
db: db,
}, nil
}

func (r *BlockMetaRepository) startQuery() *gorm.DB {
return r.db.GormDB().Table("blocks_meta")
}

func (r *BlockMetaRepository) Save(opts blobstorage.SaveBlockMetaOpts) error {
b := &blobstorage.BlockMeta{
BlobHash: opts.BlobHash,
BlockID: opts.BlockID,
EmittedBlockID: opts.EmittedBlockID,
}
if err := r.startQuery().Create(b).Error; err != nil {
return err
}

return nil
}

func (r *BlockMetaRepository) FindLatestBlockID() (uint64, error) {
q := `SELECT COALESCE(MAX(emitted_block_id), 0)
FROM blocks_meta`

var b uint64

if err := r.startQuery().Raw(q).Scan(&b).Error; err != nil {
return 0, err
}

return b, nil
}

// DeleteAllAfterBlockID is used when a reorg is detected
func (r *BlockMetaRepository) DeleteAllAfterBlockID(blockID uint64) error {
query := `
DELETE FROM blob_hashes
WHERE block_id >= ?`

if err := r.startQuery().Exec(query, blockID).Error; err != nil {
return err
}

return nil
}
Loading

0 comments on commit 55426ef

Please sign in to comment.