Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions op-alt-da/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ func (c CLIConfig) Check() error {
return nil
}

func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
func (c CLIConfig) NewDAClient() (*DAClient, error) {
err := c.Check()
if err != nil {
return nil, fmt.Errorf("check daclient CLIConfig: %w", err)
}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}, nil
}

func ReadCLIConfig(c *cli.Context) CLIConfig {
Expand Down
6 changes: 4 additions & 2 deletions op-alt-da/daclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func TestDAClientPrecomputed(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down Expand Up @@ -85,7 +86,8 @@ func TestDAClientService(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down
8 changes: 6 additions & 2 deletions op-alt-da/damgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ type DA struct {
}

// NewAltDA creates a new AltDA instance with the given log and CLIConfig.
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA {
return NewAltDAWithStorage(log, cfg, cli.NewDAClient(), metrics)
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) (*DA, error) {
daClient, err := cli.NewDAClient()
if err != nil {
return nil, fmt.Errorf("new DAClient: %w", err)
}
return NewAltDAWithStorage(log, cfg, daClient, metrics), nil
}

// NewAltDAWithStorage creates a new AltDA instance with the given log and DAStorage interface.
Expand Down
109 changes: 102 additions & 7 deletions op-alt-da/damock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package altda

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sync"
Expand All @@ -16,11 +18,16 @@ import (
)

// MockDAClient mocks a DA storage provider to avoid running an HTTP DA server
// in unit tests.
// in unit tests. MockDAClient is goroutine-safe.
type MockDAClient struct {
CommitmentType CommitmentType
store ethdb.KeyValueStore
log log.Logger
mu sync.Mutex
CommitmentType CommitmentType
GenericCommitmentCount uint16 // next generic commitment (use counting commitment instead of hash to help with testing)
store ethdb.KeyValueStore
StoreCount int
log log.Logger
dropEveryNthPut uint // 0 means nothing gets dropped, 1 means every put errors, etc.
setInputRequestCount uint // number of put requests received, irrespective of whether they were successful
}

func NewMockDAClient(log log.Logger) *MockDAClient {
Expand All @@ -31,7 +38,30 @@ func NewMockDAClient(log log.Logger) *MockDAClient {
}
}

// NewCountingGenericCommitmentMockDAClient creates a MockDAClient that uses counting commitments.
// Its commitments are big-endian encoded uint16s of 0, 1, 2, etc. instead of actual hash or altda-layer related commitments.
// Used for testing to make sure we receive commitments in order following Holocene strict ordering rules.
func NewCountingGenericCommitmentMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
CommitmentType: GenericCommitmentType,
store: memorydb.New(),
log: log,
}
}

// Fakes a da server that drops/errors on every Nth put request.
// Useful for testing the batcher's error handling.
// 0 means nothing gets dropped, 1 means every put errors, etc.
func (c *MockDAClient) DropEveryNthPut(n uint) {
c.mu.Lock()
defer c.mu.Unlock()
c.dropEveryNthPut = n
}

func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Getting input", "key", key)
bytes, err := c.store.Get(key.Encode())
if err != nil {
return nil, ErrNotFound
Expand All @@ -40,12 +70,46 @@ func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte
}

func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (CommitmentData, error) {
key := NewCommitmentData(c.CommitmentType, data)
return key, c.store.Put(key.Encode(), data)
c.mu.Lock()
defer c.mu.Unlock()
c.setInputRequestCount++
var key CommitmentData
if c.CommitmentType == GenericCommitmentType {
countCommitment := make([]byte, 2)
binary.BigEndian.PutUint16(countCommitment, c.GenericCommitmentCount)
key = NewGenericCommitment(countCommitment)
} else {
key = NewKeccak256Commitment(data)
}
var action string = "put"
if c.dropEveryNthPut > 0 && c.setInputRequestCount%c.dropEveryNthPut == 0 {
action = "dropped"
}
c.log.Debug("Setting input", "action", action, "key", key, "data", fmt.Sprintf("%x", data))
if action == "dropped" {
return nil, errors.New("put dropped")
}
err := c.store.Put(key.Encode(), data)
if err == nil {
c.GenericCommitmentCount++
c.StoreCount++
}
return key, err
}

func (c *MockDAClient) DeleteData(key []byte) error {
return c.store.Delete(key)
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Deleting data", "key", key)
// memorydb.Delete() returns nil even when the key doesn't exist, so we need to check if the key exists
// before decrementing StoreCount.
var err error
if _, err = c.store.Get(key); err == nil {
if err = c.store.Delete(key); err == nil {
c.StoreCount--
}
}
return err
}

// DAErrFaker is a DA client that can be configured to return errors on GetInput
Expand Down Expand Up @@ -121,6 +185,12 @@ type FakeDAServer struct {
getRequestLatency time.Duration
// next failoverCount Put requests will return 503 status code for failover testing
failoverCount uint64
// outOfOrderResponses is a flag that, when set, causes the server to send responses out of order.
// It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules.
outOfOrderResponses bool
oooMu sync.Mutex
oooWaitChan chan struct{}
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
Expand All @@ -145,6 +215,21 @@ func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
s.failoverCount--
return
}
if s.outOfOrderResponses {
s.oooMu.Lock()
if s.oooWaitChan == nil {
s.log.Info("Received put request while in out-of-order mode, waiting for next request")
s.oooWaitChan = make(chan struct{})
s.oooMu.Unlock()
<-s.oooWaitChan
time.Sleep(1 * time.Second)
} else {
s.log.Info("Received second put request in out-of-order mode, responding to this one first, then the first one")
close(s.oooWaitChan)
s.oooWaitChan = nil
s.oooMu.Unlock()
}
}
s.DAServer.HandlePut(w, r)
}

Expand All @@ -162,10 +247,12 @@ func (s *FakeDAServer) Start() error {
}

func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
s.log.Info("Setting put request latency", "latency", latency)
s.putRequestLatency = latency
}

func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.log.Info("Setting get request latency", "latency", latency)
s.getRequestLatency = latency
}

Expand All @@ -174,6 +261,14 @@ func (s *FakeDAServer) SetPutFailoverForNRequests(n uint64) {
s.failoverCount = n
}

// When ooo=true, causes the server to send responses out of order.
// It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules.
func (s *FakeDAServer) SetOutOfOrderResponses(ooo bool) {
s.log.Info("Setting out of order responses", "ooo", ooo)
s.outOfOrderResponses = ooo
}

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
Expand Down
65 changes: 65 additions & 0 deletions op-alt-da/damock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package altda

import (
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
)

func TestFakeDAServer_OutOfOrderResponses(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
daServer := NewFakeDAServer("localhost", 0, logger)
daServer.SetOutOfOrderResponses(true)

// Channel to track completion order
completionOrder := make(chan int, 2)

// Start two concurrent requests
var wg sync.WaitGroup
wg.Add(2)

// First request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 1
}()

// Small delay to ensure first request starts first
time.Sleep(100 * time.Millisecond)

// Second request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 2
}()

// Wait for both requests to complete
wg.Wait()
close(completionOrder)

// Check completion order
var order []int
for n := range completionOrder {
order = append(order, n)
}

// Second request should complete before first
if len(order) != 2 {
t.Fatalf("expected 2 requests to complete, got %d", len(order))
}
if order[0] != 2 || order[1] != 1 {
t.Errorf("expected completion order [2,1], got %v", order)
}
}
Loading