Skip to content

Commit

Permalink
feat: add the standalone advancer command
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 authored and vfusco committed Sep 27, 2024
1 parent f63c12d commit 7ac0330
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 100 deletions.
4 changes: 2 additions & 2 deletions cmd/cartesi-rollups-evm-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ func run(cmd *cobra.Command, args []string) {
}

// setup log
startup.ConfigLogs(c)
startup.ConfigLogs(c.LogLevel, c.LogPrettyEnabled)

slog.Info("Starting the Cartesi Rollups Node EVM Reader", "version", buildVersion, "config", c)

// Validate Schema
err := startup.ValidateSchema(c)
err := startup.ValidateSchema(c.PostgresEndpoint.Value, c.PostgresSslDisabled)
if err != nil {
slog.Error("EVM Reader exited with an error", "error", err)
os.Exit(1)
Expand Down
90 changes: 90 additions & 0 deletions cmd/cartesi-rollups-machines/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package main

import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"

"github.com/cartesi/rollups-node/internal/node/advancer"
"github.com/cartesi/rollups-node/internal/node/advancer/machines"
"github.com/cartesi/rollups-node/internal/node/config"
"github.com/cartesi/rollups-node/internal/node/startup"
"github.com/cartesi/rollups-node/internal/repository"

"github.com/spf13/cobra"
)

const CMD_NAME = "advancer"

var (
buildVersion = "devel"
Cmd = &cobra.Command{
Use: CMD_NAME,
Short: "Runs the Advancer",
Long: "Runs the Advancer in standalone mode",
RunE: run,
}
)

func main() {
err := Cmd.Execute()
if err != nil {
os.Exit(1)
}
}

func getDatabase(ctx context.Context, endpoint string, sslMode bool) (*repository.Database, error) {
err := startup.ValidateSchema(endpoint, sslMode)
if err != nil {
return nil, fmt.Errorf("invalid database schema: %w", err)
}

database, err := repository.Connect(ctx, endpoint)
if err != nil {
return nil, fmt.Errorf("failed to connect to the database: %w", err)
}

return database, nil
}

func run(cmd *cobra.Command, args []string) error {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

c := config.GetAdvancerConfig()
startup.ConfigLogs(c.LogLevel, c.LogPrettyEnabled)

slog.Info("Starting the Cartesi Rollups Node Advancer", "version", buildVersion, "config", c)

database, err := getDatabase(ctx, c.PostgresEndpoint.Value, c.PostgresSslMode)
if err != nil {
return err
}
defer database.Close()

repo := &repository.MachineRepository{Database: database}

machines, err := machines.Load(ctx, repo, c.MachineServerVerbosity)
if err != nil {
return fmt.Errorf("failed to load the machines: %w", err)
}
defer machines.Close()

advancer, err := advancer.New(machines, repo)
if err != nil {
return fmt.Errorf("failed to create the advancer: %w", err)
}

poller, err := advancer.Poller(c.AdvancerPollingInterval)
if err != nil {
return fmt.Errorf("failed to create the advancer service: %w", err)
}

return poller.Start(ctx)
}
4 changes: 2 additions & 2 deletions cmd/cartesi-rollups-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ func main() {
config := config.FromEnv()

// setup log
startup.ConfigLogs(config)
startup.ConfigLogs(config.LogLevel, config.LogPrettyEnabled)
slog.Info("Starting the Cartesi Rollups Node", "version", buildVersion, "config", config)

err := startup.ValidateSchema(config)
err := startup.ValidateSchema(config.PostgresEndpoint.Value, config.PostgresSslDisabled)
if err != nil {
slog.Error("Node exited with an error", "error", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions cmd/cartesi-rollups-validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func run(cmd *cobra.Command, args []string) {
c.PostgresEndpoint = config.Redacted[string]{Value: postgresEndpoint}
}

startup.ConfigLogs(c)
startup.ConfigLogs(c.LogLevel, c.LogPrettyEnabled)

slog.Info("Starting the Cartesi Rollups Node Validator", "version", buildVersion, "config", c)

// Validate Schema
err := startup.ValidateSchema(c)
err := startup.ValidateSchema(c.PostgresEndpoint.Value, c.PostgresSslDisabled)
if err != nil {
slog.Error("failed to validate database schema", "error", err)
os.Exit(1)
Expand Down
44 changes: 19 additions & 25 deletions internal/node/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log/slog"
"time"

"github.com/cartesi/rollups-node/internal/node/advancer/machines"
"github.com/cartesi/rollups-node/internal/node/advancer/poller"
. "github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/internal/nodemachine"
Expand Down Expand Up @@ -49,10 +50,10 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller,
// runs them through the cartesi machine,
// and updates the repository with the ouputs.
func (advancer *Advancer) Step(ctx context.Context) error {
apps := keysFrom(advancer.machines)
apps := advancer.machines.Apps()

// Gets the unprocessed inputs (of all apps) from the repository.
slog.Info("advancer: getting unprocessed inputs")
slog.Info("advancer: querying for unprocessed inputs")
inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps)
if err != nil {
return err
Expand All @@ -67,14 +68,22 @@ func (advancer *Advancer) Step(ctx context.Context) error {
}
}

// Updates the status of the epochs.
for _, app := range apps {
err := advancer.repository.UpdateEpochs(ctx, app)
if err != nil {
return err
}
}

return nil
}

// process sequentially processes inputs from the the application.
func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error {
// Asserts that the app has an associated machine.
machine, ok := advancer.machines[app]
if !ok {
machine := advancer.machines.GetAdvanceMachine(app)
if machine == nil {
panic(fmt.Errorf("%w %s", ErrNoApp, app.String()))
}

Expand All @@ -99,11 +108,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In
}
}

// Updates the status of the epochs based on the last processed input.
lastInput := inputs[len(inputs)-1]
err := advancer.repository.UpdateEpochs(ctx, app, lastInput)

return err
return nil
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -114,25 +119,14 @@ type Repository interface {

StoreAdvanceResult(context.Context, *Input, *nodemachine.AdvanceResult) error

UpdateEpochs(_ context.Context, app Address, lastInput *Input) error
UpdateEpochs(_ context.Context, app Address) error
}

// A map of application addresses to machines.
type Machines = map[Address]Machine
type Machines interface {
GetAdvanceMachine(app Address) machines.AdvanceMachine
Apps() []Address
}

type Machine interface {
Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error)
}

// ------------------------------------------------------------------------------------------------

// keysFrom returns a slice with the keysFrom of a map.
func keysFrom[K comparable, V any](m map[K]V) []K {
keys := make([]K, len(m))
i := 0
for k := range m {
keys[i] = k
i++
}
return keys
}
74 changes: 36 additions & 38 deletions internal/node/advancer/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
mrand "math/rand"
"testing"

"github.com/cartesi/rollups-node/internal/node/advancer/machines"
. "github.com/cartesi/rollups-node/internal/node/model"
"github.com/cartesi/rollups-node/internal/nodemachine"

Expand All @@ -27,7 +28,8 @@ type AdvancerSuite struct{ suite.Suite }
func (s *AdvancerSuite) TestNew() {
s.Run("Ok", func() {
require := s.Require()
var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}}
machines := newMockMachines()
machines.Map[randomAddress()] = &MockMachine{}
var repository Repository = &MockRepository{}
advancer, err := New(machines, repository)
require.NotNil(advancer)
Expand All @@ -36,7 +38,7 @@ func (s *AdvancerSuite) TestNew() {

s.Run("InvalidMachines", func() {
require := s.Require()
var machines map[Address]Machine = nil
var machines Machines = nil
var repository Repository = &MockRepository{}
advancer, err := New(machines, repository)
require.Nil(advancer)
Expand All @@ -46,7 +48,8 @@ func (s *AdvancerSuite) TestNew() {

s.Run("InvalidRepository", func() {
require := s.Require()
var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}}
machines := newMockMachines()
machines.Map[randomAddress()] = &MockMachine{}
var repository Repository = nil
advancer, err := New(machines, repository)
require.Nil(advancer)
Expand All @@ -63,11 +66,11 @@ func (s *AdvancerSuite) TestRun() {
s.Run("Ok", func() {
require := s.Require()

machines := Machines{}
machines := newMockMachines()
app1 := randomAddress()
machines[app1] = &MockMachine{}
machines.Map[app1] = &MockMachine{}
app2 := randomAddress()
machines[app2] = &MockMachine{}
machines.Map[app2] = &MockMachine{}
res1 := randomAdvanceResult()
res2 := randomAdvanceResult()
res3 := randomAdvanceResult()
Expand All @@ -94,14 +97,18 @@ func (s *AdvancerSuite) TestRun() {
require.Len(repository.StoredResults, 3)
})

s.Run("Error/UpdateEpochs", func() {
s.T().Skip("TODO")
})

// NOTE: missing more test cases
}

func (s *AdvancerSuite) TestProcess() {
setup := func() (Machines, *MockRepository, *Advancer, Address) {
app := randomAddress()
machines := Machines{}
machines[app] = &MockMachine{}
machines := newMockMachines()
machines.Map[app] = &MockMachine{}
repository := &MockRepository{}
advancer := &Advancer{machines, repository}
return machines, repository, advancer, app
Expand All @@ -124,7 +131,6 @@ func (s *AdvancerSuite) TestProcess() {
err := advancer.process(context.Background(), app, inputs)
require.Nil(err)
require.Len(repository.StoredResults, 7)
require.Equal(*inputs[6], repository.LastInput)
})

s.Run("Panic", func() {
Expand Down Expand Up @@ -183,29 +189,7 @@ func (s *AdvancerSuite) TestProcess() {
require.Errorf(err, "store-advance error")
require.Len(repository.StoredResults, 1)
})

s.Run("UpdateEpochs", func() {
require := s.Require()

_, repository, advancer, app := setup()
inputs := []*Input{
{Id: 1, RawData: marshal(randomAdvanceResult())},
{Id: 2, RawData: marshal(randomAdvanceResult())},
{Id: 3, RawData: marshal(randomAdvanceResult())},
{Id: 4, RawData: marshal(randomAdvanceResult())},
}
repository.UpdateEpochsError = errors.New("update-epochs error")

err := advancer.process(context.Background(), app, inputs)
require.Errorf(err, "update-epochs error")
require.Len(repository.StoredResults, 4)
})
})

}

func (s *AdvancerSuite) TestKeysFrom() {
s.T().Skip("TODO")
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -227,14 +211,33 @@ func (mock *MockMachine) Advance(

// ------------------------------------------------------------------------------------------------

type MachinesMock struct {
Map map[Address]machines.AdvanceMachine
}

func newMockMachines() *MachinesMock {
return &MachinesMock{
Map: map[Address]machines.AdvanceMachine{},
}
}

func (mock *MachinesMock) GetAdvanceMachine(app Address) machines.AdvanceMachine {
return mock.Map[app]
}

func (mock *MachinesMock) Apps() []Address {
return []Address{}
}

// ------------------------------------------------------------------------------------------------

type MockRepository struct {
GetInputsReturn map[Address][]*Input
GetInputsError error
StoreAdvanceError error
UpdateEpochsError error

StoredResults []*nodemachine.AdvanceResult
LastInput Input
}

func (mock *MockRepository) GetUnprocessedInputs(
Expand All @@ -253,12 +256,7 @@ func (mock *MockRepository) StoreAdvanceResult(
return mock.StoreAdvanceError
}

func (mock *MockRepository) UpdateEpochs(
_ context.Context,
_ Address,
lastInput *Input,
) error {
mock.LastInput = *lastInput
func (mock *MockRepository) UpdateEpochs(_ context.Context, _ Address) error {
return mock.UpdateEpochsError
}

Expand Down
Loading

0 comments on commit 7ac0330

Please sign in to comment.