Skip to content

Commit

Permalink
fix(ledger): do not use caches when using multiple instances
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Oct 2, 2023
1 parent 8285d25 commit 57fca75
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 18 deletions.
9 changes: 6 additions & 3 deletions cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
// Handle OpenTelemetry
options = append(options, otlptraces.CLITracesModule(v))

redisLockStrategy := false
switch v.GetString(lockStrategyFlag) {
case "redis":
var tlsConfig *tls.Config
Expand All @@ -58,13 +59,15 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
LockRetry: v.GetDuration(lockStrategyRedisRetryFlag),
TLSConfig: tlsConfig,
}))
redisLockStrategy = true
}

// Handle api part
options = append(options, api.Module(api.Config{
StorageDriver: v.GetString(storageDriverFlag),
Version: Version,
UseScopes: v.GetBool(authBearerUseScopesFlag),
StorageDriver: v.GetString(storageDriverFlag),
Version: Version,
UseScopes: v.GetBool(authBearerUseScopesFlag),
RedisLockStrategy: redisLockStrategy,
}))

// Handle storage driver
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ func NewAPI(routes *routes.Routes) *API {
}

type Config struct {
StorageDriver string
Version string
UseScopes bool
StorageDriver string
Version string
UseScopes bool
RedisLockStrategy bool
}

func Module(cfg Config) fx.Option {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/sqlstorage/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu
func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) {

entry, ok := s.cache.Get(addr)
if ok {
// When having a single instance of the ledger, we can use the cached account.
// Otherwise, compute it every single time for now.
if s.singleWriter && ok {
return entry.(*core.AccountWithVolumes).Account.Copy(), nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/sqlstorage/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
account, ok := s.cache.Get(address)
if ok {
// When having a single instance of the ledger, we can use the cached account.
// Otherwise, compute it every single time for now.
if s.singleWriter && ok {
return account.(*core.AccountWithVolumes).Copy(), nil
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/sqlstorage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type Driver struct {
systemSchema Schema
registeredLedgers map[string]*Store
lock sync.Mutex
redisLockStrategy bool
}

func (d *Driver) GetSystemStore() storage.SystemStore {
Expand Down Expand Up @@ -154,7 +155,7 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) (
return nil, false, err
}

ret = NewStore(schema, defaultExecutorProvider(schema), func(ctx context.Context) error {
ret = NewStore(schema, d.redisLockStrategy, defaultExecutorProvider(schema), func(ctx context.Context) error {
d.lock.Lock()
defer d.lock.Unlock()

Expand Down Expand Up @@ -228,11 +229,12 @@ func (d *Driver) Close(ctx context.Context) error {
return d.db.Close(ctx)
}

func NewDriver(name string, db DB) *Driver {
func NewDriver(name string, db DB, redisLockStrategy bool) *Driver {
return &Driver{
db: db,
name: name,
registeredLedgers: map[string]*Store{},
redisLockStrategy: redisLockStrategy,
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/sqlstorage/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (s *Store) appendLog(ctx context.Context, log ...core.Log) error {
}

func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) {
if s.lastLog == nil {
// When having a single instance of the ledger, we can use the cached last log.
// Otherwise, compute it every single time for now.
if !s.singleWriter || s.lastLog == nil {
sb := sqlbuilder.NewSelectBuilder()
sb.From(s.schema.Table("log"))
sb.Select("id", "type", "hash", "date", "data")
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/sqlstorage/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ type PostgresConfig struct {
}

type ModuleConfig struct {
StorageDriver string
SQLiteConfig *SQLiteConfig
PostgresConfig *PostgresConfig
StorageDriver string
SQLiteConfig *SQLiteConfig
PostgresConfig *PostgresConfig
RedisLockStrategy bool
}

func OpenSQLDB(flavor Flavor, dataSourceName string) (*sql.DB, error) {
Expand All @@ -49,7 +50,7 @@ func DriverModule(cfg ModuleConfig) fx.Option {
return NewPostgresDB(db)
}))
options = append(options, fx.Provide(func(db DB) (*Driver, error) {
return NewDriver(PostgreSQL.String(), db), nil
return NewDriver(PostgreSQL.String(), db, cfg.RedisLockStrategy), nil
}))
options = append(options, health.ProvideHealthCheck(func(db *sql.DB) health.NamedCheck {
return health.NewNamedCheck(PostgreSQL.String(), health.CheckFn(db.PingContext))
Expand All @@ -59,7 +60,7 @@ func DriverModule(cfg ModuleConfig) fx.Option {
return NewSQLiteDB(cfg.SQLiteConfig.Dir, cfg.SQLiteConfig.DBName)
}))
options = append(options, fx.Provide(func(db DB) (*Driver, error) {
return NewDriver(SQLite.String(), db), nil
return NewDriver(SQLite.String(), db, cfg.RedisLockStrategy), nil
}))
options = append(options, health.ProvideHealthCheck(func() health.NamedCheck {
return health.NewNamedCheck(SQLite.String(), health.CheckFn(func(ctx context.Context) error {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/sqlstorage/store_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Store struct {
lastLog *core.Log
lastTx *core.ExpandedTransaction
cache *cache.Cache
singleWriter bool
}

func (s *Store) error(err error) error {
Expand Down Expand Up @@ -63,14 +64,15 @@ func (s *Store) Close(ctx context.Context) error {
return s.onClose(ctx)
}

func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error),
func NewStore(schema Schema, singleWriter bool, executorProvider func(ctx context.Context) (executor, error),
onClose, onDelete func(ctx context.Context) error) *Store {

return &Store{
executorProvider: executorProvider,
schema: schema,
onClose: onClose,
onDelete: onDelete,
singleWriter: singleWriter,
cache: cache.New(5*time.Minute, 10*time.Minute),
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/sqlstorage/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ func (s *Store) GetTransaction(ctx context.Context, txId uint64) (*core.Expanded
}

func (s *Store) GetLastTransaction(ctx context.Context) (*core.ExpandedTransaction, error) {
if s.lastTx == nil {
// When having a single instance of the ledger, we can use the cached last transaction.
// Otherwise, compute it every single time for now.
if !s.singleWriter || s.lastTx == nil {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes")
sb.From(s.schema.Table("transactions"))
Expand Down

0 comments on commit 57fca75

Please sign in to comment.