From 8bcb76350d5c1141f2b0cb40ea7f138547fb586f Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 26 May 2020 17:49:21 -0700 Subject: [PATCH] Rename CloseConnection to Close (#140) --- client/client.go | 8 ++--- internal/client.go | 8 ++--- internal/internal_retry.go | 2 +- internal/internal_task_pollers.go | 42 +++++++++++++------------- internal/internal_worker.go | 18 ++++++------ internal/internal_worker_base.go | 44 ++++++++++++++-------------- internal/internal_workers_test.go | 2 +- internal/internal_workflow_client.go | 8 ++--- internal/worker.go | 2 +- mocks/Client.go | 4 +-- mocks/NamespaceClient.go | 4 +-- test/integration_test.go | 6 ++-- worker/worker.go | 8 ++--- 13 files changed, 78 insertions(+), 78 deletions(-) diff --git a/client/client.go b/client/client.go index 627f05135..a94564a68 100644 --- a/client/client.go +++ b/client/client.go @@ -352,8 +352,8 @@ type ( // - EntityNotExistError DescribeTaskList(ctx context.Context, tasklist string, tasklistType tasklistpb.TaskListType) (*workflowservice.DescribeTaskListResponse, error) - // CloseConnection closes underlying gRPC connection. - CloseConnection() + // Close client and clean up underlying resources. + Close() } // NamespaceClient is the client for managing operations on the namespace. @@ -383,8 +383,8 @@ type ( // - InternalServiceError Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error - // CloseConnection closes underlying gRPC connection. - CloseConnection() + // Close client and clean up underlying resources. + Close() } ) diff --git a/internal/client.go b/internal/client.go index c4a05c2c1..913b41c41 100644 --- a/internal/client.go +++ b/internal/client.go @@ -312,8 +312,8 @@ type ( // - EntityNotExistError DescribeTaskList(ctx context.Context, tasklist string, tasklistType tasklistpb.TaskListType) (*workflowservice.DescribeTaskListResponse, error) - // CloseConnection closes underlying gRPC connection. - CloseConnection() + // Close client and clean up underlying resources. + Close() } // ClientOptions are optional parameters for Client creation. @@ -511,8 +511,8 @@ type ( // - InternalServiceError Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error - // CloseConnection closes underlying gRPC connection. - CloseConnection() + // Close client and clean up underlying resources. + Close() } // WorkflowIDReusePolicy defines workflow ID reuse behavior. diff --git a/internal/internal_retry.go b/internal/internal_retry.go index 0b5ab4c0f..21846f23d 100644 --- a/internal/internal_retry.go +++ b/internal/internal_retry.go @@ -78,5 +78,5 @@ func isServiceTransientError(err error) bool { *serviceerror.CancellationAlreadyRequested: return false } - return err != errShutdown + return err != errStop } diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index e70ba2bcd..b8e8cbe6e 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -70,7 +70,7 @@ type ( // basePoller is the base class for all poller implementations basePoller struct { - shutdownC <-chan struct{} + stopC <-chan struct{} } // workflowTaskPoller implements polling/processing a workflow task @@ -180,10 +180,10 @@ func isClientSideError(err error) bool { return err == context.DeadlineExceeded } -// shuttingDown returns true if worker is shutting down right now -func (bp *basePoller) shuttingDown() bool { +// stopping returns true if worker is stopping right now +func (bp *basePoller) stopping() bool { select { - case <-bp.shutdownC: + case <-bp.stopC: return true default: return false @@ -191,10 +191,10 @@ func (bp *basePoller) shuttingDown() bool { } // doPoll runs the given pollFunc in a separate go routine. Returns when either of the conditions are met: -// - poll succeeds, poll fails or worker is shutting down +// - poll succeeds, poll fails or worker is stopping func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (interface{}, error)) (interface{}, error) { - if bp.shuttingDown() { - return nil, errShutdown + if bp.stopping() { + return nil, errStop } var err error @@ -212,16 +212,16 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (interface{}, er select { case <-doneC: return result, err - case <-bp.shutdownC: + case <-bp.stopC: cancel() - return nil, errShutdown + return nil, errStop } } // newWorkflowTaskPoller creates a new workflow task poller which must have a one to one relationship to workflow worker func newWorkflowTaskPoller(taskHandler WorkflowTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *workflowTaskPoller { return &workflowTaskPoller{ - basePoller: basePoller{shutdownC: params.WorkerStopChannel}, + basePoller: basePoller{stopC: params.WorkerStopChannel}, service: service, namespace: params.Namespace, taskListName: params.TaskList, @@ -249,8 +249,8 @@ func (wtp *workflowTaskPoller) PollTask() (interface{}, error) { // ProcessTask processes a task which could be workflow task or local activity result func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error { - if wtp.shuttingDown() { - return errShutdown + if wtp.stopping() { + return errStop } switch task := task.(type) { @@ -425,7 +425,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct tracer: params.Tracer, } return &localActivityTaskPoller{ - basePoller: basePoller{shutdownC: params.WorkerStopChannel}, + basePoller: basePoller{stopC: params.WorkerStopChannel}, handler: handler, metricsScope: params.MetricsScope, logger: params.Logger, @@ -438,8 +438,8 @@ func (latp *localActivityTaskPoller) PollTask() (interface{}, error) { } func (latp *localActivityTaskPoller) ProcessTask(task interface{}) error { - if latp.shuttingDown() { - return errShutdown + if latp.stopping() { + return errStop } result := latp.handler.executeLocalActivityTask(task.(*localActivityTask)) @@ -795,7 +795,7 @@ func newGetHistoryPageFunc( func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *activityTaskPoller { return &activityTaskPoller{ - basePoller: basePoller{shutdownC: params.WorkerStopChannel}, + basePoller: basePoller{stopC: params.WorkerStopChannel}, taskHandler: taskHandler, service: service, namespace: params.Namespace, @@ -858,8 +858,8 @@ func (atp *activityTaskPoller) PollTask() (interface{}, error) { // ProcessTask processes a new task func (atp *activityTaskPoller) ProcessTask(task interface{}) error { - if atp.shuttingDown() { - return errShutdown + if atp.stopping() { + return errStop } activityTask := task.(*activityTask) @@ -888,9 +888,9 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { return nil } - // if worker is shutting down, don't bother reporting activity completion - if atp.shuttingDown() { - return errShutdown + // if worker is stopping, don't bother reporting activity completion + if atp.stopping() { + return errStop } responseStartTime := time.Now() diff --git a/internal/internal_worker.go b/internal/internal_worker.go index d9916cf15..d6cd89b1e 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -292,7 +292,7 @@ func newWorkflowTaskWorkerInternal(taskHandler WorkflowTaskHandler, service work taskWorker: poller, identity: params.Identity, workerType: "DecisionWorker", - shutdownTimeout: params.WorkerStopTimeout}, + stopTimeout: params.WorkerStopTimeout}, params.Logger, params.MetricsScope, nil, @@ -315,7 +315,7 @@ func newWorkflowTaskWorkerInternal(taskHandler WorkflowTaskHandler, service work taskWorker: localActivityTaskPoller, identity: params.Identity, workerType: "LocalActivityWorker", - shutdownTimeout: params.WorkerStopTimeout}, + stopTimeout: params.WorkerStopTimeout}, params.Logger, params.MetricsScope, nil, @@ -356,7 +356,7 @@ func (ww *workflowWorker) Run() error { return nil } -// Shutdown the worker. +// Stop the worker. func (ww *workflowWorker) Stop() { close(ww.stopC) // TODO: remove the stop methods in favor of the workerStopChannel @@ -445,7 +445,7 @@ func newActivityTaskWorker(taskHandler ActivityTaskHandler, service workflowserv taskWorker: poller, identity: workerParams.Identity, workerType: "ActivityWorker", - shutdownTimeout: workerParams.WorkerStopTimeout, + stopTimeout: workerParams.WorkerStopTimeout, userContextCancel: workerParams.UserContextCancel}, workerParams.Logger, workerParams.MetricsScope, @@ -482,7 +482,7 @@ func (aw *activityWorker) Run() error { return nil } -// Shutdown the worker. +// Stop the worker. func (aw *activityWorker) Stop() { close(aw.stopC) aw.worker.Stop() @@ -962,7 +962,7 @@ func (aw *AggregatedWorker) RegisterActivityWithOptions(a interface{}, options R aw.registry.RegisterActivityWithOptions(a, options) } -// Start starts the worker in a non-blocking fashion +// Start the worker in a non-blocking fashion. func (aw *AggregatedWorker) Start() error { if err := initBinaryChecksum(); err != nil { return fmt.Errorf("failed to get executable checksum: %v", err) @@ -1076,8 +1076,8 @@ func getBinaryChecksum() string { return binaryChecksum } -// Run is a blocking start and cleans up resources when killed -// returns error only if it fails to start the worker +// Run the worker in a blocking fashion. Stop the worker when process is killed with SIGINT or SIGTERM. +// Returns error only if worker fails to start. func (aw *AggregatedWorker) Run() error { if err := aw.Start(); err != nil { return err @@ -1088,7 +1088,7 @@ func (aw *AggregatedWorker) Run() error { return nil } -// Stop cleans up any resources opened by worker +// Stop the worker. func (aw *AggregatedWorker) Stop() { if !isInterfaceNil(aw.workflowWorker) { aw.workflowWorker.Stop() diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index d05edffb2..715c27719 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -53,7 +53,7 @@ var ( pollOperationRetryPolicy = createPollRetryPolicy() ) -var errShutdown = errors.New("worker shutting down") +var errStop = errors.New("worker stopping") type ( // resultHandler that returns result @@ -115,7 +115,7 @@ type ( taskWorker taskPoller identity string workerType string - shutdownTimeout time.Duration + stopTimeout time.Duration userContextCancel context.CancelFunc } @@ -123,8 +123,8 @@ type ( baseWorker struct { options baseWorkerOptions isWorkerStarted bool - shutdownCh chan struct{} // Channel used to shut down the go routines. - shutdownWG sync.WaitGroup // The WaitGroup for shutting down existing routines. + stopCh chan struct{} // Channel used to stop the go routines. + stopWG sync.WaitGroup // The WaitGroup for stopping existing routines. pollLimiter *rate.Limiter taskLimiter *rate.Limiter limiterContext context.Context @@ -159,7 +159,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t ctx, cancel := context.WithCancel(context.Background()) bw := &baseWorker{ options: options, - shutdownCh: make(chan struct{}), + stopCh: make(chan struct{}), taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), @@ -187,11 +187,11 @@ func (bw *baseWorker) Start() { bw.metricsScope.Counter(metrics.WorkerStartCounter).Inc(1) for i := 0; i < bw.options.pollerCount; i++ { - bw.shutdownWG.Add(1) + bw.stopWG.Add(1) go bw.runPoller() } - bw.shutdownWG.Add(1) + bw.stopWG.Add(1) go bw.runTaskDispatcher() bw.isWorkerStarted = true @@ -204,9 +204,9 @@ func (bw *baseWorker) Start() { }) } -func (bw *baseWorker) isShutdown() bool { +func (bw *baseWorker) isStop() bool { select { - case <-bw.shutdownCh: + case <-bw.stopCh: return true default: return false @@ -214,12 +214,12 @@ func (bw *baseWorker) isShutdown() bool { } func (bw *baseWorker) runPoller() { - defer bw.shutdownWG.Done() + defer bw.stopWG.Done() bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1) for { select { - case <-bw.shutdownCh: + case <-bw.stopCh: return case <-bw.pollerRequestCh: if bw.sessionTokenBucket != nil { @@ -231,26 +231,26 @@ func (bw *baseWorker) runPoller() { } func (bw *baseWorker) runTaskDispatcher() { - defer bw.shutdownWG.Done() + defer bw.stopWG.Done() for i := 0; i < bw.options.maxConcurrentTask; i++ { bw.pollerRequestCh <- struct{}{} } for { - // wait for new task or shutdown + // wait for new task or worker stop select { - case <-bw.shutdownCh: + case <-bw.stopCh: return case task := <-bw.taskQueueCh: // for non-polled-task (local activity result as task), we don't need to rate limit _, isPolledTask := task.(*polledTask) if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil { - if bw.isShutdown() { + if bw.isStop() { return } } - bw.shutdownWG.Add(1) + bw.stopWG.Add(1) go bw.processTask(task) } } @@ -275,7 +275,7 @@ func (bw *baseWorker) pollTask() { if task != nil { select { case bw.taskQueueCh <- &polledTask{task}: - case <-bw.shutdownCh: + case <-bw.stopCh: } } else { bw.pollerRequestCh <- struct{}{} // poll failed, trigger a new poll @@ -283,7 +283,7 @@ func (bw *baseWorker) pollTask() { } func (bw *baseWorker) processTask(task interface{}) { - defer bw.shutdownWG.Done() + defer bw.stopWG.Done() // If the task is from poller, after processing it we would need to request a new poll. Otherwise, the task is from // local activity worker, we don't need a new poll from server. polledTask, isPolledTask := task.(*polledTask) @@ -323,17 +323,17 @@ func (bw *baseWorker) Run() { bw.Stop() } -// Shutdown is a blocking call and cleans up all the resources associated with worker. +// Stop is a blocking call and cleans up all the resources associated with worker. func (bw *baseWorker) Stop() { if !bw.isWorkerStarted { return } - close(bw.shutdownCh) + close(bw.stopCh) bw.limiterContextCancel() - if success := awaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success { + if success := awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout); !success { traceLog(func() { - bw.logger.Info("Worker graceful shutdown timed out.", zap.Duration("Shutdown timeout", bw.options.shutdownTimeout)) + bw.logger.Info("Worker graceful stop timed out.", zap.Duration("Stop timeout", bw.options.stopTimeout)) }) } diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 876d3e659..8255bf90e 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -188,7 +188,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { _ = activityTaskHandler.BlockedOnExecuteCalled() go worker.Stop() - <-worker.worker.shutdownCh + <-worker.worker.stopCh err := ctx.Err() s.NoError(err) diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 314f5be0d..5b68d528a 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -943,8 +943,8 @@ func (wc *WorkflowClient) DescribeTaskList(ctx context.Context, taskList string, return resp, nil } -// CloseConnection closes underlying gRPC connection. -func (wc *WorkflowClient) CloseConnection() { +// Close client and clean up underlying resources. +func (wc *WorkflowClient) Close() { if wc.connectionCloser == nil { return } @@ -1023,8 +1023,8 @@ func (nc *namespaceClient) Update(ctx context.Context, request *workflowservice. }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) } -// CloseConnection closes underlying gRPC connection. -func (nc *namespaceClient) CloseConnection() { +// Close client and clean up underlying resources. +func (nc *namespaceClient) Close() { if nc.connectionCloser == nil { return } diff --git a/internal/worker.go b/internal/worker.go index 68dcc01e8..948d7d29d 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -134,7 +134,7 @@ type ( // default: BlockWorkflow, which just logs error but reply nothing back to server NonDeterministicWorkflowPolicy WorkflowPanicPolicy - // Optional: worker graceful shutdown timeout + // Optional: worker graceful stop timeout // default: 0s WorkerStopTimeout time.Duration diff --git a/mocks/Client.go b/mocks/Client.go index 7935ad1fd..8390aba97 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -518,8 +518,8 @@ func (_m *Client) TerminateWorkflow(ctx context.Context, workflowID string, runI return r0 } -// CloseConnection provides a mock function without given fields -func (_m *Client) CloseConnection() { +// Close provides a mock function without given fields +func (_m *Client) Close() { ret := _m.Called() if rf, ok := ret.Get(0).(func()); ok { diff --git a/mocks/NamespaceClient.go b/mocks/NamespaceClient.go index 561c5bdb8..37ea3b08b 100644 --- a/mocks/NamespaceClient.go +++ b/mocks/NamespaceClient.go @@ -89,8 +89,8 @@ func (_m *NamespaceClient) Update(ctx context.Context, request *workflowservice. return r0 } -// CloseConnection provides a mock function without given fields -func (_m *NamespaceClient) CloseConnection() { +// Close provides a mock function without given fields +func (_m *NamespaceClient) Close() { ret := _m.Called() if rf, ok := ret.Get(0).(func()); ok { diff --git a/test/integration_test.go b/test/integration_test.go index df5f2281e..c491905fd 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -116,9 +116,9 @@ func (ts *IntegrationTestSuite) SetupSuite() { func (ts *IntegrationTestSuite) TearDownSuite() { ts.Assertions = require.New(ts.T()) - ts.client.CloseConnection() + ts.client.Close() - // allow the pollers to shut down, and ensure there are no goroutine leaks. + // allow the pollers to stop, and ensure there are no goroutine leaks. // this will wait for up to 1 minute for leaks to subside, but exit relatively quickly if possible. max := time.After(time.Minute) var last error @@ -465,7 +465,7 @@ func (ts *IntegrationTestSuite) registerNamespace() { Name: name, WorkflowExecutionRetentionPeriodInDays: retention, }) - client.CloseConnection() + client.Close() if _, ok := err.(*serviceerror.NamespaceAlreadyExists); ok { return } diff --git a/worker/worker.go b/worker/worker.go index ae41dfa81..b163073fc 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -112,12 +112,12 @@ type ( // worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{DisableAlreadyRegisteredCheck: true}) RegisterActivityWithOptions(a interface{}, options activity.RegisterOptions) - // Start starts the worker in a non-blocking fashion + // Start the worker in a non-blocking fashion. Start() error - // Run is a blocking start and cleans up resources when killed - // returns error only if it fails to start the worker + // Run the worker in a blocking fashion. Stop the worker when process is killed with SIGINT or SIGTERM. + // Returns error only if worker fails to start. Run() error - // Stop cleans up any resources opened by worker + // Stop the worker. Stop() }