diff --git a/cmd/cartesi-rollups-evm-reader/main.go b/cmd/cartesi-rollups-evm-reader/main.go index e211863f6..be71eb922 100644 --- a/cmd/cartesi-rollups-evm-reader/main.go +++ b/cmd/cartesi-rollups-evm-reader/main.go @@ -127,12 +127,12 @@ func run(cmd *cobra.Command, args []string) { } // setup log - startup.ConfigLogs(c) + startup.ConfigLogs(c.LogLevel, c.LogPrettyEnabled) slog.Info("Starting the Cartesi Rollups Node EVM Reader", "version", buildVersion, "config", c) // Validate Schema - err := startup.ValidateSchema(c) + err := startup.ValidateSchema(c.PostgresEndpoint.Value, c.PostgresSslDisabled) if err != nil { slog.Error("EVM Reader exited with an error", "error", err) os.Exit(1) diff --git a/cmd/cartesi-rollups-machines/main.go b/cmd/cartesi-rollups-machines/main.go new file mode 100644 index 000000000..36c6b5171 --- /dev/null +++ b/cmd/cartesi-rollups-machines/main.go @@ -0,0 +1,90 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + + "github.com/cartesi/rollups-node/internal/node/advancer" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" + "github.com/cartesi/rollups-node/internal/node/config" + "github.com/cartesi/rollups-node/internal/node/startup" + "github.com/cartesi/rollups-node/internal/repository" + + "github.com/spf13/cobra" +) + +const CMD_NAME = "advancer" + +var ( + buildVersion = "devel" + Cmd = &cobra.Command{ + Use: CMD_NAME, + Short: "Runs the Advancer", + Long: "Runs the Advancer in standalone mode", + RunE: run, + } +) + +func main() { + err := Cmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func getDatabase(ctx context.Context, endpoint string, sslMode bool) (*repository.Database, error) { + err := startup.ValidateSchema(endpoint, sslMode) + if err != nil { + return nil, fmt.Errorf("invalid database schema: %w", err) + } + + database, err := repository.Connect(ctx, endpoint) + if err != nil { + return nil, fmt.Errorf("failed to connect to the database: %w", err) + } + + return database, nil +} + +func run(cmd *cobra.Command, args []string) error { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + c := config.GetAdvancerConfig() + startup.ConfigLogs(c.LogLevel, c.LogPrettyEnabled) + + slog.Info("Starting the Cartesi Rollups Node Advancer", "version", buildVersion, "config", c) + + database, err := getDatabase(ctx, c.PostgresEndpoint.Value, c.PostgresSslMode) + if err != nil { + return err + } + defer database.Close() + + repo := &repository.MachineRepository{Database: database} + + machines, err := machines.Load(ctx, repo, c.MachineServerVerbosity) + if err != nil { + return fmt.Errorf("failed to load the machines: %w", err) + } + defer machines.Close() + + advancer, err := advancer.New(machines, repo) + if err != nil { + return fmt.Errorf("failed to create the advancer: %w", err) + } + + poller, err := advancer.Poller(c.AdvancerPollingInterval) + if err != nil { + return fmt.Errorf("failed to create the advancer service: %w", err) + } + + return poller.Start(ctx) +} diff --git a/cmd/cartesi-rollups-node/main.go b/cmd/cartesi-rollups-node/main.go index ee3fa4904..1a1117bc3 100644 --- a/cmd/cartesi-rollups-node/main.go +++ b/cmd/cartesi-rollups-node/main.go @@ -32,10 +32,10 @@ func main() { config := config.FromEnv() // setup log - startup.ConfigLogs(config) + startup.ConfigLogs(config.LogLevel, config.LogPrettyEnabled) slog.Info("Starting the Cartesi Rollups Node", "version", buildVersion, "config", config) - err := startup.ValidateSchema(config) + err := startup.ValidateSchema(config.PostgresEndpoint.Value, config.PostgresSslDisabled) if err != nil { slog.Error("Node exited with an error", "error", err) os.Exit(1) diff --git a/cmd/cartesi-rollups-validator/main.go b/cmd/cartesi-rollups-validator/main.go index 9ca66f0f3..ef187be2e 100644 --- a/cmd/cartesi-rollups-validator/main.go +++ b/cmd/cartesi-rollups-validator/main.go @@ -81,12 +81,12 @@ func run(cmd *cobra.Command, args []string) { c.PostgresEndpoint = config.Redacted[string]{Value: postgresEndpoint} } - startup.ConfigLogs(c) + startup.ConfigLogs(c.LogLevel, c.LogPrettyEnabled) slog.Info("Starting the Cartesi Rollups Node Validator", "version", buildVersion, "config", c) // Validate Schema - err := startup.ValidateSchema(c) + err := startup.ValidateSchema(c.PostgresEndpoint.Value, c.PostgresSslDisabled) if err != nil { slog.Error("failed to validate database schema", "error", err) os.Exit(1) diff --git a/internal/node/advancer/advancer.go b/internal/node/advancer/advancer.go index 32ef675ab..4967aa42f 100644 --- a/internal/node/advancer/advancer.go +++ b/internal/node/advancer/advancer.go @@ -10,6 +10,7 @@ import ( "log/slog" "time" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" "github.com/cartesi/rollups-node/internal/node/advancer/poller" . "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/nodemachine" @@ -49,10 +50,10 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller, // runs them through the cartesi machine, // and updates the repository with the ouputs. func (advancer *Advancer) Step(ctx context.Context) error { - apps := keysFrom(advancer.machines) + apps := advancer.machines.Apps() // Gets the unprocessed inputs (of all apps) from the repository. - slog.Info("advancer: getting unprocessed inputs") + slog.Info("advancer: querying for unprocessed inputs") inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps) if err != nil { return err @@ -67,14 +68,22 @@ func (advancer *Advancer) Step(ctx context.Context) error { } } + // Updates the status of the epochs. + for _, app := range apps { + err := advancer.repository.UpdateEpochs(ctx, app) + if err != nil { + return err + } + } + return nil } // process sequentially processes inputs from the the application. func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error { // Asserts that the app has an associated machine. - machine, ok := advancer.machines[app] - if !ok { + machine := advancer.machines.GetAdvanceMachine(app) + if machine == nil { panic(fmt.Errorf("%w %s", ErrNoApp, app.String())) } @@ -99,11 +108,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In } } - // Updates the status of the epochs based on the last processed input. - lastInput := inputs[len(inputs)-1] - err := advancer.repository.UpdateEpochs(ctx, app, lastInput) - - return err + return nil } // ------------------------------------------------------------------------------------------------ @@ -114,25 +119,14 @@ type Repository interface { StoreAdvanceResult(context.Context, *Input, *nodemachine.AdvanceResult) error - UpdateEpochs(_ context.Context, app Address, lastInput *Input) error + UpdateEpochs(_ context.Context, app Address) error } -// A map of application addresses to machines. -type Machines = map[Address]Machine +type Machines interface { + GetAdvanceMachine(app Address) machines.AdvanceMachine + Apps() []Address +} type Machine interface { Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error) } - -// ------------------------------------------------------------------------------------------------ - -// keysFrom returns a slice with the keysFrom of a map. -func keysFrom[K comparable, V any](m map[K]V) []K { - keys := make([]K, len(m)) - i := 0 - for k := range m { - keys[i] = k - i++ - } - return keys -} diff --git a/internal/node/advancer/advancer_test.go b/internal/node/advancer/advancer_test.go index 07b6bb6c0..67bd8e489 100644 --- a/internal/node/advancer/advancer_test.go +++ b/internal/node/advancer/advancer_test.go @@ -12,6 +12,7 @@ import ( mrand "math/rand" "testing" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" . "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/nodemachine" @@ -27,7 +28,8 @@ type AdvancerSuite struct{ suite.Suite } func (s *AdvancerSuite) TestNew() { s.Run("Ok", func() { require := s.Require() - var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}} + machines := newMockMachines() + machines.Map[randomAddress()] = &MockMachine{} var repository Repository = &MockRepository{} advancer, err := New(machines, repository) require.NotNil(advancer) @@ -36,7 +38,7 @@ func (s *AdvancerSuite) TestNew() { s.Run("InvalidMachines", func() { require := s.Require() - var machines map[Address]Machine = nil + var machines Machines = nil var repository Repository = &MockRepository{} advancer, err := New(machines, repository) require.Nil(advancer) @@ -46,7 +48,8 @@ func (s *AdvancerSuite) TestNew() { s.Run("InvalidRepository", func() { require := s.Require() - var machines map[Address]Machine = Machines{randomAddress(): &MockMachine{}} + machines := newMockMachines() + machines.Map[randomAddress()] = &MockMachine{} var repository Repository = nil advancer, err := New(machines, repository) require.Nil(advancer) @@ -63,11 +66,11 @@ func (s *AdvancerSuite) TestRun() { s.Run("Ok", func() { require := s.Require() - machines := Machines{} + machines := newMockMachines() app1 := randomAddress() - machines[app1] = &MockMachine{} + machines.Map[app1] = &MockMachine{} app2 := randomAddress() - machines[app2] = &MockMachine{} + machines.Map[app2] = &MockMachine{} res1 := randomAdvanceResult() res2 := randomAdvanceResult() res3 := randomAdvanceResult() @@ -94,14 +97,18 @@ func (s *AdvancerSuite) TestRun() { require.Len(repository.StoredResults, 3) }) + s.Run("Error/UpdateEpochs", func() { + s.T().Skip("TODO") + }) + // NOTE: missing more test cases } func (s *AdvancerSuite) TestProcess() { setup := func() (Machines, *MockRepository, *Advancer, Address) { app := randomAddress() - machines := Machines{} - machines[app] = &MockMachine{} + machines := newMockMachines() + machines.Map[app] = &MockMachine{} repository := &MockRepository{} advancer := &Advancer{machines, repository} return machines, repository, advancer, app @@ -124,7 +131,6 @@ func (s *AdvancerSuite) TestProcess() { err := advancer.process(context.Background(), app, inputs) require.Nil(err) require.Len(repository.StoredResults, 7) - require.Equal(*inputs[6], repository.LastInput) }) s.Run("Panic", func() { @@ -183,29 +189,7 @@ func (s *AdvancerSuite) TestProcess() { require.Errorf(err, "store-advance error") require.Len(repository.StoredResults, 1) }) - - s.Run("UpdateEpochs", func() { - require := s.Require() - - _, repository, advancer, app := setup() - inputs := []*Input{ - {Id: 1, RawData: marshal(randomAdvanceResult())}, - {Id: 2, RawData: marshal(randomAdvanceResult())}, - {Id: 3, RawData: marshal(randomAdvanceResult())}, - {Id: 4, RawData: marshal(randomAdvanceResult())}, - } - repository.UpdateEpochsError = errors.New("update-epochs error") - - err := advancer.process(context.Background(), app, inputs) - require.Errorf(err, "update-epochs error") - require.Len(repository.StoredResults, 4) - }) }) - -} - -func (s *AdvancerSuite) TestKeysFrom() { - s.T().Skip("TODO") } // ------------------------------------------------------------------------------------------------ @@ -227,6 +211,26 @@ func (mock *MockMachine) Advance( // ------------------------------------------------------------------------------------------------ +type MachinesMock struct { + Map map[Address]machines.AdvanceMachine +} + +func newMockMachines() *MachinesMock { + return &MachinesMock{ + Map: map[Address]machines.AdvanceMachine{}, + } +} + +func (mock *MachinesMock) GetAdvanceMachine(app Address) machines.AdvanceMachine { + return mock.Map[app] +} + +func (mock *MachinesMock) Apps() []Address { + return []Address{} +} + +// ------------------------------------------------------------------------------------------------ + type MockRepository struct { GetInputsReturn map[Address][]*Input GetInputsError error @@ -234,7 +238,6 @@ type MockRepository struct { UpdateEpochsError error StoredResults []*nodemachine.AdvanceResult - LastInput Input } func (mock *MockRepository) GetUnprocessedInputs( @@ -253,12 +256,7 @@ func (mock *MockRepository) StoreAdvanceResult( return mock.StoreAdvanceError } -func (mock *MockRepository) UpdateEpochs( - _ context.Context, - _ Address, - lastInput *Input, -) error { - mock.LastInput = *lastInput +func (mock *MockRepository) UpdateEpochs(_ context.Context, _ Address) error { return mock.UpdateEpochsError } diff --git a/internal/node/advancer/machines/machines.go b/internal/node/advancer/machines/machines.go index b66854e5f..2e70388f6 100644 --- a/internal/node/advancer/machines/machines.go +++ b/internal/node/advancer/machines/machines.go @@ -136,7 +136,7 @@ func (m *Machines) Delete(app Address) *nm.NodeMachine { // Apps returns the addresses of the applications for which there are machines. func (m *Machines) Apps() []Address { m.mutex.RLock() - defer m.mutex.Unlock() + defer m.mutex.RUnlock() keys := make([]Address, len(m.machines)) i := 0 @@ -163,7 +163,7 @@ func (m *Machines) Close() error { func (m *Machines) getMachine(app Address) *nm.NodeMachine { m.mutex.RLock() - defer m.mutex.Unlock() + defer m.mutex.RUnlock() return m.machines[app] } diff --git a/internal/node/advancer/poller/poller.go b/internal/node/advancer/poller/poller.go index f8f469e98..174f63fb1 100644 --- a/internal/node/advancer/poller/poller.go +++ b/internal/node/advancer/poller/poller.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "log/slog" - "sync/atomic" "time" ) @@ -17,10 +16,9 @@ type Service interface { } type Poller struct { - name string - service Service - shouldStop atomic.Bool - ticker *time.Ticker + name string + service Service + ticker *time.Ticker } var ErrInvalidPollingInterval = errors.New("polling interval must be greater than zero") @@ -33,10 +31,8 @@ func New(name string, service Service, pollingInterval time.Duration) (*Poller, return &Poller{name: name, service: service, ticker: ticker}, nil } -func (poller *Poller) Start(ctx context.Context, ready chan<- struct{}) error { - ready <- struct{}{} - - slog.Debug(fmt.Sprintf("%s poller started", poller.name)) +func (poller *Poller) Start(ctx context.Context) error { + slog.Debug(fmt.Sprintf("%s: poller started", poller.name)) for { // Runs the service's inner routine. @@ -45,19 +41,13 @@ func (poller *Poller) Start(ctx context.Context, ready chan<- struct{}) error { return err } - // Checks if the service was ordered to stop. - if poller.shouldStop.Load() { - poller.shouldStop.Store(false) - slog.Debug(fmt.Sprintf("%s poller stopped", poller.name)) + // Waits for the polling interval to elapse (or for the context to be canceled). + select { + case <-poller.ticker.C: + continue + case <-ctx.Done(): + poller.ticker.Stop() return nil } - - // Waits for the polling interval to elapse. - <-poller.ticker.C } } - -// Stop orders the service to stop, which will happen before the next poll. -func (poller *Poller) Stop() { - poller.shouldStop.Store(true) -} diff --git a/internal/node/config/config.go b/internal/node/config/config.go index c2b7d7123..aa83c63ee 100644 --- a/internal/node/config/config.go +++ b/internal/node/config/config.go @@ -157,3 +157,26 @@ func authFromEnv() Auth { panic("invalid auth kind") } } + +// ------------------------------------------------------------------------------------------------ + +type AdvancerConfig struct { + LogLevel LogLevel + LogPrettyEnabled bool + PostgresEndpoint Redacted[string] + PostgresSslMode bool + AdvancerPollingInterval Duration + MachineServerVerbosity cartesimachine.ServerVerbosity +} + +func GetAdvancerConfig() AdvancerConfig { + var config AdvancerConfig + config.LogLevel = getLogLevel() + config.LogPrettyEnabled = getLogPrettyEnabled() + config.PostgresEndpoint = Redacted[string]{getPostgresEndpoint()} + config.PostgresSslMode = getPostgresSslEnabled() + config.AdvancerPollingInterval = getAdvancerPollingInterval() + // Temporary. + config.MachineServerVerbosity = cartesimachine.ServerVerbosity(getMachineServerVerbosity()) + return config +} diff --git a/internal/node/startup/startup.go b/internal/node/startup/startup.go index a57a58410..38d4bd5fa 100644 --- a/internal/node/startup/startup.go +++ b/internal/node/startup/startup.go @@ -20,9 +20,8 @@ import ( ) // Validates the Node Database Schema Version -func ValidateSchema(config config.NodeConfig) error { - endpoint := config.PostgresEndpoint.Value - if config.PostgresSslDisabled { +func ValidateSchema(endpoint string, sslMode bool) error { + if sslMode { endpoint += "?sslmode=disable" } @@ -37,11 +36,11 @@ func ValidateSchema(config config.NodeConfig) error { } // Configure the node logs -func ConfigLogs(config config.NodeConfig) { +func ConfigLogs(logLevel slog.Level, logPrettyEnabled bool) { opts := &tint.Options{ - Level: config.LogLevel, - AddSource: config.LogLevel == slog.LevelDebug, - NoColor: !config.LogPrettyEnabled || !isatty.IsTerminal(os.Stdout.Fd()), + Level: logLevel, + AddSource: logLevel == slog.LevelDebug, + NoColor: !logPrettyEnabled || !isatty.IsTerminal(os.Stdout.Fd()), TimeFormat: "2006-01-02T15:04:05.000", // RFC3339 with milliseconds and without timezone } handler := tint.NewHandler(os.Stdout, opts) diff --git a/internal/repository/machine.go b/internal/repository/machine.go index 59a09394b..16a38e35b 100644 --- a/internal/repository/machine.go +++ b/internal/repository/machine.go @@ -99,7 +99,7 @@ func (repo *MachineRepository) GetUnprocessedInputs( SELECT id, application_address, raw_data FROM input WHERE status = 'NONE' - AND application_address IN %s + AND application_address IN %s ORDER BY index ASC, application_address `, addressesToSqlInValues(apps)) // NOTE: not sanitized rows, err := repo.db.Query(ctx, query)