Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ var (
Name: "sequencer.use-finalized",
Usage: "Enable use of only finalized L1 blocks as L1 origin. Overwrites the value of 'sequencer.l1-confs'.",
EnvVars: prefixEnvVars("SEQUENCER_USE_FINALIZED"),
Value: false,
Value: false, // Sishan TODO: So Celo set it to false by default? Do we want to change to true if deriving from caff node?
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to turn it on?

Copy link
Collaborator Author

@dailinsubjam dailinsubjam Mar 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe due to Celo is actively working on it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to turn it on?

Yeah, I think so!

Maybe due to Celo is actively working on it.

Looks like this PR has been merged.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I guess the reason they haven't turned it on is because later they'll send some other fix relevant to this feature.

Category: SequencerCategory,
}
L1EpochPollIntervalFlag = &cli.DurationFlag{
Expand Down Expand Up @@ -444,13 +444,6 @@ var (
Value: true,
Category: OperationsCategory,
}
CaffNodeNamespace = &cli.Uint64Flag{
Name: "caff.namespace",
Usage: "Namespace for the caffeinated node",
EnvVars: prefixEnvVars("CAFF_NAMESPACE"),
Value: 42,
Category: OperationsCategory,
}
CaffNodeNextHotShotBlockNum = &cli.Uint64Flag{
Name: "caff.next-hotshot-block-num",
Usage: "Next hotshot block number for the caffeinated node",
Expand Down Expand Up @@ -526,7 +519,6 @@ var optionalFlags = []cli.Flag{
InteropRPCPort,
InteropJWTSecret,
CaffNodeFlag,
CaffNodeNamespace,
CaffNodeNextHotShotBlockNum,
CaffNodePollingHotShotPollingInterval,
CaffNodeHotShotUrls,
Expand Down
9 changes: 5 additions & 4 deletions op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func initEspressoStreamer(log log.Logger, cfg *rollup.Config) *EspressoStreamer
return nil
}
espressoStreamer := NewEspressoStreamer(
cfg.CaffNodeConfig.Namespace,
cfg.L2ChainID.Uint64(),
cfg.CaffNodeConfig.NextHotShotBlockNum,
cfg.CaffNodeConfig.PollingHotShotPollingInterval,
espressoClient.NewMultipleNodesClient(cfg.CaffNodeConfig.HotShotUrls),
log,
cfg.BatchInboxAddress,
cfg,
)
log.Info("Espresso streamer initialized", "namespace", cfg.CaffNodeConfig.Namespace, "next hotshot block num", cfg.CaffNodeConfig.NextHotShotBlockNum, "polling hotshot polling interval", cfg.CaffNodeConfig.PollingHotShotPollingInterval, "hotshot urls", cfg.CaffNodeConfig.HotShotUrls)
log.Info("Espresso streamer initialized", "namespace", cfg.L2ChainID.Uint64(), "next hotshot block num", cfg.CaffNodeConfig.NextHotShotBlockNum, "polling hotshot polling interval", cfg.CaffNodeConfig.PollingHotShotPollingInterval, "hotshot urls", cfg.CaffNodeConfig.HotShotUrls)
return espressoStreamer
}

Expand All @@ -103,16 +103,17 @@ func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}

func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2BlockRef) (*AttributesWithParent, error) {
func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2BlockRef, l1Finalized func() (eth.L1BlockRef, error), l1BlockRefByNumber func(context.Context, uint64) (eth.L1BlockRef, error)) (*AttributesWithParent, error) {
// Get a batch if we need it
if aq.batch == nil {
var batch *SingularBatch
var concluding bool
var err error
// aq.batch.Epoch() is the L1 origin of the batch
// For caff node, call NextBatch() on EspressoStreamer instead, assign concluding to false for now
if aq.isCaffNode {
// Sishan TODO: change to this once BatchValidity is ready
// batch, concluding, err = aq.espressoStreamer.NextBatch(ctx, parent)
_, _, _ = aq.espressoStreamer.NextBatch(ctx, parent, l1Finalized, l1BlockRefByNumber)
batch, concluding, err = aq.prev.NextBatch(ctx, parent)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions op-node/rollup/derive/check_l1.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

type L1BlockRefByNumber interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
L1FinalizedBlock() (eth.L1BlockRef, error)
}

// VerifyNewL1Origin checks that the L2 unsafe head still has a L1 origin that is on the canonical chain.
Expand Down
32 changes: 29 additions & 3 deletions op-node/rollup/derive/espresso_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func CheckBatchEspresso(ctx context.Context, cfg *rollup.Config, log log.Logger,
// add details to the log
log = batch.LogContext(log)

// Sishan TODO: check the L1 origin is already finalized

// Sishan TODO: these checks are copy-pasted from OP's checkSingularBatch(), we should check whether these apply to caff node
nextTimestamp := l2SafeHead.Time + cfg.BlockTime
if batch.Timestamp > nextTimestamp {
Expand Down Expand Up @@ -110,12 +108,13 @@ func CheckBatchEspresso(ctx context.Context, cfg *rollup.Config, log log.Logger,
return BatchAccept
}

func (s *EspressoStreamer) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
func (s *EspressoStreamer) NextBatch(ctx context.Context, parent eth.L2BlockRef, l1Finalized func() (eth.L1BlockRef, error), l1BlockRefByNumber func(context.Context, uint64) (eth.L1BlockRef, error)) (*SingularBatch, bool, error) {
s.messageMutex.Lock()
defer s.messageMutex.Unlock()

// Sishan TODO: Find the batch that match the parent block, concluding is assignedto false for now
var returnBatch *SingularBatch
// remaining is the list of batches that are not processed yet
var remaining []*MessageWithHeight
batchLoop:
for i, message := range s.messagesWithHeights {
Expand Down Expand Up @@ -145,6 +144,33 @@ batchLoop:
return nil, false, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity))
}
}

// check the L1 origin of returnBatch is already finalized
// if not, return NotEnoughData to wait longer
l1FinalizedBlock, err := l1Finalized()
if err != nil {
s.log.Error("failed to get the L1 finalized block", "err", err)
return nil, false, NotEnoughData
}
if returnBatch.Epoch().Number > l1FinalizedBlock.Number {
// we will not change s.messagesWithHeights here, because we want to keep the same lists of batches
s.log.Warn("you need to wait longer for the L1 origin to be finalized", "l1_origin", returnBatch.Epoch().Number)
return nil, false, NotEnoughData
} else {
// make sure it's a valid L1 origin state by check the hash
expectedL1BlockRef, err := l1BlockRefByNumber(ctx, returnBatch.Epoch().Number)
if err != nil {
s.log.Warn("failed to get the L1 block ref by number", "err", err, "l1_origin_number", returnBatch.Epoch().Number)
return nil, false, err
}
if returnBatch.Epoch().Hash != expectedL1BlockRef.Hash {
s.log.Warn("the L1 origin hash is not valid anymore", "l1_origin", returnBatch.Epoch().Hash, "expected", expectedL1BlockRef.Hash)
// drop the batch and wait longer
s.messagesWithHeights = remaining
return nil, false, NotEnoughData
}
}

s.messagesWithHeights = remaining
return returnBatch, false, nil
}
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type L1Fetcher interface {
L1BlockRefByHashFetcher
L1ReceiptsFetcher
L1TransactionFetcher
L1FinalizedBlock() (eth.L1BlockRef, error)
}

type ResettableStage interface {
Expand Down Expand Up @@ -211,7 +212,7 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
dp.origin = newOrigin
}

if attrib, err := dp.attrib.NextAttributes(ctx, pendingSafeHead); err == nil {
if attrib, err := dp.attrib.NextAttributes(ctx, pendingSafeHead, dp.l1Fetcher.L1FinalizedBlock, dp.l1Fetcher.L1BlockRefByNumber); err == nil {
return attrib, nil
} else if err == io.EOF {
// If every stage has returned io.EOF, try to advance the L1 Origin
Expand Down
5 changes: 5 additions & 0 deletions op-node/rollup/driver/metered_l1fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (m *MeteredL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.H

var _ derive.L1Fetcher = (*MeteredL1Fetcher)(nil)

func (m *MeteredL1Fetcher) L1FinalizedBlock() (eth.L1BlockRef, error) {
defer m.recordTime("L1FinalizedBlock")()
return m.inner.L1FinalizedBlock()
}

func (m *MeteredL1Fetcher) recordTime(method string) func() {
start := m.now()
return func() {
Expand Down
5 changes: 5 additions & 0 deletions op-node/rollup/finalized/finalized.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ func (f *finalized) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1B
}

var _ derive.L1Fetcher = (*finalized)(nil)

func (f *finalized) L1FinalizedBlock() (eth.L1BlockRef, error) {
finalized := f.l1Finalized()
return finalized, nil
}
1 change: 0 additions & 1 deletion op-node/rollup/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ type Config struct {
// CaffNodeConfig is the config for the Caff Node
type CaffNodeConfig struct {
IsCaffNode bool
Namespace uint64
NextHotShotBlockNum uint64
PollingHotShotPollingInterval time.Duration
HotShotUrls []string
Expand Down
1 change: 0 additions & 1 deletion op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ func NewSyncConfig(ctx *cli.Context, log log.Logger) (*sync.Config, error) {
func NewCaffNodeConfig(ctx *cli.Context) *rollup.CaffNodeConfig {
return &rollup.CaffNodeConfig{
IsCaffNode: ctx.Bool(flags.CaffNodeFlag.Name),
Namespace: ctx.Uint64(flags.CaffNodeNamespace.Name),
NextHotShotBlockNum: ctx.Uint64(flags.CaffNodeNextHotShotBlockNum.Name),
PollingHotShotPollingInterval: ctx.Duration(flags.CaffNodePollingHotShotPollingInterval.Name),
HotShotUrls: ctx.StringSlice(flags.CaffNodeHotShotUrls.Name),
Expand Down
5 changes: 5 additions & 0 deletions op-program/client/l1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,8 @@ func (o *OracleL1Client) InfoAndTxsByHash(ctx context.Context, hash common.Hash)
info, txs := o.oracle.TransactionsByBlockHash(hash)
return info, txs, nil
}

func (o *OracleL1Client) L1FinalizedBlock() (eth.L1BlockRef, error) {
// Since this is for the fault proof program, we can consider the head block as finalized
return o.L1BlockRefByHash(context.Background(), o.head.Hash)
}
11 changes: 11 additions & 0 deletions op-service/sources/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,14 @@ func (s *EthClient) BlockRefByHash(ctx context.Context, hash common.Hash) (eth.B
s.blockRefsCache.Add(ref.Hash, ref)
return ref, nil
}

func (s *EthClient) FinalizedBlock() (eth.L1BlockRef, error) {
var block *RPCBlock
err := s.client.CallContext(context.Background(), &block, "eth_getBlockByNumber", "finalized", false)
if err != nil {
return eth.L1BlockRef{}, fmt.Errorf("failed to fetch finalized block: %w", err)
}

num := uint64(block.Number)
return s.BlockRefByNumber(context.Background(), num)
}
4 changes: 4 additions & 0 deletions op-service/sources/l1_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ func (s *L1Client) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1Bl
func (s *L1Client) L1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error) {
return s.BlockRefByHash(ctx, hash)
}

func (s *L1Client) L1FinalizedBlock() (eth.L1BlockRef, error) {
return s.FinalizedBlock()
}
5 changes: 5 additions & 0 deletions op-service/testutils/mock_l1.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ func (m *MockL1Source) L1BlockRefByHash(ctx context.Context, hash common.Hash) (
func (m *MockL1Source) ExpectL1BlockRefByHash(hash common.Hash, ref eth.L1BlockRef, err error) {
m.Mock.On("L1BlockRefByHash", hash).Once().Return(ref, err)
}

func (m *MockL1Source) L1FinalizedBlock() (eth.L1BlockRef, error) {
out := m.Mock.Called()
return out.Get(0).(eth.L1BlockRef), out.Error(1)
}