diff --git a/cmd/container.go b/cmd/container.go index 7670505b2..7f170442e 100644 --- a/cmd/container.go +++ b/cmd/container.go @@ -43,6 +43,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option { // Handle OpenTelemetry options = append(options, otlptraces.CLITracesModule(v)) + redisLockStrategy := false switch v.GetString(lockStrategyFlag) { case "redis": var tlsConfig *tls.Config @@ -58,13 +59,15 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option { LockRetry: v.GetDuration(lockStrategyRedisRetryFlag), TLSConfig: tlsConfig, })) + redisLockStrategy = true } // Handle api part options = append(options, api.Module(api.Config{ - StorageDriver: v.GetString(storageDriverFlag), - Version: Version, - UseScopes: v.GetBool(authBearerUseScopesFlag), + StorageDriver: v.GetString(storageDriverFlag), + Version: Version, + UseScopes: v.GetBool(authBearerUseScopesFlag), + RedisLockStrategy: redisLockStrategy, })) // Handle storage driver diff --git a/pkg/api/api.go b/pkg/api/api.go index ca25373cc..487e8a6a8 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -29,9 +29,10 @@ func NewAPI(routes *routes.Routes) *API { } type Config struct { - StorageDriver string - Version string - UseScopes bool + StorageDriver string + Version string + UseScopes bool + RedisLockStrategy bool } func Module(cfg Config) fx.Option { diff --git a/pkg/storage/sqlstorage/accounts.go b/pkg/storage/sqlstorage/accounts.go index 7543ac225..de17783e6 100644 --- a/pkg/storage/sqlstorage/accounts.go +++ b/pkg/storage/sqlstorage/accounts.go @@ -202,7 +202,9 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) { entry, ok := s.cache.Get(addr) - if ok { + // When having a single instance of the ledger, we can use the cached account. + // Otherwise, compute it every single time for now. + if s.singleWriter && ok { return entry.(*core.AccountWithVolumes).Account.Copy(), nil } diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 7158172f4..9d83b06b3 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -13,7 +13,9 @@ import ( func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) { account, ok := s.cache.Get(address) - if ok { + // When having a single instance of the ledger, we can use the cached account. + // Otherwise, compute it every single time for now. + if s.singleWriter && ok { return account.(*core.AccountWithVolumes).Copy(), nil } diff --git a/pkg/storage/sqlstorage/driver.go b/pkg/storage/sqlstorage/driver.go index f5fe0458c..873e44479 100644 --- a/pkg/storage/sqlstorage/driver.go +++ b/pkg/storage/sqlstorage/driver.go @@ -104,6 +104,7 @@ type Driver struct { systemSchema Schema registeredLedgers map[string]*Store lock sync.Mutex + redisLockStrategy bool } func (d *Driver) GetSystemStore() storage.SystemStore { @@ -154,7 +155,7 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) ( return nil, false, err } - ret = NewStore(schema, defaultExecutorProvider(schema), func(ctx context.Context) error { + ret = NewStore(schema, d.redisLockStrategy, defaultExecutorProvider(schema), func(ctx context.Context) error { d.lock.Lock() defer d.lock.Unlock() @@ -228,11 +229,12 @@ func (d *Driver) Close(ctx context.Context) error { return d.db.Close(ctx) } -func NewDriver(name string, db DB) *Driver { +func NewDriver(name string, db DB, redisLockStrategy bool) *Driver { return &Driver{ db: db, name: name, registeredLedgers: map[string]*Store{}, + redisLockStrategy: redisLockStrategy, } } diff --git a/pkg/storage/sqlstorage/logs.go b/pkg/storage/sqlstorage/logs.go index ebcecb809..6bf7e9ead 100644 --- a/pkg/storage/sqlstorage/logs.go +++ b/pkg/storage/sqlstorage/logs.go @@ -77,7 +77,9 @@ func (s *Store) appendLog(ctx context.Context, log ...core.Log) error { } func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) { - if s.lastLog == nil { + // When having a single instance of the ledger, we can use the cached last log. + // Otherwise, compute it every single time for now. + if !s.singleWriter || s.lastLog == nil { sb := sqlbuilder.NewSelectBuilder() sb.From(s.schema.Table("log")) sb.Select("id", "type", "hash", "date", "data") diff --git a/pkg/storage/sqlstorage/module.go b/pkg/storage/sqlstorage/module.go index ecbd03e6e..3b14e11cf 100644 --- a/pkg/storage/sqlstorage/module.go +++ b/pkg/storage/sqlstorage/module.go @@ -21,9 +21,10 @@ type PostgresConfig struct { } type ModuleConfig struct { - StorageDriver string - SQLiteConfig *SQLiteConfig - PostgresConfig *PostgresConfig + StorageDriver string + SQLiteConfig *SQLiteConfig + PostgresConfig *PostgresConfig + RedisLockStrategy bool } func OpenSQLDB(flavor Flavor, dataSourceName string) (*sql.DB, error) { @@ -49,7 +50,7 @@ func DriverModule(cfg ModuleConfig) fx.Option { return NewPostgresDB(db) })) options = append(options, fx.Provide(func(db DB) (*Driver, error) { - return NewDriver(PostgreSQL.String(), db), nil + return NewDriver(PostgreSQL.String(), db, cfg.RedisLockStrategy), nil })) options = append(options, health.ProvideHealthCheck(func(db *sql.DB) health.NamedCheck { return health.NewNamedCheck(PostgreSQL.String(), health.CheckFn(db.PingContext)) @@ -59,7 +60,7 @@ func DriverModule(cfg ModuleConfig) fx.Option { return NewSQLiteDB(cfg.SQLiteConfig.Dir, cfg.SQLiteConfig.DBName) })) options = append(options, fx.Provide(func(db DB) (*Driver, error) { - return NewDriver(SQLite.String(), db), nil + return NewDriver(SQLite.String(), db, cfg.RedisLockStrategy), nil })) options = append(options, health.ProvideHealthCheck(func() health.NamedCheck { return health.NewNamedCheck(SQLite.String(), health.CheckFn(func(ctx context.Context) error { diff --git a/pkg/storage/sqlstorage/store_ledger.go b/pkg/storage/sqlstorage/store_ledger.go index 0112654d4..edf5ee05a 100644 --- a/pkg/storage/sqlstorage/store_ledger.go +++ b/pkg/storage/sqlstorage/store_ledger.go @@ -24,6 +24,7 @@ type Store struct { lastLog *core.Log lastTx *core.ExpandedTransaction cache *cache.Cache + singleWriter bool } func (s *Store) error(err error) error { @@ -63,7 +64,7 @@ func (s *Store) Close(ctx context.Context) error { return s.onClose(ctx) } -func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error), +func NewStore(schema Schema, singleWriter bool, executorProvider func(ctx context.Context) (executor, error), onClose, onDelete func(ctx context.Context) error) *Store { return &Store{ @@ -71,6 +72,7 @@ func NewStore(schema Schema, executorProvider func(ctx context.Context) (executo schema: schema, onClose: onClose, onDelete: onDelete, + singleWriter: singleWriter, cache: cache.New(5*time.Minute, 10*time.Minute), } } diff --git a/pkg/storage/sqlstorage/transactions.go b/pkg/storage/sqlstorage/transactions.go index 386e8d0f5..bcec8cb69 100644 --- a/pkg/storage/sqlstorage/transactions.go +++ b/pkg/storage/sqlstorage/transactions.go @@ -278,7 +278,9 @@ func (s *Store) GetTransaction(ctx context.Context, txId uint64) (*core.Expanded } func (s *Store) GetLastTransaction(ctx context.Context) (*core.ExpandedTransaction, error) { - if s.lastTx == nil { + // When having a single instance of the ledger, we can use the cached last transaction. + // Otherwise, compute it every single time for now. + if !s.singleWriter || s.lastTx == nil { sb := sqlbuilder.NewSelectBuilder() sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") sb.From(s.schema.Table("transactions"))