Skip to content

Commit

Permalink
Let other services use config cache (#4166)
Browse files Browse the repository at this point in the history
* chore(pkg): export config cache to other services

This is the first general step toward fixing #3892.
It allows for accessing config cache service from other
services. Actually using config cache service will be
implemented in separate PRs.

Ref #3892

* feat(configcache): add ReadAll method

It's going to be useful when fixing #3892.

Ref #3892

* fix(pkg): init config cache with cluster
  • Loading branch information
Michal-Leszczynski authored Dec 17, 2024
1 parent 0b4609a commit ad742f6
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 24 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/scylla-manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRepairMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("repair"),
)
if err != nil {
Expand All @@ -115,6 +116,7 @@ func (s *server) makeServices(ctx context.Context) error {
s.clusterSvc.GetClusterName,
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("backup"),
)
if err != nil {
Expand All @@ -128,6 +130,7 @@ func (s *server) makeServices(ctx context.Context) error {
metrics.NewRestoreMetrics().MustRegister(),
s.clusterSvc.Client,
s.clusterSvc.GetSession,
s.configCacheSvc,
s.logger.Named("restore"),
)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/scheduler"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
Expand All @@ -45,13 +46,15 @@ type Service struct {
clusterName cluster.NameFunc
scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

dth deduplicateTestHooks
}

func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics,
clusterName cluster.NameFunc, scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics, clusterName cluster.NameFunc,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -76,6 +79,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMet
clusterName: clusterName,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/service/backup/service_backup_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func newBackupTestHelperWithUser(t *testing.T, session gocqlx.Session, config ba
S3InitBucket(t, location.Path)

clusterID := uuid.MustRandom()

logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)

hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport())
client := newTestClient(t, hrt, logger.Named("client"), clientConf)
service := newTestServiceWithUser(t, session, client, config, logger, user, pass)
service := newTestServiceWithUser(t, session, client, config, logger, clusterID, user, pass)
cHelper := &CommonTestHelper{
Session: session,
Hrt: hrt,
Expand Down Expand Up @@ -108,7 +108,7 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c
return c
}

func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c backup.Config, logger log.Logger, user, pass string) *backup.Service {
func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c backup.Config, logger log.Logger, clusterID uuid.UUID, user, pass string) *backup.Service {
t.Helper()

s, err := backup.NewService(
Expand All @@ -127,6 +127,7 @@ func newTestServiceWithUser(t *testing.T, session gocqlx.Session, client *scylla
}
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
logger.Named("backup"),
)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/service/configcache/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type ConfigCacher interface {
// or ErrNoHostConfig if config of the particular host doesn't exist.
Read(clusterID uuid.UUID, host string) (NodeConfig, error)

// ReadAll calls Read on all AvailableHosts.
ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error)

// AvailableHosts returns list of hosts of given cluster that keep their configuration in cache.
AvailableHosts(ctx context.Context, clusterID uuid.UUID) ([]string, error)

Expand Down Expand Up @@ -88,6 +91,23 @@ func (svc *Service) Read(clusterID uuid.UUID, host string) (NodeConfig, error) {
return hostConfig, nil
}

// ReadAll calls Read on AvailableHosts.
func (svc *Service) ReadAll(clusterID uuid.UUID) (map[string]NodeConfig, error) {
clusterConfig, err := svc.readClusterConfig(clusterID)
if err != nil {
return nil, err
}

out := make(map[string]NodeConfig)
clusterConfig.Range(func(key, value any) bool {
host := key.(string)
cfg := value.(NodeConfig)
out[host] = cfg
return true
})
return out, nil
}

// Run starts the infinity loop responsible for updating the clusters configuration periodically.
func (svc *Service) Run(ctx context.Context) {
freq := time.NewTicker(svc.svcConfig.UpdateFrequency)
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/util"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter"
"github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/ksfilter"
Expand All @@ -38,14 +39,16 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger

intensityHandlers map[uuid.UUID]*intensityParallelHandler
mu sync.Mutex
}

func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid config")
Expand All @@ -61,6 +64,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMet
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
intensityHandlers: make(map[uuid.UUID]*intensityParallelHandler),
}, nil
Expand Down
16 changes: 10 additions & 6 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,21 @@ type repairTestHelper struct {
func newRepairTestHelper(t *testing.T, session gocqlx.Session, config repair.Config) *repairTestHelper {
t.Helper()

clusterID := uuid.MustRandom()
logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)

hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport())
hrt.SetInterceptor(repairInterceptor(scyllaclient.CommandSuccessful))
c := newTestClient(t, hrt, log.NopLogger)
s := newTestService(t, session, c, config, logger)
s := newTestService(t, session, c, config, logger, clusterID)

return &repairTestHelper{
CommonTestHelper: &CommonTestHelper{
Logger: logger,
Session: session,
Hrt: hrt,
Client: c,
ClusterID: uuid.MustRandom(),
ClusterID: clusterID,
TaskID: uuid.MustRandom(),
RunID: uuid.NewTime(),
T: t,
Expand All @@ -86,16 +87,17 @@ func newRepairWithClusterSessionTestHelper(t *testing.T, session gocqlx.Session,
hrt *HackableRoundTripper, c *scyllaclient.Client, config repair.Config) *repairTestHelper {
t.Helper()

clusterID := uuid.MustRandom()
logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)
s := newTestServiceWithClusterSession(t, session, c, config, logger)
s := newTestServiceWithClusterSession(t, session, c, config, logger, clusterID)

return &repairTestHelper{
CommonTestHelper: &CommonTestHelper{
Logger: logger,
Session: session,
Hrt: hrt,
Client: c,
ClusterID: uuid.MustRandom(),
ClusterID: clusterID,
TaskID: uuid.MustRandom(),
RunID: uuid.NewTime(),
T: t,
Expand Down Expand Up @@ -389,7 +391,7 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger) *
return c
}

func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service {
func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger, clusterID uuid.UUID) *repair.Service {
t.Helper()

s, err := repair.NewService(
Expand All @@ -402,6 +404,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return gocqlx.Session{}, errors.New("not implemented")
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand All @@ -411,7 +414,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
return s
}

func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger) *repair.Service {
func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c repair.Config, logger log.Logger, clusterID uuid.UUID) *repair.Service {
t.Helper()

s, err := repair.NewService(
Expand All @@ -424,6 +427,7 @@ func newTestServiceWithClusterSession(t *testing.T, session gocqlx.Session, clie
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
logger.Named("repair"),
)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions pkg/service/restore/helper_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ func newTestHelper(t *testing.T, srcHosts, dstHosts []string) *testHelper {

return &testHelper{
srcCluster: srcCluster,
srcBackupSvc: newBackupSvc(t, srcCluster.Session, srcCluster.Client),
srcBackupSvc: newBackupSvc(t, srcCluster.Session, srcCluster.Client, srcCluster.ClusterID),
dstCluster: dstCluster,
dstRestoreSvc: newRestoreSvc(t, dstCluster.Session, dstCluster.Client, user, pass),
dstRestoreSvc: newRestoreSvc(t, dstCluster.Session, dstCluster.Client, dstCluster.ClusterID, user, pass),
dstUser: user,
dstPass: pass,
}
}

func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client) *backup.Service {
func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, clusterID uuid.UUID) *backup.Service {
svc, err := backup.NewService(
mgrSession,
defaultBackupTestConfig(),
Expand All @@ -135,6 +135,7 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts),
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -143,7 +144,9 @@ func newBackupSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.
return svc
}

func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, user, pass string) *Service {
func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient.Client, clusterID uuid.UUID, user, pass string) *Service {
configCacheSvc := NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts)

repairSvc, err := repair.NewService(
mgrSession,
repair.DefaultConfig(),
Expand All @@ -154,6 +157,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateSession(t, client), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -171,6 +175,7 @@ func newRestoreSvc(t *testing.T, mgrSession gocqlx.Session, client *scyllaclient
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.InfoLevel).Named("restore"),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/cluster"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/service/repair"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand All @@ -29,11 +30,13 @@ type Service struct {

scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
configCache configcache.ConfigCacher
logger log.Logger
}

func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config, metrics metrics.RestoreMetrics,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, logger log.Logger,
scyllaClient scyllaclient.ProviderFunc, clusterSession cluster.SessionFunc, configCache configcache.ConfigCacher,
logger log.Logger,
) (*Service, error) {
if session.Session == nil || session.Closed() {
return nil, errors.New("invalid session")
Expand All @@ -53,6 +56,7 @@ func NewService(repairSvc *repair.Service, session gocqlx.Session, config Config
metrics: metrics,
scyllaClient: scyllaClient,
clusterSession: clusterSession,
configCache: configCache,
logger: logger,
}, nil
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newRestoreTestHelper(t *testing.T, session gocqlx.Session, config Config, l
logger := log.NewDevelopmentWithLevel(zapcore.InfoLevel)
hrt := NewHackableRoundTripper(scyllaclient.DefaultTransport())
client := newTestClient(t, hrt, logger.Named("client"), clientConf)
service, backupSvc := newTestService(t, session, client, config, logger, user, pass)
service, backupSvc := newTestService(t, session, client, config, logger, clusterID, user, pass)
cHelper := &CommonTestHelper{
Session: session,
Hrt: hrt,
Expand Down Expand Up @@ -106,9 +106,11 @@ func newTestClient(t *testing.T, hrt *HackableRoundTripper, logger log.Logger, c
return c
}

func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, user, pass string) (*Service, *backup.Service) {
func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.Client, c Config, logger log.Logger, clusterID uuid.UUID, user, pass string) (*Service, *backup.Service) {
t.Helper()

configCacheSvc := NewTestConfigCacheSvc(t, clusterID, client.Config().Hosts)

repairSvc, err := repair.NewService(
session,
repair.DefaultConfig(),
Expand All @@ -119,6 +121,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("repair"),
)
if err != nil {
Expand All @@ -138,6 +141,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
log.NewDevelopmentWithLevel(zapcore.ErrorLevel).Named("backup"),
)
if err != nil {
Expand All @@ -155,6 +159,7 @@ func newTestService(t *testing.T, session gocqlx.Session, client *scyllaclient.C
func(ctx context.Context, clusterID uuid.UUID, _ ...cluster.SessionConfigOption) (gocqlx.Session, error) {
return CreateManagedClusterSession(t, false, client, user, pass), nil
},
configCacheSvc,
logger.Named("restore"),
)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/testutils/testconfig/testconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func ScyllaManagerDBCluster() string {
// IsSSLEnabled is a helper function to parse SSL_ENABLED env var.
// SSL_ENABLED env var indicates if scylla cluster is configured to use ssl or not.
func IsSSLEnabled() bool {
sslEnabled, err := strconv.ParseBool(os.Getenv("SSL_ENABLED"))
raw := os.Getenv("SSL_ENABLED")
if raw == "" {
return false
}
sslEnabled, err := strconv.ParseBool(raw)
if err != nil {
panic("parse SSL_ENABLED env var:" + err.Error())
}
Expand Down
Loading

0 comments on commit ad742f6

Please sign in to comment.