Skip to content

Commit

Permalink
Rename CloseConnection to Close (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 27, 2020
1 parent 73e0324 commit 8bcb763
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 78 deletions.
8 changes: 4 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ type (
// - EntityNotExistError
DescribeTaskList(ctx context.Context, tasklist string, tasklistType tasklistpb.TaskListType) (*workflowservice.DescribeTaskListResponse, error)

// CloseConnection closes underlying gRPC connection.
CloseConnection()
// Close client and clean up underlying resources.
Close()
}

// NamespaceClient is the client for managing operations on the namespace.
Expand Down Expand Up @@ -383,8 +383,8 @@ type (
// - InternalServiceError
Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error

// CloseConnection closes underlying gRPC connection.
CloseConnection()
// Close client and clean up underlying resources.
Close()
}
)

Expand Down
8 changes: 4 additions & 4 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ type (
// - EntityNotExistError
DescribeTaskList(ctx context.Context, tasklist string, tasklistType tasklistpb.TaskListType) (*workflowservice.DescribeTaskListResponse, error)

// CloseConnection closes underlying gRPC connection.
CloseConnection()
// Close client and clean up underlying resources.
Close()
}

// ClientOptions are optional parameters for Client creation.
Expand Down Expand Up @@ -511,8 +511,8 @@ type (
// - InternalServiceError
Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error

// CloseConnection closes underlying gRPC connection.
CloseConnection()
// Close client and clean up underlying resources.
Close()
}

// WorkflowIDReusePolicy defines workflow ID reuse behavior.
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@ func isServiceTransientError(err error) bool {
*serviceerror.CancellationAlreadyRequested:
return false
}
return err != errShutdown
return err != errStop
}
42 changes: 21 additions & 21 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type (

// basePoller is the base class for all poller implementations
basePoller struct {
shutdownC <-chan struct{}
stopC <-chan struct{}
}

// workflowTaskPoller implements polling/processing a workflow task
Expand Down Expand Up @@ -180,21 +180,21 @@ func isClientSideError(err error) bool {
return err == context.DeadlineExceeded
}

// shuttingDown returns true if worker is shutting down right now
func (bp *basePoller) shuttingDown() bool {
// stopping returns true if worker is stopping right now
func (bp *basePoller) stopping() bool {
select {
case <-bp.shutdownC:
case <-bp.stopC:
return true
default:
return false
}
}

// doPoll runs the given pollFunc in a separate go routine. Returns when either of the conditions are met:
// - poll succeeds, poll fails or worker is shutting down
// - poll succeeds, poll fails or worker is stopping
func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (interface{}, error)) (interface{}, error) {
if bp.shuttingDown() {
return nil, errShutdown
if bp.stopping() {
return nil, errStop
}

var err error
Expand All @@ -212,16 +212,16 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (interface{}, er
select {
case <-doneC:
return result, err
case <-bp.shutdownC:
case <-bp.stopC:
cancel()
return nil, errShutdown
return nil, errStop
}
}

// newWorkflowTaskPoller creates a new workflow task poller which must have a one to one relationship to workflow worker
func newWorkflowTaskPoller(taskHandler WorkflowTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *workflowTaskPoller {
return &workflowTaskPoller{
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
basePoller: basePoller{stopC: params.WorkerStopChannel},
service: service,
namespace: params.Namespace,
taskListName: params.TaskList,
Expand Down Expand Up @@ -249,8 +249,8 @@ func (wtp *workflowTaskPoller) PollTask() (interface{}, error) {

// ProcessTask processes a task which could be workflow task or local activity result
func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error {
if wtp.shuttingDown() {
return errShutdown
if wtp.stopping() {
return errStop
}

switch task := task.(type) {
Expand Down Expand Up @@ -425,7 +425,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
tracer: params.Tracer,
}
return &localActivityTaskPoller{
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
basePoller: basePoller{stopC: params.WorkerStopChannel},
handler: handler,
metricsScope: params.MetricsScope,
logger: params.Logger,
Expand All @@ -438,8 +438,8 @@ func (latp *localActivityTaskPoller) PollTask() (interface{}, error) {
}

func (latp *localActivityTaskPoller) ProcessTask(task interface{}) error {
if latp.shuttingDown() {
return errShutdown
if latp.stopping() {
return errStop
}

result := latp.handler.executeLocalActivityTask(task.(*localActivityTask))
Expand Down Expand Up @@ -795,7 +795,7 @@ func newGetHistoryPageFunc(

func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *activityTaskPoller {
return &activityTaskPoller{
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
basePoller: basePoller{stopC: params.WorkerStopChannel},
taskHandler: taskHandler,
service: service,
namespace: params.Namespace,
Expand Down Expand Up @@ -858,8 +858,8 @@ func (atp *activityTaskPoller) PollTask() (interface{}, error) {

// ProcessTask processes a new task
func (atp *activityTaskPoller) ProcessTask(task interface{}) error {
if atp.shuttingDown() {
return errShutdown
if atp.stopping() {
return errStop
}

activityTask := task.(*activityTask)
Expand Down Expand Up @@ -888,9 +888,9 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error {
return nil
}

// if worker is shutting down, don't bother reporting activity completion
if atp.shuttingDown() {
return errShutdown
// if worker is stopping, don't bother reporting activity completion
if atp.stopping() {
return errStop
}

responseStartTime := time.Now()
Expand Down
18 changes: 9 additions & 9 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func newWorkflowTaskWorkerInternal(taskHandler WorkflowTaskHandler, service work
taskWorker: poller,
identity: params.Identity,
workerType: "DecisionWorker",
shutdownTimeout: params.WorkerStopTimeout},
stopTimeout: params.WorkerStopTimeout},
params.Logger,
params.MetricsScope,
nil,
Expand All @@ -315,7 +315,7 @@ func newWorkflowTaskWorkerInternal(taskHandler WorkflowTaskHandler, service work
taskWorker: localActivityTaskPoller,
identity: params.Identity,
workerType: "LocalActivityWorker",
shutdownTimeout: params.WorkerStopTimeout},
stopTimeout: params.WorkerStopTimeout},
params.Logger,
params.MetricsScope,
nil,
Expand Down Expand Up @@ -356,7 +356,7 @@ func (ww *workflowWorker) Run() error {
return nil
}

// Shutdown the worker.
// Stop the worker.
func (ww *workflowWorker) Stop() {
close(ww.stopC)
// TODO: remove the stop methods in favor of the workerStopChannel
Expand Down Expand Up @@ -445,7 +445,7 @@ func newActivityTaskWorker(taskHandler ActivityTaskHandler, service workflowserv
taskWorker: poller,
identity: workerParams.Identity,
workerType: "ActivityWorker",
shutdownTimeout: workerParams.WorkerStopTimeout,
stopTimeout: workerParams.WorkerStopTimeout,
userContextCancel: workerParams.UserContextCancel},
workerParams.Logger,
workerParams.MetricsScope,
Expand Down Expand Up @@ -482,7 +482,7 @@ func (aw *activityWorker) Run() error {
return nil
}

// Shutdown the worker.
// Stop the worker.
func (aw *activityWorker) Stop() {
close(aw.stopC)
aw.worker.Stop()
Expand Down Expand Up @@ -962,7 +962,7 @@ func (aw *AggregatedWorker) RegisterActivityWithOptions(a interface{}, options R
aw.registry.RegisterActivityWithOptions(a, options)
}

// Start starts the worker in a non-blocking fashion
// Start the worker in a non-blocking fashion.
func (aw *AggregatedWorker) Start() error {
if err := initBinaryChecksum(); err != nil {
return fmt.Errorf("failed to get executable checksum: %v", err)
Expand Down Expand Up @@ -1076,8 +1076,8 @@ func getBinaryChecksum() string {
return binaryChecksum
}

// Run is a blocking start and cleans up resources when killed
// returns error only if it fails to start the worker
// Run the worker in a blocking fashion. Stop the worker when process is killed with SIGINT or SIGTERM.
// Returns error only if worker fails to start.
func (aw *AggregatedWorker) Run() error {
if err := aw.Start(); err != nil {
return err
Expand All @@ -1088,7 +1088,7 @@ func (aw *AggregatedWorker) Run() error {
return nil
}

// Stop cleans up any resources opened by worker
// Stop the worker.
func (aw *AggregatedWorker) Stop() {
if !isInterfaceNil(aw.workflowWorker) {
aw.workflowWorker.Stop()
Expand Down
44 changes: 22 additions & 22 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
pollOperationRetryPolicy = createPollRetryPolicy()
)

var errShutdown = errors.New("worker shutting down")
var errStop = errors.New("worker stopping")

type (
// resultHandler that returns result
Expand Down Expand Up @@ -115,16 +115,16 @@ type (
taskWorker taskPoller
identity string
workerType string
shutdownTimeout time.Duration
stopTimeout time.Duration
userContextCancel context.CancelFunc
}

// baseWorker that wraps worker activities.
baseWorker struct {
options baseWorkerOptions
isWorkerStarted bool
shutdownCh chan struct{} // Channel used to shut down the go routines.
shutdownWG sync.WaitGroup // The WaitGroup for shutting down existing routines.
stopCh chan struct{} // Channel used to stop the go routines.
stopWG sync.WaitGroup // The WaitGroup for stopping existing routines.
pollLimiter *rate.Limiter
taskLimiter *rate.Limiter
limiterContext context.Context
Expand Down Expand Up @@ -159,7 +159,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
ctx, cancel := context.WithCancel(context.Background())
bw := &baseWorker{
options: options,
shutdownCh: make(chan struct{}),
stopCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
Expand Down Expand Up @@ -187,11 +187,11 @@ func (bw *baseWorker) Start() {
bw.metricsScope.Counter(metrics.WorkerStartCounter).Inc(1)

for i := 0; i < bw.options.pollerCount; i++ {
bw.shutdownWG.Add(1)
bw.stopWG.Add(1)
go bw.runPoller()
}

bw.shutdownWG.Add(1)
bw.stopWG.Add(1)
go bw.runTaskDispatcher()

bw.isWorkerStarted = true
Expand All @@ -204,22 +204,22 @@ func (bw *baseWorker) Start() {
})
}

func (bw *baseWorker) isShutdown() bool {
func (bw *baseWorker) isStop() bool {
select {
case <-bw.shutdownCh:
case <-bw.stopCh:
return true
default:
return false
}
}

func (bw *baseWorker) runPoller() {
defer bw.shutdownWG.Done()
defer bw.stopWG.Done()
bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1)

for {
select {
case <-bw.shutdownCh:
case <-bw.stopCh:
return
case <-bw.pollerRequestCh:
if bw.sessionTokenBucket != nil {
Expand All @@ -231,26 +231,26 @@ func (bw *baseWorker) runPoller() {
}

func (bw *baseWorker) runTaskDispatcher() {
defer bw.shutdownWG.Done()
defer bw.stopWG.Done()

for i := 0; i < bw.options.maxConcurrentTask; i++ {
bw.pollerRequestCh <- struct{}{}
}

for {
// wait for new task or shutdown
// wait for new task or worker stop
select {
case <-bw.shutdownCh:
case <-bw.stopCh:
return
case task := <-bw.taskQueueCh:
// for non-polled-task (local activity result as task), we don't need to rate limit
_, isPolledTask := task.(*polledTask)
if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil {
if bw.isShutdown() {
if bw.isStop() {
return
}
}
bw.shutdownWG.Add(1)
bw.stopWG.Add(1)
go bw.processTask(task)
}
}
Expand All @@ -275,15 +275,15 @@ func (bw *baseWorker) pollTask() {
if task != nil {
select {
case bw.taskQueueCh <- &polledTask{task}:
case <-bw.shutdownCh:
case <-bw.stopCh:
}
} else {
bw.pollerRequestCh <- struct{}{} // poll failed, trigger a new poll
}
}

func (bw *baseWorker) processTask(task interface{}) {
defer bw.shutdownWG.Done()
defer bw.stopWG.Done()
// If the task is from poller, after processing it we would need to request a new poll. Otherwise, the task is from
// local activity worker, we don't need a new poll from server.
polledTask, isPolledTask := task.(*polledTask)
Expand Down Expand Up @@ -323,17 +323,17 @@ func (bw *baseWorker) Run() {
bw.Stop()
}

// Shutdown is a blocking call and cleans up all the resources associated with worker.
// Stop is a blocking call and cleans up all the resources associated with worker.
func (bw *baseWorker) Stop() {
if !bw.isWorkerStarted {
return
}
close(bw.shutdownCh)
close(bw.stopCh)
bw.limiterContextCancel()

if success := awaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
if success := awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout); !success {
traceLog(func() {
bw.logger.Info("Worker graceful shutdown timed out.", zap.Duration("Shutdown timeout", bw.options.shutdownTimeout))
bw.logger.Info("Worker graceful stop timed out.", zap.Duration("Stop timeout", bw.options.stopTimeout))
})
}

Expand Down
Loading

0 comments on commit 8bcb763

Please sign in to comment.