Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions op-alt-da/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ func (c CLIConfig) Check() error {
return nil
}

func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
func (c CLIConfig) NewDAClient() (*DAClient, error) {
err := c.Check()
if err != nil {
return nil, fmt.Errorf("check daclient CLIConfig: %w", err)
}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}, nil
}

func ReadCLIConfig(c *cli.Context) CLIConfig {
Expand Down
6 changes: 4 additions & 2 deletions op-alt-da/daclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func TestDAClientPrecomputed(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down Expand Up @@ -85,7 +86,8 @@ func TestDAClientService(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down
8 changes: 6 additions & 2 deletions op-alt-da/damgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ type DA struct {
}

// NewAltDA creates a new AltDA instance with the given log and CLIConfig.
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA {
return NewAltDAWithStorage(log, cfg, cli.NewDAClient(), metrics)
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) (*DA, error) {
daClient, err := cli.NewDAClient()
if err != nil {
return nil, fmt.Errorf("new DAClient: %w", err)
}
return NewAltDAWithStorage(log, cfg, daClient, metrics), nil
}

// NewAltDAWithStorage creates a new AltDA instance with the given log and DAStorage interface.
Expand Down
109 changes: 102 additions & 7 deletions op-alt-da/damock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package altda

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sync"
Expand All @@ -16,11 +18,16 @@ import (
)

// MockDAClient mocks a DA storage provider to avoid running an HTTP DA server
// in unit tests.
// in unit tests. MockDAClient is goroutine-safe.
type MockDAClient struct {
CommitmentType CommitmentType
store ethdb.KeyValueStore
log log.Logger
mu sync.Mutex
CommitmentType CommitmentType
GenericCommitmentCount uint16 // next generic commitment (use counting commitment instead of hash to help with testing)
store ethdb.KeyValueStore
StoreCount int
log log.Logger
dropEveryNthPut uint // 0 means nothing gets dropped, 1 means every put errors, etc.
setInputRequestCount uint // number of put requests received, irrespective of whether they were successful
}

func NewMockDAClient(log log.Logger) *MockDAClient {
Expand All @@ -31,7 +38,30 @@ func NewMockDAClient(log log.Logger) *MockDAClient {
}
}

// NewCountingGenericCommitmentMockDAClient creates a MockDAClient that uses counting commitments.
// Its commitments are big-endian encoded uint16s of 0, 1, 2, etc. instead of actual hash or altda-layer related commitments.
// Used for testing to make sure we receive commitments in order following Holocene strict ordering rules.
func NewCountingGenericCommitmentMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
CommitmentType: GenericCommitmentType,
store: memorydb.New(),
log: log,
}
}

// Fakes a da server that drops/errors on every Nth put request.
// Useful for testing the batcher's error handling.
// 0 means nothing gets dropped, 1 means every put errors, etc.
func (c *MockDAClient) DropEveryNthPut(n uint) {
c.mu.Lock()
defer c.mu.Unlock()
c.dropEveryNthPut = n
}

func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Getting input", "key", key)
bytes, err := c.store.Get(key.Encode())
if err != nil {
return nil, ErrNotFound
Expand All @@ -40,12 +70,46 @@ func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte
}

func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (CommitmentData, error) {
key := NewCommitmentData(c.CommitmentType, data)
return key, c.store.Put(key.Encode(), data)
c.mu.Lock()
defer c.mu.Unlock()
c.setInputRequestCount++
var key CommitmentData
if c.CommitmentType == GenericCommitmentType {
countCommitment := make([]byte, 2)
binary.BigEndian.PutUint16(countCommitment, c.GenericCommitmentCount)
key = NewGenericCommitment(countCommitment)
} else {
key = NewKeccak256Commitment(data)
}
var action string = "put"
if c.dropEveryNthPut > 0 && c.setInputRequestCount%c.dropEveryNthPut == 0 {
action = "dropped"
}
c.log.Debug("Setting input", "action", action, "key", key, "data", fmt.Sprintf("%x", data))
if action == "dropped" {
return nil, errors.New("put dropped")
}
err := c.store.Put(key.Encode(), data)
if err == nil {
c.GenericCommitmentCount++
c.StoreCount++
}
return key, err
}

func (c *MockDAClient) DeleteData(key []byte) error {
return c.store.Delete(key)
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Deleting data", "key", key)
// memorydb.Delete() returns nil even when the key doesn't exist, so we need to check if the key exists
// before decrementing StoreCount.
var err error
if _, err = c.store.Get(key); err == nil {
if err = c.store.Delete(key); err == nil {
c.StoreCount--
}
}
return err
}

// DAErrFaker is a DA client that can be configured to return errors on GetInput
Expand Down Expand Up @@ -121,6 +185,12 @@ type FakeDAServer struct {
getRequestLatency time.Duration
// next failoverCount Put requests will return 503 status code for failover testing
failoverCount uint64
// outOfOrderResponses is a flag that, when set, causes the server to send responses out of order.
// It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules.
outOfOrderResponses bool
oooMu sync.Mutex
oooWaitChan chan struct{}
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
Expand All @@ -145,6 +215,21 @@ func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
s.failoverCount--
return
}
if s.outOfOrderResponses {
s.oooMu.Lock()
if s.oooWaitChan == nil {
s.log.Info("Received put request while in out-of-order mode, waiting for next request")
s.oooWaitChan = make(chan struct{})
s.oooMu.Unlock()
<-s.oooWaitChan
time.Sleep(1 * time.Second)
} else {
s.log.Info("Received second put request in out-of-order mode, responding to this one first, then the first one")
close(s.oooWaitChan)
s.oooWaitChan = nil
s.oooMu.Unlock()
}
}
s.DAServer.HandlePut(w, r)
}

Expand All @@ -162,10 +247,12 @@ func (s *FakeDAServer) Start() error {
}

func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
s.log.Info("Setting put request latency", "latency", latency)
s.putRequestLatency = latency
}

func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.log.Info("Setting get request latency", "latency", latency)
s.getRequestLatency = latency
}

Expand All @@ -174,6 +261,14 @@ func (s *FakeDAServer) SetPutFailoverForNRequests(n uint64) {
s.failoverCount = n
}

// When ooo=true, causes the server to send responses out of order.
// It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules.
func (s *FakeDAServer) SetOutOfOrderResponses(ooo bool) {
s.log.Info("Setting out of order responses", "ooo", ooo)
s.outOfOrderResponses = ooo
}

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
Expand Down
65 changes: 65 additions & 0 deletions op-alt-da/damock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package altda

import (
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
)

func TestFakeDAServer_OutOfOrderResponses(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
daServer := NewFakeDAServer("localhost", 0, logger)
daServer.SetOutOfOrderResponses(true)

// Channel to track completion order
completionOrder := make(chan int, 2)

// Start two concurrent requests
var wg sync.WaitGroup
wg.Add(2)

// First request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 1
}()

// Small delay to ensure first request starts first
time.Sleep(100 * time.Millisecond)

// Second request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 2
}()

// Wait for both requests to complete
wg.Wait()
close(completionOrder)

// Check completion order
var order []int
for n := range completionOrder {
order = append(order, n)
}

// Second request should complete before first
if len(order) != 2 {
t.Fatalf("expected 2 requests to complete, got %d", len(order))
}
if order[0] != 2 || order[1] != 1 {
t.Errorf("expected completion order [2,1], got %v", order)
}
}
Loading