diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index fa26db8247ca3..a69ec0544e068 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -264,7 +264,7 @@ var ( Name: "sequencer.use-finalized", Usage: "Enable use of only finalized L1 blocks as L1 origin. Overwrites the value of 'sequencer.l1-confs'.", EnvVars: prefixEnvVars("SEQUENCER_USE_FINALIZED"), - Value: false, + Value: false, // Sishan TODO: So Celo set it to false by default? Do we want to change to true if deriving from caff node? Category: SequencerCategory, } L1EpochPollIntervalFlag = &cli.DurationFlag{ @@ -444,13 +444,6 @@ var ( Value: true, Category: OperationsCategory, } - CaffNodeNamespace = &cli.Uint64Flag{ - Name: "caff.namespace", - Usage: "Namespace for the caffeinated node", - EnvVars: prefixEnvVars("CAFF_NAMESPACE"), - Value: 42, - Category: OperationsCategory, - } CaffNodeNextHotShotBlockNum = &cli.Uint64Flag{ Name: "caff.next-hotshot-block-num", Usage: "Next hotshot block number for the caffeinated node", @@ -526,7 +519,6 @@ var optionalFlags = []cli.Flag{ InteropRPCPort, InteropJWTSecret, CaffNodeFlag, - CaffNodeNamespace, CaffNodeNextHotShotBlockNum, CaffNodePollingHotShotPollingInterval, CaffNodeHotShotUrls, diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index 56dea09fbc5b9..20912f6fa5f3e 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -76,7 +76,7 @@ func initEspressoStreamer(log log.Logger, cfg *rollup.Config) *EspressoStreamer return nil } espressoStreamer := NewEspressoStreamer( - cfg.CaffNodeConfig.Namespace, + cfg.L2ChainID.Uint64(), cfg.CaffNodeConfig.NextHotShotBlockNum, cfg.CaffNodeConfig.PollingHotShotPollingInterval, espressoClient.NewMultipleNodesClient(cfg.CaffNodeConfig.HotShotUrls), @@ -84,7 +84,7 @@ func initEspressoStreamer(log log.Logger, cfg *rollup.Config) *EspressoStreamer cfg.BatchInboxAddress, cfg, ) - log.Info("Espresso streamer initialized", "namespace", cfg.CaffNodeConfig.Namespace, "next hotshot block num", cfg.CaffNodeConfig.NextHotShotBlockNum, "polling hotshot polling interval", cfg.CaffNodeConfig.PollingHotShotPollingInterval, "hotshot urls", cfg.CaffNodeConfig.HotShotUrls) + log.Info("Espresso streamer initialized", "namespace", cfg.L2ChainID.Uint64(), "next hotshot block num", cfg.CaffNodeConfig.NextHotShotBlockNum, "polling hotshot polling interval", cfg.CaffNodeConfig.PollingHotShotPollingInterval, "hotshot urls", cfg.CaffNodeConfig.HotShotUrls) return espressoStreamer } @@ -103,16 +103,17 @@ func (aq *AttributesQueue) Origin() eth.L1BlockRef { return aq.prev.Origin() } -func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2BlockRef) (*AttributesWithParent, error) { +func (aq *AttributesQueue) NextAttributes(ctx context.Context, parent eth.L2BlockRef, l1Finalized func() (eth.L1BlockRef, error), l1BlockRefByNumber func(context.Context, uint64) (eth.L1BlockRef, error)) (*AttributesWithParent, error) { // Get a batch if we need it if aq.batch == nil { var batch *SingularBatch var concluding bool var err error + // aq.batch.Epoch() is the L1 origin of the batch // For caff node, call NextBatch() on EspressoStreamer instead, assign concluding to false for now if aq.isCaffNode { // Sishan TODO: change to this once BatchValidity is ready - // batch, concluding, err = aq.espressoStreamer.NextBatch(ctx, parent) + _, _, _ = aq.espressoStreamer.NextBatch(ctx, parent, l1Finalized, l1BlockRefByNumber) batch, concluding, err = aq.prev.NextBatch(ctx, parent) if err != nil { return nil, err diff --git a/op-node/rollup/derive/check_l1.go b/op-node/rollup/derive/check_l1.go index 52303592802af..e33ed93331220 100644 --- a/op-node/rollup/derive/check_l1.go +++ b/op-node/rollup/derive/check_l1.go @@ -9,6 +9,7 @@ import ( type L1BlockRefByNumber interface { L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) + L1FinalizedBlock() (eth.L1BlockRef, error) } // VerifyNewL1Origin checks that the L2 unsafe head still has a L1 origin that is on the canonical chain. diff --git a/op-node/rollup/derive/espresso_streamer.go b/op-node/rollup/derive/espresso_streamer.go index 6d75c2985cf69..4ad0ebf22c764 100644 --- a/op-node/rollup/derive/espresso_streamer.go +++ b/op-node/rollup/derive/espresso_streamer.go @@ -76,8 +76,6 @@ func CheckBatchEspresso(ctx context.Context, cfg *rollup.Config, log log.Logger, // add details to the log log = batch.LogContext(log) - // Sishan TODO: check the L1 origin is already finalized - // Sishan TODO: these checks are copy-pasted from OP's checkSingularBatch(), we should check whether these apply to caff node nextTimestamp := l2SafeHead.Time + cfg.BlockTime if batch.Timestamp > nextTimestamp { @@ -110,12 +108,13 @@ func CheckBatchEspresso(ctx context.Context, cfg *rollup.Config, log log.Logger, return BatchAccept } -func (s *EspressoStreamer) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) { +func (s *EspressoStreamer) NextBatch(ctx context.Context, parent eth.L2BlockRef, l1Finalized func() (eth.L1BlockRef, error), l1BlockRefByNumber func(context.Context, uint64) (eth.L1BlockRef, error)) (*SingularBatch, bool, error) { s.messageMutex.Lock() defer s.messageMutex.Unlock() // Sishan TODO: Find the batch that match the parent block, concluding is assignedto false for now var returnBatch *SingularBatch + // remaining is the list of batches that are not processed yet var remaining []*MessageWithHeight batchLoop: for i, message := range s.messagesWithHeights { @@ -145,6 +144,33 @@ batchLoop: return nil, false, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity)) } } + + // check the L1 origin of returnBatch is already finalized + // if not, return NotEnoughData to wait longer + l1FinalizedBlock, err := l1Finalized() + if err != nil { + s.log.Error("failed to get the L1 finalized block", "err", err) + return nil, false, NotEnoughData + } + if returnBatch.Epoch().Number > l1FinalizedBlock.Number { + // we will not change s.messagesWithHeights here, because we want to keep the same lists of batches + s.log.Warn("you need to wait longer for the L1 origin to be finalized", "l1_origin", returnBatch.Epoch().Number) + return nil, false, NotEnoughData + } else { + // make sure it's a valid L1 origin state by check the hash + expectedL1BlockRef, err := l1BlockRefByNumber(ctx, returnBatch.Epoch().Number) + if err != nil { + s.log.Warn("failed to get the L1 block ref by number", "err", err, "l1_origin_number", returnBatch.Epoch().Number) + return nil, false, err + } + if returnBatch.Epoch().Hash != expectedL1BlockRef.Hash { + s.log.Warn("the L1 origin hash is not valid anymore", "l1_origin", returnBatch.Epoch().Hash, "expected", expectedL1BlockRef.Hash) + // drop the batch and wait longer + s.messagesWithHeights = remaining + return nil, false, NotEnoughData + } + } + s.messagesWithHeights = remaining return returnBatch, false, nil } diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 5e290bd0c84aa..c33b7e86aa81b 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -33,6 +33,7 @@ type L1Fetcher interface { L1BlockRefByHashFetcher L1ReceiptsFetcher L1TransactionFetcher + L1FinalizedBlock() (eth.L1BlockRef, error) } type ResettableStage interface { @@ -211,7 +212,7 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl dp.origin = newOrigin } - if attrib, err := dp.attrib.NextAttributes(ctx, pendingSafeHead); err == nil { + if attrib, err := dp.attrib.NextAttributes(ctx, pendingSafeHead, dp.l1Fetcher.L1FinalizedBlock, dp.l1Fetcher.L1BlockRefByNumber); err == nil { return attrib, nil } else if err == io.EOF { // If every stage has returned io.EOF, try to advance the L1 Origin diff --git a/op-node/rollup/driver/metered_l1fetcher.go b/op-node/rollup/driver/metered_l1fetcher.go index d3bc4eece7ac1..bc52fc652b2d8 100644 --- a/op-node/rollup/driver/metered_l1fetcher.go +++ b/op-node/rollup/driver/metered_l1fetcher.go @@ -59,6 +59,11 @@ func (m *MeteredL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.H var _ derive.L1Fetcher = (*MeteredL1Fetcher)(nil) +func (m *MeteredL1Fetcher) L1FinalizedBlock() (eth.L1BlockRef, error) { + defer m.recordTime("L1FinalizedBlock")() + return m.inner.L1FinalizedBlock() +} + func (m *MeteredL1Fetcher) recordTime(method string) func() { start := m.now() return func() { diff --git a/op-node/rollup/finalized/finalized.go b/op-node/rollup/finalized/finalized.go index fd10253efd174..f502c41abf14c 100644 --- a/op-node/rollup/finalized/finalized.go +++ b/op-node/rollup/finalized/finalized.go @@ -30,3 +30,8 @@ func (f *finalized) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1B } var _ derive.L1Fetcher = (*finalized)(nil) + +func (f *finalized) L1FinalizedBlock() (eth.L1BlockRef, error) { + finalized := f.l1Finalized() + return finalized, nil +} diff --git a/op-node/rollup/types.go b/op-node/rollup/types.go index 6af0758016788..f5a92d9a3c43f 100644 --- a/op-node/rollup/types.go +++ b/op-node/rollup/types.go @@ -158,7 +158,6 @@ type Config struct { // CaffNodeConfig is the config for the Caff Node type CaffNodeConfig struct { IsCaffNode bool - Namespace uint64 NextHotShotBlockNum uint64 PollingHotShotPollingInterval time.Duration HotShotUrls []string diff --git a/op-node/service.go b/op-node/service.go index 945925923da7b..75168975083de 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -299,7 +299,6 @@ func NewSyncConfig(ctx *cli.Context, log log.Logger) (*sync.Config, error) { func NewCaffNodeConfig(ctx *cli.Context) *rollup.CaffNodeConfig { return &rollup.CaffNodeConfig{ IsCaffNode: ctx.Bool(flags.CaffNodeFlag.Name), - Namespace: ctx.Uint64(flags.CaffNodeNamespace.Name), NextHotShotBlockNum: ctx.Uint64(flags.CaffNodeNextHotShotBlockNum.Name), PollingHotShotPollingInterval: ctx.Duration(flags.CaffNodePollingHotShotPollingInterval.Name), HotShotUrls: ctx.StringSlice(flags.CaffNodeHotShotUrls.Name), diff --git a/op-program/client/l1/client.go b/op-program/client/l1/client.go index af2513131d5a1..64567d91cadde 100644 --- a/op-program/client/l1/client.go +++ b/op-program/client/l1/client.go @@ -81,3 +81,8 @@ func (o *OracleL1Client) InfoAndTxsByHash(ctx context.Context, hash common.Hash) info, txs := o.oracle.TransactionsByBlockHash(hash) return info, txs, nil } + +func (o *OracleL1Client) L1FinalizedBlock() (eth.L1BlockRef, error) { + // Since this is for the fault proof program, we can consider the head block as finalized + return o.L1BlockRefByHash(context.Background(), o.head.Hash) +} diff --git a/op-service/sources/eth_client.go b/op-service/sources/eth_client.go index 5908508e9e134..00c270185bf57 100644 --- a/op-service/sources/eth_client.go +++ b/op-service/sources/eth_client.go @@ -442,3 +442,14 @@ func (s *EthClient) BlockRefByHash(ctx context.Context, hash common.Hash) (eth.B s.blockRefsCache.Add(ref.Hash, ref) return ref, nil } + +func (s *EthClient) FinalizedBlock() (eth.L1BlockRef, error) { + var block *RPCBlock + err := s.client.CallContext(context.Background(), &block, "eth_getBlockByNumber", "finalized", false) + if err != nil { + return eth.L1BlockRef{}, fmt.Errorf("failed to fetch finalized block: %w", err) + } + + num := uint64(block.Number) + return s.BlockRefByNumber(context.Background(), num) +} diff --git a/op-service/sources/l1_client.go b/op-service/sources/l1_client.go index b69429d14a231..2435af0dbdae4 100644 --- a/op-service/sources/l1_client.go +++ b/op-service/sources/l1_client.go @@ -78,3 +78,7 @@ func (s *L1Client) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1Bl func (s *L1Client) L1BlockRefByHash(ctx context.Context, hash common.Hash) (eth.L1BlockRef, error) { return s.BlockRefByHash(ctx, hash) } + +func (s *L1Client) L1FinalizedBlock() (eth.L1BlockRef, error) { + return s.FinalizedBlock() +} diff --git a/op-service/testutils/mock_l1.go b/op-service/testutils/mock_l1.go index 14b4fe5f57e4a..6816a7980309c 100644 --- a/op-service/testutils/mock_l1.go +++ b/op-service/testutils/mock_l1.go @@ -37,3 +37,8 @@ func (m *MockL1Source) L1BlockRefByHash(ctx context.Context, hash common.Hash) ( func (m *MockL1Source) ExpectL1BlockRefByHash(hash common.Hash, ref eth.L1BlockRef, err error) { m.Mock.On("L1BlockRefByHash", hash).Once().Return(ref, err) } + +func (m *MockL1Source) L1FinalizedBlock() (eth.L1BlockRef, error) { + out := m.Mock.Called() + return out.Get(0).(eth.L1BlockRef), out.Error(1) +}