diff --git a/cmd/server/cadence/cadence.go b/cmd/server/cadence/cadence.go index fbe9256db92..40731d41cc5 100644 --- a/cmd/server/cadence/cadence.go +++ b/cmd/server/cadence/cadence.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" "github.com/uber/cadence/common/service" "github.com/uber/cadence/tools/cassandra" @@ -57,8 +58,15 @@ func startHandler(c *cli.Context) error { if err != nil { return fmt.Errorf("Config file corrupted: %w", err) } + + zapLogger, err := cfg.Log.NewZapLogger() + if err != nil { + log.Fatal("failed to create the zap logger, err: ", err.Error()) + } + rootLogger := loggerimpl.NewLogger(zapLogger) + if cfg.Log.Level == "debug" { - log.Printf("config=%v", cfg.String()) + zapLogger.Sugar().Debugf("config=%v", cfg.String()) } if cfg.DynamicConfig.Client == "" { cfg.DynamicConfigClient.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfigClient.Filepath) @@ -83,13 +91,13 @@ func startHandler(c *cli.Context) error { sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT) for _, svc := range services { - server := newServer(svc, &cfg) + server := newServer(svc, &cfg, rootLogger) daemons = append(daemons, server) server.Start() } <-sigc - log.Println("Received SIGTERM signal, initiating shutdown.") + rootLogger.Info("Received SIGTERM signal, initiating shutdown.") for _, daemon := range daemons { daemon.Stop() } diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 15d3163cc76..8ef70e8ffd1 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -40,7 +40,7 @@ import ( "github.com/uber/cadence/common/dynamicconfig/configstore" "github.com/uber/cadence/common/elasticsearch" "github.com/uber/cadence/common/isolationgroup/isolationgroupapi" - "github.com/uber/cadence/common/log/loggerimpl" + cadenceLog "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/messaging/kafka" @@ -60,20 +60,22 @@ import ( type ( server struct { - name string - cfg *config.Config - doneC chan struct{} - daemon common.Daemon + name string + cfg *config.Config + doneC chan struct{} + daemon common.Daemon + rootLogger cadenceLog.Logger } ) // newServer returns a new instance of a daemon // that represents a cadence service -func newServer(service string, cfg *config.Config) common.Daemon { +func newServer(service string, cfg *config.Config, rootLogger cadenceLog.Logger) common.Daemon { return &server{ - cfg: cfg, - name: service, - doneC: make(chan struct{}), + cfg: cfg, + name: service, + doneC: make(chan struct{}), + rootLogger: rootLogger, } } @@ -111,11 +113,7 @@ func (s *server) startService() common.Daemon { params := resource.Params{} params.Name = service.FullName(s.name) - zapLogger, err := s.cfg.Log.NewZapLogger() - if err != nil { - log.Fatal("failed to create the zap logger, err: ", err.Error()) - } - params.Logger = loggerimpl.NewLogger(zapLogger).WithTags(tag.Service(params.Name)) + params.Logger = s.rootLogger.WithTags(tag.Service(params.Name)) params.PersistenceConfig = s.cfg.Persistence diff --git a/cmd/server/cadence/server_test.go b/cmd/server/cadence/server_test.go index 5e1913f6233..48a92bb6779 100644 --- a/cmd/server/cadence/server_test.go +++ b/cmd/server/cadence/server_test.go @@ -37,7 +37,9 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/dynamicconfig" + cadenceLog "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/loggerimpl" + "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" @@ -51,6 +53,7 @@ import ( type ServerSuite struct { *require.Assertions suite.Suite + logger cadenceLog.Logger } func TestServerSuite(t *testing.T) { @@ -60,6 +63,7 @@ func TestServerSuite(t *testing.T) { func (s *ServerSuite) SetupTest() { s.Assertions = require.New(s.T()) + s.logger = testlogger.New(s.T()) } /* @@ -107,7 +111,7 @@ func (s *ServerSuite) TestServerStartup() { var daemons []common.Daemon services := service.ShortNames(service.List) for _, svc := range services { - server := newServer(svc, &cfg) + server := newServer(svc, &cfg, s.logger) daemons = append(daemons, server) server.Start() } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/admin.go b/common/persistence/nosql/nosqlplugin/cassandra/admin.go index 262cc668ca3..a850f3e582e 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/admin.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/admin.go @@ -68,7 +68,7 @@ func (db *cdb) SetupTestDatabase(schemaBaseDir string, replicas int) error { // loadSchema from PersistenceTestCluster interface func (db *cdb) loadSchema(fileNames []string, schemaBaseDir string) error { workflowSchemaDir := schemaBaseDir + "/cadence" - err := loadCassandraSchema(workflowSchemaDir, fileNames, db.cfg.Hosts, db.cfg.Port, db.cfg.Keyspace, true, nil, db.cfg.ProtoVersion) + err := db.loadCassandraSchema(workflowSchemaDir, fileNames, db.cfg.Hosts, db.cfg.Port, db.cfg.Keyspace, true, nil, db.cfg.ProtoVersion) if err != nil && !strings.Contains(err.Error(), "AlreadyExists") { // TODO: should we remove the second condition? return err @@ -79,7 +79,7 @@ func (db *cdb) loadSchema(fileNames []string, schemaBaseDir string) error { // loadVisibilitySchema from PersistenceTestCluster interface func (db *cdb) loadVisibilitySchema(fileNames []string, schemaBaseDir string) error { workflowSchemaDir := schemaBaseDir + "/visibility" - err := loadCassandraSchema(workflowSchemaDir, fileNames, db.cfg.Hosts, db.cfg.Port, db.cfg.Keyspace, false, nil, db.cfg.ProtoVersion) + err := db.loadCassandraSchema(workflowSchemaDir, fileNames, db.cfg.Hosts, db.cfg.Port, db.cfg.Keyspace, false, nil, db.cfg.ProtoVersion) if err != nil && !strings.Contains(err.Error(), "AlreadyExists") { // TODO: should we remove the second condition? return err @@ -129,7 +129,7 @@ func (db *cdb) dropCassandraKeyspace(s gocql.Session, keyspace string) (err erro } // loadCassandraSchema loads the schema from the given .cql files on this keyspace -func loadCassandraSchema( +func (db *cdb) loadCassandraSchema( dir string, fileNames []string, hosts string, @@ -177,7 +177,7 @@ func loadCassandraSchema( }, } - err = cassandra.SetupSchema(config) + err = cassandra.SetupSchema(config, db.logger) if err != nil { err = fmt.Errorf("error loading schema:%v", err.Error()) } diff --git a/common/util.go b/common/util.go index c406d51c00a..2ccc01787a2 100644 --- a/common/util.go +++ b/common/util.go @@ -340,9 +340,9 @@ func PrettyPrintHistory(history *types.History, logger log.Logger) { logger.Error("Error serializing history: %v\n", tag.Error(err)) } - fmt.Println("******************************************") - fmt.Println("History", tag.DetailInfo(string(data))) - fmt.Println("******************************************") + logger.Debug("******************************************") + logger.Debug("History", tag.DetailInfo(string(data))) + logger.Debug("******************************************") } // IsValidContext checks that the thrift context is not expired on cancelled. diff --git a/host/service.go b/host/service.go index 9095aa4f6fb..933db43621b 100644 --- a/host/service.go +++ b/host/service.go @@ -21,7 +21,6 @@ package host import ( - "math/rand" "sync/atomic" "time" @@ -173,8 +172,6 @@ func (h *serviceImpl) Start() { // The service is now started up h.logger.Info("service started") - // seed the random generator once for this service - rand.Seed(time.Now().UTC().UnixNano()) } // Stop closes the associated transport diff --git a/tools/cassandra/main.go b/tools/cassandra/main.go index 042c75128b9..1cdf756e5cc 100644 --- a/tools/cassandra/main.go +++ b/tools/cassandra/main.go @@ -25,6 +25,7 @@ import ( "github.com/urfave/cli/v2" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" "github.com/uber/cadence/tools/common/schema" ) @@ -36,7 +37,7 @@ func RunTool(args []string) error { } // SetupSchema setups the cassandra schema -func SetupSchema(config *SetupSchemaConfig) error { +func SetupSchema(config *SetupSchemaConfig, logger log.Logger) error { if err := validateCQLClientConfig(&config.CQLClientConfig); err != nil { return err } @@ -44,7 +45,7 @@ func SetupSchema(config *SetupSchemaConfig) error { if err != nil { return err } - return schema.SetupFromConfig(&config.SetupConfig, db) + return schema.SetupFromConfig(&config.SetupConfig, logger, db) } // root handler for all cli commands diff --git a/tools/common/schema/handler.go b/tools/common/schema/handler.go index 347b210c8c5..34066d5b1d0 100644 --- a/tools/common/schema/handler.go +++ b/tools/common/schema/handler.go @@ -25,6 +25,9 @@ import ( "os" "github.com/urfave/cli/v2" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/loggerimpl" ) // VerifyCompatibleVersion ensures that the installed version is greater than or equal to the expected version. @@ -52,66 +55,70 @@ func VerifyCompatibleVersion( } // SetupFromConfig sets up schema tables based on the given config -func SetupFromConfig(config *SetupConfig, db SchemaClient) error { +func SetupFromConfig(config *SetupConfig, logger log.Logger, db SchemaClient) error { if err := validateSetupConfig(config); err != nil { return err } - return newSetupSchemaTask(db, config).Run() + return newSetupSchemaTask(db, logger, config).Run() } // Setup sets up schema tables func Setup(cli *cli.Context, db SchemaClient) error { - cfg, err := newSetupConfig(cli) + cfg, logger, err := newSetupConfig(cli) if err != nil { return err } - return newSetupSchemaTask(db, cfg).Run() + return SetupFromConfig(cfg, logger, db) } // UpdateFromConfig updates the schema for the specified database based on the given config -func UpdateFromConfig(config *UpdateConfig, db SchemaClient) error { +func UpdateFromConfig(config *UpdateConfig, logger log.Logger, db SchemaClient) error { if err := validateUpdateConfig(config); err != nil { return err } - return NewUpdateSchemaTask(db, config).Run() + return NewUpdateSchemaTask(db, logger, config).Run() } // Update updates the schema for the specified database func Update(cli *cli.Context, db SchemaClient) error { - cfg, err := newUpdateConfig(cli) + cfg, logger, err := newUpdateConfig(cli) if err != nil { return err } - return NewUpdateSchemaTask(db, cfg).Run() + return UpdateFromConfig(cfg, logger, db) } -func newUpdateConfig(cli *cli.Context) (*UpdateConfig, error) { +func newUpdateConfig(cli *cli.Context) (*UpdateConfig, log.Logger, error) { config := new(UpdateConfig) schemaDir := cli.String(CLIOptSchemaDir) if len(schemaDir) == 0 { - return nil, NewConfigError("missing " + flag(CLIOptSchemaDir) + " argument ") + return nil, nil, NewConfigError("missing " + flag(CLIOptSchemaDir) + " argument ") } config.SchemaFS = os.DirFS(schemaDir) config.IsDryRun = cli.Bool(CLIOptDryrun) config.TargetVersion = cli.String(CLIOptTargetVersion) - if err := validateUpdateConfig(config); err != nil { - return nil, err + logger, err := loggerimpl.NewDevelopment() + if err != nil { + return nil, nil, fmt.Errorf("build logger: %w", err) } - return config, nil + + return config, logger, nil } -func newSetupConfig(cli *cli.Context) (*SetupConfig, error) { +func newSetupConfig(cli *cli.Context) (*SetupConfig, log.Logger, error) { config := new(SetupConfig) config.SchemaFilePath = cli.String(CLIOptSchemaFile) config.InitialVersion = cli.String(CLIOptVersion) config.DisableVersioning = cli.Bool(CLIOptDisableVersioning) config.Overwrite = cli.Bool(CLIOptOverwrite) - if err := validateSetupConfig(config); err != nil { - return nil, err + logger, err := loggerimpl.NewDevelopment() + if err != nil { + return nil, nil, fmt.Errorf("build logger: %w", err) } - return config, nil + + return config, logger, nil } func validateSetupConfig(config *SetupConfig) error { diff --git a/tools/common/schema/setuptask.go b/tools/common/schema/setuptask.go index 0b514d06a1f..bf0072dc51e 100644 --- a/tools/common/schema/setuptask.go +++ b/tools/common/schema/setuptask.go @@ -21,8 +21,10 @@ package schema import ( - "log" + "fmt" "os" + + "github.com/uber/cadence/common/log" ) // SetupTask represents a task @@ -31,19 +33,21 @@ import ( type SetupTask struct { db SchemaClient config *SetupConfig + logger log.Logger } -func newSetupSchemaTask(db SchemaClient, config *SetupConfig) *SetupTask { +func newSetupSchemaTask(db SchemaClient, logger log.Logger, config *SetupConfig) *SetupTask { return &SetupTask{ db: db, config: config, + logger: logger, } } // Run executes the task func (task *SetupTask) Run() error { config := task.config - log.Printf("Starting schema setup, config=%+v\n", config) + task.logger.Info(fmt.Sprintf("Starting schema setup, config=%+v\n", config)) if config.Overwrite { err := task.db.DropAllTables() @@ -53,7 +57,7 @@ func (task *SetupTask) Run() error { } if !config.DisableVersioning { - log.Printf("Setting up version tables\n") + task.logger.Info(fmt.Sprintf("Setting up version tables\n")) if err := task.db.CreateSchemaVersionTables(); err != nil { return err } @@ -69,30 +73,30 @@ func (task *SetupTask) Run() error { return err } - log.Println("----- Creating types and tables -----") + task.logger.Info("----- Creating types and tables -----") for _, stmt := range stmts { - log.Println(rmspaceRegex.ReplaceAllString(stmt, " ")) + task.logger.Info(rmspaceRegex.ReplaceAllString(stmt, " ")) if err := task.db.ExecDDLQuery(stmt); err != nil { return err } } - log.Println("----- Done -----") + task.logger.Info("----- Done -----") } if !config.DisableVersioning { - log.Printf("Setting initial schema version to %v\n", config.InitialVersion) + task.logger.Info(fmt.Sprintf("Setting initial schema version to %v\n", config.InitialVersion)) err := task.db.UpdateSchemaVersion(config.InitialVersion, config.InitialVersion) if err != nil { return err } - log.Printf("Updating schema update log\n") + task.logger.Info("Updating schema update log\n") err = task.db.WriteSchemaUpdateLog("0", config.InitialVersion, "", "initial version") if err != nil { return err } } - log.Println("Schema setup complete") + task.logger.Info("Schema setup complete") return nil } diff --git a/tools/common/schema/updatetask.go b/tools/common/schema/updatetask.go index 778a256deed..5d1aa5cbdb5 100644 --- a/tools/common/schema/updatetask.go +++ b/tools/common/schema/updatetask.go @@ -30,10 +30,11 @@ import ( "fmt" "io" "io/fs" - "log" "sort" "strings" "time" + + "github.com/uber/cadence/common/log" ) type ( @@ -42,6 +43,7 @@ type ( UpdateTask struct { db SchemaClient config *UpdateConfig + logger log.Logger } // manifest is a value type that represents @@ -86,9 +88,10 @@ var ( ) // NewUpdateSchemaTask returns a new instance of UpdateTask. -func NewUpdateSchemaTask(db SchemaClient, config *UpdateConfig) *UpdateTask { +func NewUpdateSchemaTask(db SchemaClient, logger log.Logger, config *UpdateConfig) *UpdateTask { return &UpdateTask{ db: db, + logger: logger, config: config, } } @@ -97,7 +100,7 @@ func NewUpdateSchemaTask(db SchemaClient, config *UpdateConfig) *UpdateTask { func (task *UpdateTask) Run() error { config := task.config - log.Printf("UpdateSchemeTask started, config=%+v\n", config) + task.logger.Info(fmt.Sprintf("UpdateSchemeTask started, config=%+v\n", config)) currVer, err := task.db.ReadSchemaVersion() if err != nil { @@ -110,14 +113,14 @@ func (task *UpdateTask) Run() error { } if config.IsDryRun { - log.Println("In DryRun mode, this command will only print queries without executing.....") + task.logger.Info("In DryRun mode, this command will only print queries without executing.....") if len(updates) == 0 { - log.Println("Found zero updates to run") + task.logger.Info("Found zero updates to run") } for _, upd := range updates { - log.Printf("DryRun of updating to version: %s, manifest: %s \n", upd.Version, upd.manifest) + task.logger.Info(fmt.Sprintf("DryRun of updating to version: %s, manifest: %s \n", upd.Version, upd.manifest)) for _, stmt := range upd.CqlStmts { - log.Printf("DryRun query:%s \n", stmt) + task.logger.Info(fmt.Sprintf("DryRun query:%s \n", stmt)) } } } else { @@ -126,7 +129,7 @@ func (task *UpdateTask) Run() error { return err } } - log.Printf("UpdateSchemeTask done\n") + task.logger.Info("UpdateSchemeTask done") return nil } @@ -134,7 +137,7 @@ func (task *UpdateTask) Run() error { func (task *UpdateTask) executeUpdates(currVer string, updates []ChangeSet) error { if len(updates) == 0 { - log.Printf("found zero updates from current version %v", currVer) + task.logger.Info(fmt.Sprintf("found zero updates from current version %v", currVer)) return nil } updStart := time.Now() @@ -150,25 +153,25 @@ func (task *UpdateTask) executeUpdates(currVer string, updates []ChangeSet) erro return err } - log.Printf("Schema updated from %v to %v, elapsed %v\n", currVer, cs.Version, time.Since(csStart)) + task.logger.Info(fmt.Sprintf("Schema updated from %v to %v, elapsed %v\n", currVer, cs.Version, time.Since(csStart))) currVer = cs.Version } - log.Printf("All schema changes completed in %v\n", time.Since(updStart)) + task.logger.Info(fmt.Sprintf("All schema changes completed in %v\n", time.Since(updStart))) return nil } func (task *UpdateTask) execCQLStmts(ver string, stmts []string) error { - log.Printf("---- Executing updates for version %v ----\n", ver) + task.logger.Info(fmt.Sprintf("---- Executing updates for version %v ----\n", ver)) for _, stmt := range stmts { - log.Println(rmspaceRegex.ReplaceAllString(stmt, " ")) + task.logger.Info(rmspaceRegex.ReplaceAllString(stmt, " ")) e := task.db.ExecDDLQuery(stmt) if e != nil { return fmt.Errorf("error executing CQL statement:%v", e) } } - log.Printf("---- Done ----\n") + task.logger.Info("---- Done ----") return nil }