diff --git a/cmd/cartesi-rollups-machines/main.go b/cmd/cartesi-rollups-machines/main.go new file mode 100644 index 000000000..4b24123b6 --- /dev/null +++ b/cmd/cartesi-rollups-machines/main.go @@ -0,0 +1,98 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "github.com/cartesi/rollups-node/internal/node/advancer" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" + "github.com/cartesi/rollups-node/internal/node/config" + "github.com/cartesi/rollups-node/internal/node/startup" + "github.com/cartesi/rollups-node/internal/repository" + + "github.com/spf13/cobra" +) + +const CMD_NAME = "advancer" + +var ( + buildVersion = "devel" + Cmd = &cobra.Command{ + Use: CMD_NAME, + Short: "Runs the Advancer", + Long: "Runs the Advancer in standalone mode", + RunE: run, + } +) + +func main() { + err := Cmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func getDatabase(ctx context.Context, c config.NodeConfig) (*repository.Database, error) { + err := startup.ValidateSchema(c) + if err != nil { + return nil, fmt.Errorf("invalid database schema: %w", err) + } + + database, err := repository.Connect(ctx, c.PostgresEndpoint.Value) + if err != nil { + return nil, fmt.Errorf("failed to connect to the database: %w", err) + } + + return database, nil +} + +func run(cmd *cobra.Command, args []string) error { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + c := config.FromEnv() + startup.ConfigLogs(c) + + slog.Info("Starting the Cartesi Rollups Node Advancer", "version", buildVersion, "config", c) + + database, err := getDatabase(ctx, c) + if err != nil { + return err + } + defer database.Close() + + repo := &repository.MachineRepository{Database: database} + + machines, err := machines.Load(ctx, repo, c.MachineServerVerbosity) + if err != nil { + return fmt.Errorf("failed to load the machines: %w", err) + } + defer machines.Close() + + advancer, err := advancer.New(machines, repo) + if err != nil { + return fmt.Errorf("failed to create the advancer: %w", err) + } + + poller, err := advancer.Poller(5 * time.Second) //nolint: mnd + if err != nil { + return fmt.Errorf("failed to create the advancer service: %w", err) + } + + ready := make(chan struct{}, 1) + + err = poller.Start(ctx, ready) + if err != nil { + return fmt.Errorf("failed to start the advancer service: %w", err) + } + + return nil +} diff --git a/internal/node/advancer/advancer.go b/internal/node/advancer/advancer.go index 32ef675ab..7865f89ec 100644 --- a/internal/node/advancer/advancer.go +++ b/internal/node/advancer/advancer.go @@ -10,6 +10,7 @@ import ( "log/slog" "time" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" "github.com/cartesi/rollups-node/internal/node/advancer/poller" . "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/nodemachine" @@ -49,7 +50,7 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller, // runs them through the cartesi machine, // and updates the repository with the ouputs. func (advancer *Advancer) Step(ctx context.Context) error { - apps := keysFrom(advancer.machines) + apps := advancer.machines.Apps() // Gets the unprocessed inputs (of all apps) from the repository. slog.Info("advancer: getting unprocessed inputs") @@ -67,14 +68,22 @@ func (advancer *Advancer) Step(ctx context.Context) error { } } + // Updates the status of the epochs. + for _, app := range apps { + err := advancer.repository.UpdateEpochs(ctx, app) + if err != nil { + return err + } + } + return nil } // process sequentially processes inputs from the the application. func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error { // Asserts that the app has an associated machine. - machine, ok := advancer.machines[app] - if !ok { + machine := advancer.machines.GetAdvanceMachine(app) + if machine == nil { panic(fmt.Errorf("%w %s", ErrNoApp, app.String())) } @@ -99,11 +108,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In } } - // Updates the status of the epochs based on the last processed input. - lastInput := inputs[len(inputs)-1] - err := advancer.repository.UpdateEpochs(ctx, app, lastInput) - - return err + return nil } // ------------------------------------------------------------------------------------------------ @@ -114,25 +119,14 @@ type Repository interface { StoreAdvanceResult(context.Context, *Input, *nodemachine.AdvanceResult) error - UpdateEpochs(_ context.Context, app Address, lastInput *Input) error + UpdateEpochs(_ context.Context, app Address) error } -// A map of application addresses to machines. -type Machines = map[Address]Machine +type Machines interface { + GetAdvanceMachine(app Address) machines.AdvanceMachine + Apps() []Address +} type Machine interface { Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error) } - -// ------------------------------------------------------------------------------------------------ - -// keysFrom returns a slice with the keysFrom of a map. -func keysFrom[K comparable, V any](m map[K]V) []K { - keys := make([]K, len(m)) - i := 0 - for k := range m { - keys[i] = k - i++ - } - return keys -} diff --git a/internal/node/advancer/advancer_test.go b/internal/node/advancer/advancer_test.go index 07b6bb6c0..67bd8e489 100644 --- a/internal/node/advancer/advancer_test.go +++ b/internal/node/advancer/advancer_test.go @@ -12,6 +12,7 @@ import ( mrand "math/rand" "testing" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" . "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/nodemachine" @@ -27,7 +28,8 @@ type AdvancerSuite struct{ suite.Suite } func (s *AdvancerSuite) TestNew() { s.Run("Ok", func() { require := s.Require() - var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}} + machines := newMockMachines() + machines.Map[randomAddress()] = &MockMachine{} var repository Repository = &MockRepository{} advancer, err := New(machines, repository) require.NotNil(advancer) @@ -36,7 +38,7 @@ func (s *AdvancerSuite) TestNew() { s.Run("InvalidMachines", func() { require := s.Require() - var machines map[Address]Machine = nil + var machines Machines = nil var repository Repository = &MockRepository{} advancer, err := New(machines, repository) require.Nil(advancer) @@ -46,7 +48,8 @@ func (s *AdvancerSuite) TestNew() { s.Run("InvalidRepository", func() { require := s.Require() - var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}} + machines := newMockMachines() + machines.Map[randomAddress()] = &MockMachine{} var repository Repository = nil advancer, err := New(machines, repository) require.Nil(advancer) @@ -63,11 +66,11 @@ func (s *AdvancerSuite) TestRun() { s.Run("Ok", func() { require := s.Require() - machines := Machines{} + machines := newMockMachines() app1 := randomAddress() - machines[app1] = &MockMachine{} + machines.Map[app1] = &MockMachine{} app2 := randomAddress() - machines[app2] = &MockMachine{} + machines.Map[app2] = &MockMachine{} res1 := randomAdvanceResult() res2 := randomAdvanceResult() res3 := randomAdvanceResult() @@ -94,14 +97,18 @@ func (s *AdvancerSuite) TestRun() { require.Len(repository.StoredResults, 3) }) + s.Run("Error/UpdateEpochs", func() { + s.T().Skip("TODO") + }) + // NOTE: missing more test cases } func (s *AdvancerSuite) TestProcess() { setup := func() (Machines, *MockRepository, *Advancer, Address) { app := randomAddress() - machines := Machines{} - machines[app] = &MockMachine{} + machines := newMockMachines() + machines.Map[app] = &MockMachine{} repository := &MockRepository{} advancer := &Advancer{machines, repository} return machines, repository, advancer, app @@ -124,7 +131,6 @@ func (s *AdvancerSuite) TestProcess() { err := advancer.process(context.Background(), app, inputs) require.Nil(err) require.Len(repository.StoredResults, 7) - require.Equal(*inputs[6], repository.LastInput) }) s.Run("Panic", func() { @@ -183,29 +189,7 @@ func (s *AdvancerSuite) TestProcess() { require.Errorf(err, "store-advance error") require.Len(repository.StoredResults, 1) }) - - s.Run("UpdateEpochs", func() { - require := s.Require() - - _, repository, advancer, app := setup() - inputs := []*Input{ - {Id: 1, RawData: marshal(randomAdvanceResult())}, - {Id: 2, RawData: marshal(randomAdvanceResult())}, - {Id: 3, RawData: marshal(randomAdvanceResult())}, - {Id: 4, RawData: marshal(randomAdvanceResult())}, - } - repository.UpdateEpochsError = errors.New("update-epochs error") - - err := advancer.process(context.Background(), app, inputs) - require.Errorf(err, "update-epochs error") - require.Len(repository.StoredResults, 4) - }) }) - -} - -func (s *AdvancerSuite) TestKeysFrom() { - s.T().Skip("TODO") } // ------------------------------------------------------------------------------------------------ @@ -227,6 +211,26 @@ func (mock *MockMachine) Advance( // ------------------------------------------------------------------------------------------------ +type MachinesMock struct { + Map map[Address]machines.AdvanceMachine +} + +func newMockMachines() *MachinesMock { + return &MachinesMock{ + Map: map[Address]machines.AdvanceMachine{}, + } +} + +func (mock *MachinesMock) GetAdvanceMachine(app Address) machines.AdvanceMachine { + return mock.Map[app] +} + +func (mock *MachinesMock) Apps() []Address { + return []Address{} +} + +// ------------------------------------------------------------------------------------------------ + type MockRepository struct { GetInputsReturn map[Address][]*Input GetInputsError error @@ -234,7 +238,6 @@ type MockRepository struct { UpdateEpochsError error StoredResults []*nodemachine.AdvanceResult - LastInput Input } func (mock *MockRepository) GetUnprocessedInputs( @@ -253,12 +256,7 @@ func (mock *MockRepository) StoreAdvanceResult( return mock.StoreAdvanceError } -func (mock *MockRepository) UpdateEpochs( - _ context.Context, - _ Address, - lastInput *Input, -) error { - mock.LastInput = *lastInput +func (mock *MockRepository) UpdateEpochs(_ context.Context, _ Address) error { return mock.UpdateEpochsError } diff --git a/internal/repository/machine.go b/internal/repository/machine.go index 73dda60ed..2a29c45cc 100644 --- a/internal/repository/machine.go +++ b/internal/repository/machine.go @@ -45,7 +45,6 @@ func (repo *MachineRepository) GetMachineConfigurations( return nil, fmt.Errorf("%w (failed querying applications): %w", ErrAdvancerRepository, err) } - // TODO: missing machine config fields res := []*machines.MachineConfig{} var row machines.MachineConfig @@ -76,14 +75,14 @@ func (repo *MachineRepository) GetProcessedInputs( app Address, index uint64, ) ([]*Input, error) { - query := fmt.Sprintf(` + query := ` SELECT id, index, status, raw_data FROM input WHERE application_address = @applicationAddress AND index >= @index AND status != 'NONE' ORDER BY index ASC - `) + ` args := pgx.NamedArgs{ "applicationAddress": app, "index": index,