diff --git a/op-alt-da/cli.go b/op-alt-da/cli.go index 84364e47952a7..72c6fa5259539 100644 --- a/op-alt-da/cli.go +++ b/op-alt-da/cli.go @@ -102,8 +102,12 @@ func (c CLIConfig) Check() error { return nil } -func (c CLIConfig) NewDAClient() *DAClient { - return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout} +func (c CLIConfig) NewDAClient() (*DAClient, error) { + err := c.Check() + if err != nil { + return nil, fmt.Errorf("check daclient CLIConfig: %w", err) + } + return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}, nil } func ReadCLIConfig(c *cli.Context) CLIConfig { diff --git a/op-alt-da/daclient_test.go b/op-alt-da/daclient_test.go index d9f7902aadee1..bee1030c7a5e1 100644 --- a/op-alt-da/daclient_test.go +++ b/op-alt-da/daclient_test.go @@ -27,7 +27,8 @@ func TestDAClientPrecomputed(t *testing.T) { } require.NoError(t, cfg.Check()) - client := cfg.NewDAClient() + client, err := cfg.NewDAClient() + require.NoError(t, err) rng := rand.New(rand.NewSource(1234)) @@ -85,7 +86,8 @@ func TestDAClientService(t *testing.T) { } require.NoError(t, cfg.Check()) - client := cfg.NewDAClient() + client, err := cfg.NewDAClient() + require.NoError(t, err) rng := rand.New(rand.NewSource(1234)) diff --git a/op-alt-da/damgr.go b/op-alt-da/damgr.go index d320c9bfe5bb4..2397d27b7e0cc 100644 --- a/op-alt-da/damgr.go +++ b/op-alt-da/damgr.go @@ -78,8 +78,12 @@ type DA struct { } // NewAltDA creates a new AltDA instance with the given log and CLIConfig. -func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA { - return NewAltDAWithStorage(log, cfg, cli.NewDAClient(), metrics) +func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) (*DA, error) { + daClient, err := cli.NewDAClient() + if err != nil { + return nil, fmt.Errorf("new DAClient: %w", err) + } + return NewAltDAWithStorage(log, cfg, daClient, metrics), nil } // NewAltDAWithStorage creates a new AltDA instance with the given log and DAStorage interface. diff --git a/op-alt-da/damock.go b/op-alt-da/damock.go index 62ece16611649..2c3a0d286b23c 100644 --- a/op-alt-da/damock.go +++ b/op-alt-da/damock.go @@ -2,7 +2,9 @@ package altda import ( "context" + "encoding/binary" "errors" + "fmt" "io" "net/http" "sync" @@ -16,11 +18,16 @@ import ( ) // MockDAClient mocks a DA storage provider to avoid running an HTTP DA server -// in unit tests. +// in unit tests. MockDAClient is goroutine-safe. type MockDAClient struct { - CommitmentType CommitmentType - store ethdb.KeyValueStore - log log.Logger + mu sync.Mutex + CommitmentType CommitmentType + GenericCommitmentCount uint16 // next generic commitment (use counting commitment instead of hash to help with testing) + store ethdb.KeyValueStore + StoreCount int + log log.Logger + dropEveryNthPut uint // 0 means nothing gets dropped, 1 means every put errors, etc. + setInputRequestCount uint // number of put requests received, irrespective of whether they were successful } func NewMockDAClient(log log.Logger) *MockDAClient { @@ -31,7 +38,30 @@ func NewMockDAClient(log log.Logger) *MockDAClient { } } +// NewCountingGenericCommitmentMockDAClient creates a MockDAClient that uses counting commitments. +// Its commitments are big-endian encoded uint16s of 0, 1, 2, etc. instead of actual hash or altda-layer related commitments. 
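+// For example, the first three commitments returned by SetInput are the 2-byte big-endian encodings 0x0000, 0x0001 and 0x0002 (see binary.BigEndian.PutUint16 in SetInput below).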
+// Used for testing to make sure we receive commitments in order following Holocene strict ordering rules. +func NewCountingGenericCommitmentMockDAClient(log log.Logger) *MockDAClient { + return &MockDAClient{ + CommitmentType: GenericCommitmentType, + store: memorydb.New(), + log: log, + } +} + +// Fakes a da server that drops/errors on every Nth put request. +// Useful for testing the batcher's error handling. +// 0 means nothing gets dropped, 1 means every put errors, etc. +func (c *MockDAClient) DropEveryNthPut(n uint) { + c.mu.Lock() + defer c.mu.Unlock() + c.dropEveryNthPut = n +} + func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) { + c.mu.Lock() + defer c.mu.Unlock() + c.log.Debug("Getting input", "key", key) bytes, err := c.store.Get(key.Encode()) if err != nil { return nil, ErrNotFound @@ -40,12 +70,46 @@ func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte } func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (CommitmentData, error) { - key := NewCommitmentData(c.CommitmentType, data) - return key, c.store.Put(key.Encode(), data) + c.mu.Lock() + defer c.mu.Unlock() + c.setInputRequestCount++ + var key CommitmentData + if c.CommitmentType == GenericCommitmentType { + countCommitment := make([]byte, 2) + binary.BigEndian.PutUint16(countCommitment, c.GenericCommitmentCount) + key = NewGenericCommitment(countCommitment) + } else { + key = NewKeccak256Commitment(data) + } + var action string = "put" + if c.dropEveryNthPut > 0 && c.setInputRequestCount%c.dropEveryNthPut == 0 { + action = "dropped" + } + c.log.Debug("Setting input", "action", action, "key", key, "data", fmt.Sprintf("%x", data)) + if action == "dropped" { + return nil, errors.New("put dropped") + } + err := c.store.Put(key.Encode(), data) + if err == nil { + c.GenericCommitmentCount++ + c.StoreCount++ + } + return key, err } func (c *MockDAClient) DeleteData(key []byte) error { - return c.store.Delete(key) + c.mu.Lock() + defer c.mu.Unlock() + c.log.Debug("Deleting data", "key", key) + // memorydb.Delete() returns nil even when the key doesn't exist, so we need to check if the key exists + // before decrementing StoreCount. + var err error + if _, err = c.store.Get(key); err == nil { + if err = c.store.Delete(key); err == nil { + c.StoreCount-- + } + } + return err } // DAErrFaker is a DA client that can be configured to return errors on GetInput @@ -121,6 +185,12 @@ type FakeDAServer struct { getRequestLatency time.Duration // next failoverCount Put requests will return 503 status code for failover testing failoverCount uint64 + // outOfOrderResponses is a flag that, when set, causes the server to send responses out of order. + // It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response. + // This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules. 
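+	// oooWaitChan (guarded by oooMu) lets the first request of each pair block until the second one has been answered.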
+ outOfOrderResponses bool + oooMu sync.Mutex + oooWaitChan chan struct{} } func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer { @@ -145,6 +215,21 @@ func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) { s.failoverCount-- return } + if s.outOfOrderResponses { + s.oooMu.Lock() + if s.oooWaitChan == nil { + s.log.Info("Received put request while in out-of-order mode, waiting for next request") + s.oooWaitChan = make(chan struct{}) + s.oooMu.Unlock() + <-s.oooWaitChan + time.Sleep(1 * time.Second) + } else { + s.log.Info("Received second put request in out-of-order mode, responding to this one first, then the first one") + close(s.oooWaitChan) + s.oooWaitChan = nil + s.oooMu.Unlock() + } + } s.DAServer.HandlePut(w, r) } @@ -162,10 +247,12 @@ func (s *FakeDAServer) Start() error { } func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) { + s.log.Info("Setting put request latency", "latency", latency) s.putRequestLatency = latency } func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) { + s.log.Info("Setting get request latency", "latency", latency) s.getRequestLatency = latency } @@ -174,6 +261,14 @@ func (s *FakeDAServer) SetPutFailoverForNRequests(n uint64) { s.failoverCount = n } +// When ooo=true, causes the server to send responses out of order. +// It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response. +// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules. +func (s *FakeDAServer) SetOutOfOrderResponses(ooo bool) { + s.log.Info("Setting out of order responses", "ooo", ooo) + s.outOfOrderResponses = ooo +} + type MemStore struct { db map[string][]byte lock sync.RWMutex diff --git a/op-alt-da/damock_test.go b/op-alt-da/damock_test.go new file mode 100644 index 0000000000000..3d651e3bd9193 --- /dev/null +++ b/op-alt-da/damock_test.go @@ -0,0 +1,65 @@ +package altda + +import ( + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum/go-ethereum/log" +) + +func TestFakeDAServer_OutOfOrderResponses(t *testing.T) { + logger := testlog.Logger(t, log.LevelDebug) + daServer := NewFakeDAServer("localhost", 0, logger) + daServer.SetOutOfOrderResponses(true) + + // Channel to track completion order + completionOrder := make(chan int, 2) + + // Start two concurrent requests + var wg sync.WaitGroup + wg.Add(2) + + // First request + go func() { + defer wg.Done() + w := httptest.NewRecorder() + r := httptest.NewRequest("PUT", "/data", nil) + + daServer.HandlePut(w, r) + completionOrder <- 1 + }() + + // Small delay to ensure first request starts first + time.Sleep(100 * time.Millisecond) + + // Second request + go func() { + defer wg.Done() + w := httptest.NewRecorder() + r := httptest.NewRequest("PUT", "/data", nil) + + daServer.HandlePut(w, r) + completionOrder <- 2 + }() + + // Wait for both requests to complete + wg.Wait() + close(completionOrder) + + // Check completion order + var order []int + for n := range completionOrder { + order = append(order, n) + } + + // Second request should complete before first + if len(order) != 2 { + t.Fatalf("expected 2 requests to complete, got %d", len(order)) + } + if order[0] != 2 || order[1] != 1 { + t.Errorf("expected completion order [2,1], got %v", order) + } +} diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go index 
ff3569cba58bb..c84748d49f928 100644 --- a/op-batcher/batcher/channel.go +++ b/op-batcher/batcher/channel.go @@ -3,6 +3,7 @@ package batcher import ( "math" + altda "github.com/ethereum-optimism/optimism/op-alt-da" "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -20,7 +21,16 @@ type channel struct { // pending channel builder channelBuilder *ChannelBuilder - // Set of unconfirmed txID -> tx data. For tx resubmission + // Temporary cache for altDACommitments that are received potentially out of order from the da layer. + // Map: first frameNumber in txData -> txData (that contains an altDACommitment) + // Once the txData containing altDAFrameCursor is received, it will be pulled out of the + // channel on the next driver iteration, and sent to L1. + altDACommitments map[uint16]txData + // Points to the next frame number to send to L1 in order to maintain holocene strict ordering rules. + // When altDACommitments[altDAFrameCursor] is non-nil, it will be sent to L1. + altDAFrameCursor uint16 + // Set of unconfirmed txID -> tx data. For tx resubmission. + // Also used for altda for the entirity of the submission (data -> commitment -> tx). pendingTransactions map[string]txData // Set of confirmed txID -> inclusion block. For determining if the channel is timed out confirmedTransactions map[string]eth.BlockID @@ -38,26 +48,50 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup metr: metr, cfg: cfg, channelBuilder: cb, + altDACommitments: make(map[uint16]txData), pendingTransactions: make(map[string]txData), confirmedTransactions: make(map[string]eth.BlockID), minInclusionBlock: math.MaxUint64, } } -// TxFailed records a transaction as failed. It will attempt to resubmit the data -// in the failed transaction. failoverToEthDA should be set to true when using altDA -// and altDA is down. This will switch the channel to submit frames to ethDA instead. -func (c *channel) TxFailed(id string, failoverToEthDA bool) { - if data, ok := c.pendingTransactions[id]; ok { - c.log.Trace("marked transaction as failed", "id", id) - // Rewind to the first frame of the failed tx - // -- the frames are ordered, and we want to send them - // all again. - c.channelBuilder.RewindFrameCursor(data.Frames()[0]) - delete(c.pendingTransactions, id) - } else { - c.log.Warn("unknown transaction marked as failed", "id", id) +// CacheAltDACommitment caches the commitment received from the DA layer for the given txData. +// We cannot submit it directly to L1 yet, as we need to make sure the commitments are submitted in order, +// according to the holocene rules. Therefore, we cache the commitment and let the channelManager +// decide when to pull them out of the channel and send them to L1. 
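+// Commitments are keyed by the first frame number of their txData; NextAltDACommitment later hands them back in frame order by following altDAFrameCursor.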
+func (c *channel) CacheAltDACommitment(txData txData, commitment altda.CommitmentData) { + if commitment == nil { + panic("expected non-nil commitment") } + if len(txData.frames) == 0 { + panic("expected txData to have frames") + } + txData.altDACommitment = commitment + c.log.Debug("caching altDA commitment", "frame", txData.frames[0].id.frameNumber, "commitment", commitment.String()) + c.altDACommitments[txData.frames[0].id.frameNumber] = txData +} + +func (c *channel) rewindAltDAFrameCursor(txData txData) { + if len(txData.frames) == 0 { + panic("expected txData to have frames") + } + c.altDAFrameCursor = txData.frames[0].id.frameNumber +} + +// AltDASubmissionFailed records an AltDA blob dispersal as having failed. +// It rewinds the channelBuilder's frameCursor to the first frame of the failed txData, +// so that the frames can be resubmitted. failoverToEthDA should be set to true when using altDA +// and altDA is down. This will switch the channel to submit frames to ethDA instead. +// TODO: add a metric for altDA submission failures. +func (c *channel) AltDASubmissionFailed(id string, failoverToEthDA bool) { + // We coopt TxFailed to rewind the frame cursor. + // This will force a resubmit of all the following frames as well, + // even if they had already successfully been submitted and their commitment cached. + // Ideally we'd have another way but for simplicity and to not tangle the altda code + // too much with the non altda code, we reuse the FrameCursor feature. + // TODO: Is there a better abstraction for altda channels? FrameCursors are not well suited + // since frames do not have to be sent in order to the altda, only their commitment does. + c.TxFailed(id) if failoverToEthDA { // We failover to calldata txs because in altda mode the channel and channelManager // are configured to use a calldataConfigManager, as opposed to DynamicEthChannelConfig @@ -68,6 +102,29 @@ func (c *channel) TxFailed(id string, failoverToEthDA bool) { c.cfg.DaType = DaTypeCalldata c.metr.RecordFailoverToEthDA() } +} + +// TxFailed records a transaction as failed. It will attempt to resubmit the data +// in the failed transaction. +func (c *channel) TxFailed(id string) { + if data, ok := c.pendingTransactions[id]; ok { + c.log.Trace("marked transaction as failed", "id", id) + if data.altDACommitment != nil { + // In altDA mode, we don't want to rewind the channelBuilder's frameCursor + // because that will lead to resubmitting the same data to the da layer. + // We simply need to rewind the altDAFrameCursor to the first frame of the failed txData, + // to force a resubmit of the cached altDACommitment. + c.rewindAltDAFrameCursor(data) + } else { + // Rewind to the first frame of the failed tx + // -- the frames are ordered, and we want to send them + // all again. + c.channelBuilder.RewindFrameCursor(data.Frames()[0]) + } + delete(c.pendingTransactions, id) + } else { + c.log.Warn("unknown transaction marked as failed", "id", id) + } c.metr.RecordBatchTxFailed() } @@ -99,7 +156,16 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool { // and then reset this state so it can try to build a new channel. 
if c.isTimedOut() { c.metr.RecordChannelTimedOut(c.ID()) - c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock) + var chanFirstL2BlockNum, chanLastL2BlockNum uint64 + if c.channelBuilder.blocks.Len() > 0 { + chanFirstL2Block, _ := c.channelBuilder.blocks.Peek() + chanLastL2Block, _ := c.channelBuilder.blocks.PeekN(c.channelBuilder.blocks.Len() - 1) + chanFirstL2BlockNum = chanFirstL2Block.NumberU64() + chanLastL2BlockNum = chanLastL2Block.NumberU64() + } + c.log.Warn("Channel timed out", "id", c.ID(), + "min_l1_inclusion_block", c.minInclusionBlock, "max_l1_inclusion_block", c.maxInclusionBlock, + "first_l2_block", chanFirstL2BlockNum, "last_l2_block", chanLastL2BlockNum) return true } @@ -134,6 +200,28 @@ func (c *channel) ID() derive.ChannelID { return c.channelBuilder.ID() } +// NextAltDACommitment checks if it has already received the altDA commitment +// of the txData whose first frame is altDAFrameCursor. If it has, it returns +// the txData and true. Otherwise, it returns an empty txData and false. +func (c *channel) NextAltDACommitment() (txData, bool) { + if txData, ok := c.altDACommitments[c.altDAFrameCursor]; ok { + if txData.altDACommitment == nil { + panic("expected altDACommitment to be non-nil") + } + if len(txData.frames) == 0 { + panic("expected txData to have frames") + } + // update altDAFrameCursor to the first frame of the next txData + lastFrame := txData.frames[len(txData.frames)-1] + c.altDAFrameCursor = lastFrame.id.frameNumber + 1 + // We also store it in pendingTransactions so that TxFailed can know + // that this tx's altDA commitment was already cached. + c.pendingTransactions[txData.ID().String()] = txData + return txData, true + } + return txData{}, false +} + // NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet. // If cfg.DaType == DaTypeCalldata, it returns txData with a single frame. // Else when cfg.DaType == DaTypeBlob or DaTypeAltDA, it will read frames from its channel builder diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index a59af2fb4c9dc..9ed179e7ef09e 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -6,6 +6,7 @@ import ( "io" "math" + altda "github.com/ethereum-optimism/optimism/op-alt-da" "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -53,7 +54,7 @@ type channelManager struct { currentChannel *channel // channels to read frame data from, for writing batches onchain channelQueue []*channel - // used to lookup channels by tx ID upon tx success / failure + // used to lookup channels by tx ID upon altda and tx success / failure txChannels map[string]*channel } @@ -94,14 +95,48 @@ func (s *channelManager) pendingBlocks() int { return s.blocks.Len() - s.blockCursor } -// TxFailed records a transaction as failed. It will attempt to resubmit the data -// in the failed transaction. failoverToEthDA should be set to true when using altDA +// CacheAltDACommitment caches the commitment received from the DA layer for the given txData. +// We cannot submit it directly to L1 yet, as we need to make sure the commitments are submitted in order, +// according to the holocene rules. Therefore, we cache them and let the channelManager decide when to submit them. 
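+// The channelManager only looks up the channel that produced the txData (via txChannels) and delegates the actual caching to channel.CacheAltDACommitment.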
+func (s *channelManager) CacheAltDACommitment(txData txData, commitment altda.CommitmentData) { + if len(txData.frames) == 0 { + panic("no frames in txData") + } + firstFrame, lastFrame := txData.frames[0], txData.frames[len(txData.frames)-1] + if firstFrame.id.chID != lastFrame.id.chID { + // The current implementation caches commitments inside channels, + // so it assumes that a txData only contains frames from a single channel. + // If this ever panics (hopefully in tests...) it shouldn't be too hard to fix. + panic("commitment spans multiple channels") + } + if channel, ok := s.txChannels[txData.ID().String()]; ok { + channel.CacheAltDACommitment(txData, commitment) + } else { + s.log.Warn("Trying to cache altda commitment for txData from unknown channel. Probably some state reset (from reorg?) happened.", "id", txData.ID()) + } +} + +// AltDASubmissionFailed marks a DA submission as having failed to be submitted to the DA layer. +// The frames will be pushed back into the corresponding channel such that they can be pulled again by the +// driver main loop and resent to the DA layer. failoverToEthDA should be set to true when using altDA // and altDA is down. This will switch the channel to submit frames to ethDA instead. -func (s *channelManager) TxFailed(_id txID, failoverToEthDA bool) { +func (s *channelManager) AltDASubmissionFailed(_id txID, failoverToEthDA bool) { + id := _id.String() + if channel, ok := s.txChannels[id]; ok { + delete(s.txChannels, id) + channel.AltDASubmissionFailed(id, failoverToEthDA) + } else { + s.log.Warn("transaction from unknown channel marked as failed", "id", id) + } +} + +// TxFailed records a transaction as failed. It will attempt to resubmit the data +// in the failed transaction. +func (s *channelManager) TxFailed(_id txID) { id := _id.String() if channel, ok := s.txChannels[id]; ok { delete(s.txChannels, id) - channel.TxFailed(id, failoverToEthDA) + channel.TxFailed(id) } else { s.log.Warn("transaction from unknown channel marked as failed", "id", id) } @@ -217,6 +252,20 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) { return tx, nil } +func (s *channelManager) getNextAltDACommitment() (txData, bool) { + for _, channel := range s.channelQueue { + // if all frames have already been sent to altda, skip this channel + if int(channel.altDAFrameCursor) == channel.channelBuilder.TotalFrames() { + continue + } + if txData, ok := channel.NextAltDACommitment(); ok { + return txData, true + } + break // We need to send the commitments in order, so we can't skip to the next channel + } + return emptyTxData, false +} + // TxData returns the next tx data that should be submitted to L1. // // If the current channel is @@ -227,6 +276,10 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) { // When switching DA type, the channelManager state will be rebuilt // with a new ChannelConfig. 
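+// In AltDA mode, a cached commitment that is next in holocene frame order takes priority over building new tx data (see getNextAltDACommitment).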
func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool) (txData, error) { + // if any altda commitment is ready, return it + if txdata, ok := s.getNextAltDACommitment(); ok { + return txdata, nil + } channel, err := s.getReadyChannel(l1Head) if err != nil { return emptyTxData, err @@ -284,7 +337,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) { } dataPending := firstWithTxData != nil - s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.blocks.Len()) + s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.pendingBlocks()) // Short circuit if there is pending tx data or the channel manager is closed if dataPending { diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index dccd4c3fd265a..16ebe0197c5fd 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -216,7 +216,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) { require.ErrorIs(err, io.EOF) // requeue frame - m.TxFailed(txdata0.ID(), false) + m.TxFailed(txdata0.ID()) txdata1, err := m.TxData(eth.BlockID{}, false) require.NoError(err) diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go index 8167febfb3afc..19759c40ef8de 100644 --- a/op-batcher/batcher/channel_test.go +++ b/op-batcher/batcher/channel_test.go @@ -305,13 +305,13 @@ func TestChannelTxFailed(t *testing.T) { // Trying to mark an unknown pending transaction as failed // shouldn't modify state - m.TxFailed(zeroFrameTxID(0), false) + m.TxFailed(zeroFrameTxID(0)) require.Equal(t, 0, m.currentChannel.PendingFrames()) require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID.String()]) // Now we still have a pending transaction // Let's mark it as failed - m.TxFailed(expectedChannelID, false) + m.TxFailed(expectedChannelID) require.Empty(t, m.currentChannel.pendingTransactions) // There should be a frame in the pending channel now require.Equal(t, 1, m.currentChannel.PendingFrames()) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 4ec8c35c8771c..f04353a676c35 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -81,6 +81,10 @@ type RollupClient interface { SyncStatus(ctx context.Context) (*eth.SyncStatus, error) } +type AltDAClient interface { + SetInput(ctx context.Context, data []byte) (altda.CommitmentData, error) +} + // DriverSetup is the collection of input/output interfaces and configuration that the driver operates on. type DriverSetup struct { Log log.Logger @@ -91,7 +95,7 @@ type DriverSetup struct { L1Client L1Client EndpointProvider dial.L2EndpointProvider ChannelConfig ChannelConfigProvider - AltDA *altda.DAClient + AltDA AltDAClient ChannelOutFactory ChannelOutFactory ActiveSeqChanged chan struct{} // optional } @@ -755,6 +759,12 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t } l.Metr.RecordLatestL1Block(l1tip) + // In AltDA mode, before pulling data out of the state, we make sure + // that the daGroup has not reached the maximum number of goroutines. + // This is to prevent blocking the main event loop when submitting the data to the DA Provider. + if l.Config.UseAltDA && !daGroup.TryGo(func() error { return nil }) { + return io.EOF + } // Collect next transaction data. 
This pulls data out of the channel, so we need to make sure // to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx. l.channelMgrMutex.Lock() @@ -814,11 +824,16 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh l.sendTx(txData{}, true, candidate, queue, receiptsCh) } -// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1. -func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) { +// publishToAltDAAndStoreCommitment posts the txdata to the DA Provider and stores the returned commitment +// in the channelMgr. The commitment will later be sent to the L1 while making sure to follow holocene's strict ordering rules. +func (l *BatchSubmitter) publishToAltDAAndStoreCommitment(txdata txData, daGroup *errgroup.Group) { + if txdata.daType != DaTypeAltDA { + l.Log.Crit("publishToAltDAAndStoreCommitment called with non-AltDA txdata") + } + // when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop // since it may take a while for the request to return. - goroutineSpawned := daGroup.TryGo(func() error { + daGroup.Go(func() error { // TODO: probably shouldn't be using the global shutdownCtx here, see https://go.dev/blog/context-and-structs // but sendTransaction receives l.killCtx as an argument, which currently is only canceled after waiting for the main loop // to exit, which would wait on this DA call to finish, which would take a long time. @@ -837,17 +852,12 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t } return nil } - l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID()) - candidate := l.calldataTxCandidate(comm.TxData()) - l.sendTx(txdata, false, candidate, queue, receiptsCh) + l.Log.Info("Sent txdata to altda layer and received commitment", "commitment", comm, "tx", txdata.ID()) + l.channelMgrMutex.Lock() + l.channelMgr.CacheAltDACommitment(txdata, comm) + l.channelMgrMutex.Unlock() return nil }) - if !goroutineSpawned { - // We couldn't start the goroutine because the errgroup.Group limit - // is already reached. Since we can't send the txdata, we have to - // return it for later processing. We use nil error to skip error logging. - l.recordFailedDARequest(txdata.ID(), nil) - } } // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. @@ -861,10 +871,20 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef if !l.Config.UseAltDA { l.Log.Crit("Received AltDA type txdata without AltDA being enabled") } - // if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment. - l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup) - // we return nil to allow publishStateToL1 to keep processing the next txdata - return nil + if txdata.altDACommitment == nil { + // This means the txdata was not sent to the DA Provider yet. + // This will send the txdata to the DA Provider and store the commitment in the channelMgr. + // Next time this txdata is requested, we will have the commitment and can send it to the L1 (else branch below). + l.publishToAltDAAndStoreCommitment(txdata, daGroup) + // We return here because publishToAltDA is an async operation; the commitment + // is not yet ready to be submitted to the L1. 
+ return nil + } + // This means the txdata was already sent to the DA Provider and we have the commitment + // so we can send the commitment to the L1 + l.Log.Info("Sending altda commitment to L1", "commitment", txdata.altDACommitment, "tx", txdata.ID()) + candidate = l.calldataTxCandidate(txdata.altDACommitment.TxData()) + case DaTypeBlob: if candidate, err = l.blobTxCandidate(txdata); err != nil { // We could potentially fall through and try a calldata tx instead, but this would @@ -882,7 +902,9 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef default: l.Log.Crit("Unknown DA type", "da_type", txdata.daType) } - + if candidate == nil { + l.Log.Crit("txcandidate should have been set by one of the three branches above.") + } l.sendTx(txdata, false, candidate, queue, receiptsCh) return nil } @@ -950,14 +972,14 @@ func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) { if err != nil { l.Log.Warn("DA request failed", append([]interface{}{"failoverToEthDA", failover}, logFields(id, err)...)...) } - l.channelMgr.TxFailed(id, failover) + l.channelMgr.AltDASubmissionFailed(id, failover) } func (l *BatchSubmitter) recordFailedTx(id txID, err error) { l.channelMgrMutex.Lock() defer l.channelMgrMutex.Unlock() l.Log.Warn("Transaction failed to send", logFields(id, err)...) - l.channelMgr.TxFailed(id, false) + l.channelMgr.TxFailed(id) } func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) { diff --git a/op-batcher/batcher/driver_test.go b/op-batcher/batcher/driver_test.go index d2e7621858c00..b3b7e42968c5e 100644 --- a/op-batcher/batcher/driver_test.go +++ b/op-batcher/batcher/driver_test.go @@ -3,15 +3,23 @@ package batcher import ( "context" "errors" + "math/big" "sync" "testing" + "time" + altda "github.com/ethereum-optimism/optimism/op-alt-da" + "github.com/ethereum-optimism/optimism/op-batcher/compressor" "github.com/ethereum-optimism/optimism/op-batcher/metrics" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum-optimism/optimism/op-service/txmgr" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" ) @@ -116,6 +124,8 @@ func TestBatchSubmitter_SafeL1Origin_FailsToResolveRollupClient(t *testing.T) { ep.rollupClientErr = errors.New("failed to resolve rollup client") _, err := bs.safeL1Origin(context.Background()) + log := testlog.Logger(t, log.LevelDebug) + log.Debug("Err", err) require.Error(t, err) } @@ -160,3 +170,219 @@ func TestBatchSubmitter_sendTx_FloorDataGas(t *testing.T) { expectedFloorDataGas := uint64(21_000 + 12*10) require.GreaterOrEqual(t, candidateOut.GasLimit, expectedFloorDataGas) } + +// ======= ALTDA TESTS ======= + +// fakeL1Client is just a dummy struct. All fault injection is done via the fakeTxMgr (which doesn't interact with this fakeL1Client). 
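+// It only implements the two methods the batcher driver needs (HeaderByNumber and NonceAt), returning dummy values.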
+type fakeL1Client struct { +} + +func (f *fakeL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + if number == nil { + number = big.NewInt(0) + } + return &types.Header{ + Number: number, + ParentHash: common.Hash{}, + Time: 0, + }, nil +} +func (f *fakeL1Client) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return 0, nil +} + +func altDASetup(_ *testing.T, log log.Logger) (*BatchSubmitter, *mockL2EndpointProvider, *altda.MockDAClient, *testutils.FakeTxMgr) { + ep := newEndpointProvider() + + rollupCfg := &rollup.Config{ + Genesis: rollup.Genesis{L2: eth.BlockID{Number: 0}, L1: eth.BlockID{Number: genesisL1Origin}}, + L2ChainID: big.NewInt(1234), + } + batcherCfg := BatcherConfig{ + PollInterval: 10 * time.Millisecond, + UseAltDA: true, + } + + fakeTxMgr := testutils.NewFakeTxMgr(log.With("subsystem", "fake-txmgr"), common.Address{0}) + l1Client := &fakeL1Client{} + + channelCfg := ChannelConfig{ + // SeqWindowSize: 15, + // SubSafetyMargin: 4, + ChannelTimeout: 10, + MaxFrameSize: 150, // so that each channel has exactly 1 frame + TargetNumFrames: 1, + BatchType: derive.SingularBatchType, + CompressorConfig: compressor.Config{ + Kind: compressor.NoneKind, + }, + DaType: DaTypeAltDA, + } + mockAltDAClient := altda.NewCountingGenericCommitmentMockDAClient(log.With("subsystem", "da-client")) + return NewBatchSubmitter(DriverSetup{ + Log: log, + Metr: metrics.NoopMetrics, + RollupConfig: rollupCfg, + ChannelConfig: channelCfg, + Config: batcherCfg, + EndpointProvider: ep, + Txmgr: fakeTxMgr, + L1Client: l1Client, + AltDA: mockAltDAClient, + }), ep, mockAltDAClient, fakeTxMgr +} + +func fakeSyncStatus(unsafeL2BlockNum uint64, L1BlockRef eth.L1BlockRef) *eth.SyncStatus { + return ð.SyncStatus{ + UnsafeL2: eth.L2BlockRef{ + Number: unsafeL2BlockNum, + L1Origin: eth.BlockID{ + Number: 0, + }, + }, + SafeL2: eth.L2BlockRef{ + Number: 0, + L1Origin: eth.BlockID{ + Number: 0, + }, + }, + HeadL1: L1BlockRef, + } +} + +// There are 4 failure cases (unhappy paths) that the op-batcher has to deal with. +// They are outlined in https://github.com/ethereum-optimism/optimism/tree/develop/op-batcher#happy-path +// This test suite covers these 4 cases in the context of AltDA. +func TestBatchSubmitter_AltDA_FailureCase1_L2Reorg(t *testing.T) { + t.Parallel() + log := testlog.Logger(t, log.LevelDebug) + bs, ep, mockAltDAClient, fakeTxMgr := altDASetup(t, log) + + L1Block0 := types.NewBlock(&types.Header{ + Number: big.NewInt(0), + }, nil, nil, nil, types.DefaultBlockConfig) + L1Block0Ref := eth.L1BlockRef{ + Hash: L1Block0.Hash(), + Number: L1Block0.NumberU64(), + } + // We return incremental syncStatuses to force the op-batcher to entirely process each L2 block one by one. + // To test multi channel behavior, we could return a sync status that is multiple blocks ahead of the current L2 block. 
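+	// Returning 1,2,3 and then 1,2,3 again makes the batcher first load blocks 1 and 2, and then re-derive the chain as 1, 2' and 3' after the reorg (hence the 5 store calls asserted below).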
+ ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(1, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(2, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(3, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(1, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(2, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Return(fakeSyncStatus(3, L1Block0Ref), nil) + + L2Block0 := newMiniL2BlockWithNumberParent(1, big.NewInt(0), common.HexToHash("0x0")) + L2Block1 := newMiniL2BlockWithNumberParent(1, big.NewInt(1), L2Block0.Hash()) + L2Block2 := newMiniL2BlockWithNumberParent(1, big.NewInt(2), L2Block1.Hash()) + L2Block2Prime := newMiniL2BlockWithNumberParentAndL1Information(1, big.NewInt(2), L2Block1.Hash(), 101, 0) + L2Block3Prime := newMiniL2BlockWithNumberParent(1, big.NewInt(3), L2Block2Prime.Hash()) + + // L2block0 is the genesis block which is considered safe, so never loaded into the state. + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(1)).Twice().Return(L2Block1, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2Prime, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(3)).Twice().Return(L2Block3Prime, nil) + + err := bs.StartBatchSubmitting() + require.NoError(t, err) + time.Sleep(1 * time.Second) // 1 second is enough to process all blocks at 10ms poll interval + err = bs.StopBatchSubmitting(context.Background()) + require.NoError(t, err) + + // After the reorg, block 1 needs to be reprocessed, hence why we see 5 store calls: 1, 2, 1, 2', 3' + require.Equal(t, 5, mockAltDAClient.StoreCount) + require.Equal(t, uint64(5), fakeTxMgr.Nonce) + +} + +func TestBatchSubmitter_AltDA_FailureCase2_FailedL1Tx(t *testing.T) { + t.Parallel() + log := testlog.Logger(t, log.LevelDebug) + bs, ep, mockAltDAClient, fakeTxMgr := altDASetup(t, log) + + L1Block0 := types.NewBlock(&types.Header{ + Number: big.NewInt(0), + }, nil, nil, nil, types.DefaultBlockConfig) + L1Block0Ref := eth.L1BlockRef{ + Hash: L1Block0.Hash(), + Number: L1Block0.NumberU64(), + } + // We return incremental syncStatuses to force the op-batcher to entirely process each L2 block one by one. + // To test multi channel behavior, we could return a sync status that is multiple blocks ahead of the current L2 block. + ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(1, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(2, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(3, L1Block0Ref), nil) + ep.rollupClient.Mock.On("SyncStatus").Return(fakeSyncStatus(4, L1Block0Ref), nil) + + L2Block0 := newMiniL2BlockWithNumberParent(1, big.NewInt(0), common.HexToHash("0x0")) + L2Block1 := newMiniL2BlockWithNumberParent(1, big.NewInt(1), L2Block0.Hash()) + L2Block2 := newMiniL2BlockWithNumberParent(1, big.NewInt(2), L2Block1.Hash()) + L2Block3 := newMiniL2BlockWithNumberParent(1, big.NewInt(3), L2Block2.Hash()) + L2Block4 := newMiniL2BlockWithNumberParent(1, big.NewInt(4), L2Block3.Hash()) + + // L2block0 is the genesis block which is considered safe, so never loaded into the state. 
+ ep.ethClient.Mock.On("BlockByNumber", big.NewInt(1)).Once().Return(L2Block1, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(3)).Once().Return(L2Block3, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(4)).Once().Return(L2Block4, nil) + + fakeTxMgr.ErrorEveryNthSend(2) + err := bs.StartBatchSubmitting() + require.NoError(t, err) + time.Sleep(1 * time.Second) // 1 second is enough to process all blocks at 10ms poll interval + err = bs.StopBatchSubmitting(context.Background()) + require.NoError(t, err) + + require.Equal(t, 4, mockAltDAClient.StoreCount) + // TODO: we should prob also check that the commitments are in order? + require.Equal(t, uint64(4), fakeTxMgr.Nonce) +} + +func TestBatchSubmitter_AltDA_FailureCase3_ChannelTimeout(t *testing.T) { + // This function is not implemented because the batcher channel logic makes it very difficult to inject faults. + // A version of this test was implemented here: https://github.com/Layr-Labs/optimism/blob/4b79c981a13bf096ae2984634d976956fbbfddff/op-batcher/batcher/driver_test.go#L300 + // However we opted to not merge it into the main branch because it has an external dependency on the https://github.com/pingcap/failpoint package, + // and requires a lot of custom test setup and failpoint code injection into the batcher's codebase. + // See https://github.com/ethereum-optimism/optimism/commit/4b79c981a13bf096ae2984634d976956fbbfddff for the full implementation. +} + +func TestBatchSubmitter_AltDA_FailureCase4_FailedBlobSubmission(t *testing.T) { + t.Parallel() + log := testlog.Logger(t, log.LevelDebug) + bs, ep, mockAltDAClient, fakeTxMgr := altDASetup(t, log) + + L1Block0 := types.NewBlock(&types.Header{ + Number: big.NewInt(0), + }, nil, nil, nil, types.DefaultBlockConfig) + L1Block0Ref := eth.L1BlockRef{ + Hash: L1Block0.Hash(), + Number: L1Block0.NumberU64(), + } + ep.rollupClient.Mock.On("SyncStatus").Return(fakeSyncStatus(4, L1Block0Ref), nil) + + L2Block0 := newMiniL2BlockWithNumberParent(1, big.NewInt(0), common.HexToHash("0x0")) + L2Block1 := newMiniL2BlockWithNumberParent(1, big.NewInt(1), L2Block0.Hash()) + L2Block2 := newMiniL2BlockWithNumberParent(1, big.NewInt(2), L2Block1.Hash()) + L2Block3 := newMiniL2BlockWithNumberParent(1, big.NewInt(3), L2Block2.Hash()) + L2Block4 := newMiniL2BlockWithNumberParent(1, big.NewInt(4), L2Block3.Hash()) + + // L2block0 is the genesis block which is considered safe, so never loaded into the state. 
+ ep.ethClient.Mock.On("BlockByNumber", big.NewInt(1)).Once().Return(L2Block1, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(3)).Once().Return(L2Block3, nil) + ep.ethClient.Mock.On("BlockByNumber", big.NewInt(4)).Once().Return(L2Block4, nil) + + mockAltDAClient.DropEveryNthPut(2) + + err := bs.StartBatchSubmitting() + require.NoError(t, err) + time.Sleep(1 * time.Second) // 1 second is enough to process all blocks at 10ms poll interval + err = bs.StopBatchSubmitting(context.Background()) + require.NoError(t, err) + + require.Equal(t, 4, mockAltDAClient.StoreCount) + require.Equal(t, uint64(4), fakeTxMgr.Nonce) +} diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index ebdbf4290c093..35250e102b616 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -409,10 +409,11 @@ func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error { func (bs *BatcherService) initAltDA(cfg *CLIConfig) error { config := cfg.AltDA - if err := config.Check(); err != nil { + daClient, err := config.NewDAClient() + if err != nil { return err } - bs.AltDA = config.NewDAClient() + bs.AltDA = daClient bs.UseAltDA = config.Enabled return nil } diff --git a/op-batcher/batcher/tx_data.go b/op-batcher/batcher/tx_data.go index da484cd02ef64..ead74374ea7b1 100644 --- a/op-batcher/batcher/tx_data.go +++ b/op-batcher/batcher/tx_data.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + altda "github.com/ethereum-optimism/optimism/op-alt-da" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive/params" "github.com/ethereum-optimism/optimism/op-service/eth" @@ -43,6 +44,10 @@ type txData struct { frames []frameData // daType represents the DA type which the frames data will be submitted to. daType DaType + // altDACommitment is non-nil when the frames have been sent to the alt-da server, + // and the received commitment needs to be sent to the L1. + // Should only be present when daType is DaTypeAltDA. + altDACommitment altda.CommitmentData } func singleFrameTxData(frame frameData) txData { diff --git a/op-batcher/readme.md b/op-batcher/readme.md index bce2f84741af5..635396b05116d 100644 --- a/op-batcher/readme.md +++ b/op-batcher/readme.md @@ -49,7 +49,7 @@ The `publishingLoop` which 1. Waits for a signal from the `blockLoadingLoop` 2. Enqueues a new channel, if necessary. 3. Processes some unprocessed blocks into the current channel, triggers the compression of the block data and the creation of frames. -4. Sends frames from the channel queue to the DA layer as (e.g. to Ethereum L1 as calldata or blob transactions). +4. Sends frames from the channel queue to the DA layer (e.g. to Ethereum L1 as calldata or blob transactions). 5. If there is more transaction data to send, go to 2. Else go to 1. The `receiptsLoop` which @@ -97,18 +97,26 @@ architecture-beta The `blockCursor` state variable tracks the next unprocessed block. In each channel, the `frameCursor` tracks the next unsent frame. -### Reorgs +### Failure Cases -When an L2 unsafe reorg is detected, the batch submitter will reset its state, and wait for any in flight transactions to be ingested by the verifier nodes before starting work again. +#### Reorgs -### Tx Failed +When an L2 reorg (safe or unsafe) is detected, the batch submitter will reset its state, and wait for any in flight transactions to be ingested by the verifier nodes before starting work again. 
+ +#### Tx Failed When a Tx fails, an asynchronous receipts handler is triggered. The channel from whence the Tx's frames came has its `frameCursor` rewound, so that all the frames can be resubmitted in order. -### Channel Times Out +> Note: there is an issue with this simple logic. See https://github.com/ethereum-optimism/optimism/issues/13283 + +#### Channel Times Out When a Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer. +#### AltDA Submission Fails + +When an AltDA submission fails, the frames get pushed back into their respective channel, and will be retried in the next tick. If the da-server returns a 503 HTTP error, then failover to ethDA-calldata is triggered for that specific channel. Each channel will independently always first try to submit to EigenDA. + ## Design Principles and Optimization Targets At the current time, the batcher should be optimized for correctness, simplicity and robustness. It is considered preferable to prioritize these properties, even at the expense of other potentially desirable properties such as frugality. For example, it is preferable to have the batcher resubmit some data from time to time ("wasting" money on data availability costs) instead of avoiding that by e.g. adding some persistent state to the batcher. diff --git a/op-chain-ops/genesis/config.go b/op-chain-ops/genesis/config.go index 490e9155b6908..0ee18bbf958df 100644 --- a/op-chain-ops/genesis/config.go +++ b/op-chain-ops/genesis/config.go @@ -1176,7 +1176,7 @@ func (d *L1Deployments) Check(deployConfig *DeployConfig) error { (name == "OptimismPortal" || name == "L2OutputOracle" || name == "L2OutputOracleProxy") { continue } - if !deployConfig.UseAltDA && + if (!deployConfig.UseAltDA || deployConfig.DACommitmentType == altda.GenericCommitmentString) && (name == "DataAvailabilityChallenge" || name == "DataAvailabilityChallengeProxy") { continue diff --git a/op-e2e/config/init.go b/op-e2e/config/init.go index 148e0625195fb..561dfbdcb4ac7 100644 --- a/op-e2e/config/init.go +++ b/op-e2e/config/init.go @@ -49,10 +49,11 @@ const ( type AllocType string const ( - AllocTypeStandard AllocType = "standard" - AllocTypeAltDA AllocType = "alt-da" - AllocTypeL2OO AllocType = "l2oo" - AllocTypeMTCannon AllocType = "mt-cannon" + AllocTypeStandard AllocType = "standard" + AllocTypeAltDA AllocType = "alt-da" + AllocTypeAltDAGeneric AllocType = "alt-da-generic" + AllocTypeL2OO AllocType = "l2oo" + AllocTypeMTCannon AllocType = "mt-cannon" DefaultAllocType = AllocTypeStandard ) @@ -66,14 +67,14 @@ func (a AllocType) Check() error { func (a AllocType) UsesProofs() bool { switch a { - case AllocTypeStandard, AllocTypeMTCannon, AllocTypeAltDA: + case AllocTypeStandard, AllocTypeMTCannon, AllocTypeAltDA, AllocTypeAltDAGeneric: return true default: return false } } -var allocTypes = []AllocType{AllocTypeStandard, AllocTypeAltDA, AllocTypeL2OO, AllocTypeMTCannon} +var allocTypes = []AllocType{AllocTypeStandard, AllocTypeAltDA, AllocTypeAltDAGeneric, AllocTypeL2OO, AllocTypeMTCannon} var ( // All of the following variables are set in the init function diff --git a/op-e2e/system/altda/concurrent_test.go b/op-e2e/system/altda/concurrent_test.go index 
19c0a0103bb4d..45918db78fd33 100644 --- a/op-e2e/system/altda/concurrent_test.go +++ b/op-e2e/system/altda/concurrent_test.go @@ -7,20 +7,22 @@ import ( "time" op_e2e "github.com/ethereum-optimism/optimism/op-e2e" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum-optimism/optimism/op-batcher/flags" + "github.com/ethereum-optimism/optimism/op-e2e/config" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/transactions" "github.com/ethereum-optimism/optimism/op-e2e/system/e2esys" - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/stretchr/testify/require" ) +// TestBatcherConcurrentAltDARequests tests that the batcher can submit parallel requests +// to the alt-da server. It does not check that the requests are correctly ordered and interpreted +// by op nodes. func TestBatcherConcurrentAltDARequests(t *testing.T) { op_e2e.InitParallel(t) - numL1TxsExpected := int64(10) - cfg := e2esys.DefaultSystemConfig(t) // Manually configure these since the alt-DA values aren't // set at all in the standard config unless UseAltDA is set. @@ -32,11 +34,9 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) { cfg.DeployConfig.DABondSize = 1000000 cfg.DeployConfig.DAResolverRefundPercentage = 0 cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs - // ensures that batcher txs are as small as possible - cfg.BatcherMaxL1TxSizeBytes = derive.FrameV0OverHeadSize + 1 /*version bytes*/ + 1 cfg.BatcherBatchType = 0 cfg.DataAvailabilityType = flags.CalldataType - cfg.BatcherMaxConcurrentDARequest = uint64(numL1TxsExpected) + cfg.BatcherMaxConcurrentDARequest = 2 // disable batcher because we start it manually below cfg.DisableBatcher = true @@ -46,14 +46,15 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) { sys.Close() }) - // make every request take 5 seconds, such that only concurrent requests will be able to make progress fast enough + // make every request take 5 seconds, such that only if 2 altda requests are made + // concurrently will 2 batcher txs be able to land in a single L1 block sys.FakeAltDAServer.SetPutRequestLatency(5 * time.Second) l1Client := sys.NodeClient("l1") l2Seq := sys.NodeClient("sequencer") - // we wait for numL1TxsExpected L2 blocks to have been produced, just to make sure the sequencer is working properly - _, err = geth.WaitForBlock(big.NewInt(numL1TxsExpected), l2Seq) + // we wait for 10 L2 blocks to have been produced, just to make sure the sequencer is working properly + _, err = geth.WaitForBlock(big.NewInt(10), l2Seq) require.NoError(t, err, "Waiting for L2 blocks") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -65,8 +66,7 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) { err = driver.StartBatchSubmitting() require.NoError(t, err) - // Iterate over up to 10 blocks. The number of transactions sent by the batcher should - // exceed the number of blocks. 
+ // We make sure that some block has more than 1 batcher tx checkBlocks := 10 for i := 0; i < checkBlocks; i++ { block, err := geth.WaitForBlock(big.NewInt(int64(startingL1BlockNum)+int64(i)), l1Client) @@ -82,3 +82,58 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) { t.Fatalf("did not find more than 1 batcher tx per block in %d blocks", checkBlocks) } + +// The Holocene fork enforced a new strict batch ordering rule, see https://specs.optimism.io/protocol/holocene/derivation.html +// This test makes sure that concurrent requests to the alt-da server that are responded out of order +// are submitted to the L1 chain in the correct order by the batcher. +func TestBatcherCanHandleOutOfOrderDAServerResponses(t *testing.T) { + op_e2e.InitParallel(t) + // Not sure whether WithAllocType is needed here, as the tests pass even without them + // (see mslipper's comments for the TestBatcherConcurrentAltDARequests test above)) + // TODO: understand how the DeployConfigs are related to the AllocTypes + // I asked here https://discord.com/channels/1244729134312198194/1332175015180767265/1332456541067935834 but have yet to get an answer. + cfg := e2esys.HoloceneSystemConfig(t, new(hexutil.Uint64), e2esys.WithAllocType(config.AllocTypeAltDAGeneric)) + cfg.DeployConfig.UseAltDA = true + cfg.DeployConfig.DACommitmentType = "GenericCommitment" + // TODO: figure out why the below are needed even in GenericCommitment mode which doesn't use the DAChallenge Contract + cfg.DeployConfig.DAChallengeWindow = 16 + cfg.DeployConfig.DAResolveWindow = 16 + cfg.DeployConfig.DABondSize = 1000000 + cfg.DeployConfig.DAResolverRefundPercentage = 0 + cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs + cfg.BatcherBatchType = 0 + cfg.DataAvailabilityType = flags.CalldataType + cfg.BatcherMaxConcurrentDARequest = 2 + cfg.BatcherMaxL1TxSizeBytes = 150 // enough to fit a single compressed empty L1 block, but not 2 + cfg.Nodes["sequencer"].SafeDBPath = t.TempDir() // needed for SafeHeadAtL1Block() below + + sys, err := cfg.Start(t) + require.NoError(t, err, "Error starting up system") + t.Cleanup(func() { + sys.Close() + }) + sys.FakeAltDAServer.SetOutOfOrderResponses(true) + + l1Client := sys.NodeClient("l1") + l2SeqCL := sys.RollupClient("sequencer") + + checkBlocksL1 := int64(15) + l2SafeHeadMovedCount := 0 + l2SafeHeadMovedCountExpected := 3 + l2SafeHeadCur := uint64(0) + for i := int64(0); i < checkBlocksL1; i++ { + _, err := geth.WaitForBlock(big.NewInt(i), l1Client, geth.WithNoChangeTimeout(5*time.Minute)) + require.NoError(t, err, "Waiting for l1 blocks") + newL2SafeHead, err := l2SeqCL.SafeHeadAtL1Block(context.Background(), uint64(i)) + require.NoError(t, err) + if newL2SafeHead.SafeHead.Number > l2SafeHeadCur { + l2SafeHeadMovedCount++ + l2SafeHeadCur = newL2SafeHead.SafeHead.Number + } + if l2SafeHeadMovedCount == l2SafeHeadMovedCountExpected { + return + } + } + t.Fatalf("L2SafeHead only advanced %d times (expected >= %d) in %d L1 blocks", l2SafeHeadMovedCount, l2SafeHeadMovedCountExpected, checkBlocksL1) + +} diff --git a/op-e2e/system/e2esys/setup.go b/op-e2e/system/e2esys/setup.go index 2dff3057f125e..6cc28c6e9b640 100644 --- a/op-e2e/system/e2esys/setup.go +++ b/op-e2e/system/e2esys/setup.go @@ -857,6 +857,24 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System, } } + // The altDACLIConfig is shared by the batcher and rollup nodes. 
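+	// It is only populated when UseAltDA is set, in which case both point at the same in-process FakeDAServer.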
+ var altDACLIConfig altda.CLIConfig + if cfg.DeployConfig.UseAltDA { + fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"]) + if err := fakeAltDAServer.Start(); err != nil { + return nil, fmt.Errorf("failed to start fake altDA server: %w", err) + } + sys.FakeAltDAServer = fakeAltDAServer + + altDACLIConfig = altda.CLIConfig{ + Enabled: cfg.DeployConfig.UseAltDA, + DAServerURL: fakeAltDAServer.HttpEndpoint(), + VerifyOnRead: true, + GenericDA: true, + MaxConcurrentRequests: cfg.BatcherMaxConcurrentDARequest, + } + } + // Rollup nodes // Ensure we are looping through the nodes in alphabetical order @@ -871,7 +889,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System, if err := c.LoadPersisted(cfg.Loggers[name]); err != nil { return nil, err } - + c.AltDA = altDACLIConfig if p, ok := p2pNodes[name]; ok { c.P2P = p @@ -973,22 +991,6 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System, batcherTargetNumFrames = 1 } - var batcherAltDACLIConfig altda.CLIConfig - if cfg.DeployConfig.UseAltDA { - fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"]) - if err := fakeAltDAServer.Start(); err != nil { - return nil, fmt.Errorf("failed to start fake altDA server: %w", err) - } - sys.FakeAltDAServer = fakeAltDAServer - - batcherAltDACLIConfig = altda.CLIConfig{ - Enabled: cfg.DeployConfig.UseAltDA, - DAServerURL: fakeAltDAServer.HttpEndpoint(), - VerifyOnRead: true, - GenericDA: true, - MaxConcurrentRequests: cfg.BatcherMaxConcurrentDARequest, - } - } batcherCLIConfig := &bss.CLIConfig{ L1EthRpc: sys.EthInstances[RoleL1].UserRPC().RPC(), L2EthRpc: sys.EthInstances[RoleSeq].UserRPC().RPC(), @@ -1011,7 +1013,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System, MaxBlocksPerSpanBatch: cfg.BatcherMaxBlocksPerSpanBatch, DataAvailabilityType: sys.Cfg.DataAvailabilityType, CompressionAlgo: derive.Zlib, - AltDA: batcherAltDACLIConfig, + AltDA: altDACLIConfig, } // Apply batcher cli modifications diff --git a/op-node/node/node.go b/op-node/node/node.go index d95aa1ed79a55..6b539e9b7e0be 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -423,7 +423,10 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error { if cfg.AltDA.Enabled && err != nil { return fmt.Errorf("failed to get altDA config: %w", err) } - altDA := altda.NewAltDA(n.log, cfg.AltDA, rpCfg, n.metrics.AltDAMetrics) + altDA, err := altda.NewAltDA(n.log, cfg.AltDA, rpCfg, n.metrics.AltDAMetrics) + if err != nil { + return fmt.Errorf("failed to create altDA: %w", err) + } if cfg.SafeDBPath != "" { n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath) safeDB, err := safedb.NewSafeDB(n.log, cfg.SafeDBPath) diff --git a/op-service/testutils/fake_txmgr.go b/op-service/testutils/fake_txmgr.go new file mode 100644 index 0000000000000..828ba18426887 --- /dev/null +++ b/op-service/testutils/fake_txmgr.go @@ -0,0 +1,77 @@ +package testutils + +import ( + "context" + "errors" + "math/big" + + "github.com/ethereum-optimism/optimism/op-service/txmgr" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +// FakeTxMgr is a fake txmgr.TxManager for testing the op-batcher. +type FakeTxMgr struct { + log log.Logger + FromAddr common.Address + Closed bool + Nonce uint64 + errorEveryNthSend uint // 0 means never error, 1 means every send errors, etc. 
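+	// sendCount counts every SendAsync call, successful or not, and is compared against errorEveryNthSend to decide which sends fail.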
+ sendCount uint +} + +var _ txmgr.TxManager = (*FakeTxMgr)(nil) + +func NewFakeTxMgr(log log.Logger, from common.Address) *FakeTxMgr { + return &FakeTxMgr{ + log: log, + FromAddr: from, + } +} + +func (f *FakeTxMgr) ErrorEveryNthSend(n uint) { + f.errorEveryNthSend = n +} + +func (f *FakeTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) { + // We currently only use the FakeTxMgr to test the op-batcher, which only uses SendAsync. + // Send makes it harder to track failures and nonce management (prob need to add mutex, etc). + // We can implement this if/when its needed. + panic("FakeTxMgr does not implement Send") +} +func (f *FakeTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) { + f.log.Debug("SendingAsync tx", "nonce", f.Nonce) + f.sendCount++ + var sendResponse txmgr.SendResponse + if f.errorEveryNthSend != 0 && f.sendCount%f.errorEveryNthSend == 0 { + sendResponse.Err = errors.New("errorEveryNthSend") + } else { + sendResponse.Receipt = &types.Receipt{ + BlockHash: common.Hash{}, + BlockNumber: big.NewInt(0), + } + sendResponse.Nonce = f.Nonce + f.Nonce++ + } + ch <- sendResponse +} +func (f *FakeTxMgr) From() common.Address { + return f.FromAddr +} +func (f *FakeTxMgr) BlockNumber(ctx context.Context) (uint64, error) { + return 0, nil +} +func (f *FakeTxMgr) API() rpc.API { + return rpc.API{} +} +func (f *FakeTxMgr) Close() { + f.Closed = true +} +func (f *FakeTxMgr) IsClosed() bool { + return f.Closed +} +func (f *FakeTxMgr) SuggestGasPriceCaps(ctx context.Context) (tipCap *big.Int, baseFee *big.Int, blobBaseFee *big.Int, err error) { + return nil, nil, nil, nil +}
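
As a usage reference (not part of the diff itself), here is a minimal sketch of how the FakeTxMgr fault injection above could be exercised on its own. The test name is illustrative; the sketch only relies on the NewFakeTxMgr, ErrorEveryNthSend and SendAsync signatures introduced in this change.

package testutils_test

import (
	"context"
	"testing"

	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum-optimism/optimism/op-service/testutils"
	"github.com/ethereum-optimism/optimism/op-service/txmgr"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
)

// TestFakeTxMgr_ErrorEveryNthSend checks that every 2nd SendAsync call returns an
// injected error while the others produce a receipt and bump the nonce.
func TestFakeTxMgr_ErrorEveryNthSend(t *testing.T) {
	f := testutils.NewFakeTxMgr(testlog.Logger(t, log.LevelDebug), common.Address{0x01})
	f.ErrorEveryNthSend(2)

	// SendAsync writes its response synchronously, so a buffered channel avoids blocking.
	ch := make(chan txmgr.SendResponse, 4)
	for i := 0; i < 4; i++ {
		f.SendAsync(context.Background(), txmgr.TxCandidate{}, ch)
	}

	var failed int
	for i := 0; i < 4; i++ {
		if resp := <-ch; resp.Err != nil {
			failed++
		}
	}
	if failed != 2 {
		t.Fatalf("expected 2 injected send failures, got %d", failed)
	}
	// Only successful sends increment the nonce.
	if f.Nonce != 2 {
		t.Fatalf("expected nonce 2 after 2 successful sends, got %d", f.Nonce)
	}
}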