Skip to content

Commit

Permalink
CloseConnection doesn't return error (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 15, 2020
1 parent c2c1f89 commit 6313e48
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 70 deletions.
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ type (
DescribeTaskList(ctx context.Context, tasklist string, tasklistType tasklistpb.TaskListType) (*workflowservice.DescribeTaskListResponse, error)

// CloseConnection closes underlying gRPC connection.
CloseConnection() error
CloseConnection()
}

// NamespaceClient is the client for managing operations on the namespace.
Expand Down Expand Up @@ -381,7 +381,7 @@ type (
Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error

// CloseConnection closes underlying gRPC connection.
CloseConnection() error
CloseConnection()
}
)

Expand Down
10 changes: 8 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ type (
DescribeTaskList(ctx context.Context, tasklist string, tasklistType tasklistpb.TaskListType) (*workflowservice.DescribeTaskListResponse, error)

// CloseConnection closes underlying gRPC connection.
CloseConnection() error
CloseConnection()
}

// ClientOptions are optional parameters for Client creation.
Expand All @@ -327,6 +327,10 @@ type (
// default: default
Namespace string

// Optional: Logger framework can use to log.
// default: default logger provided.
Logger *zap.Logger

// Optional: Metrics to be reported.
// Default metrics are Prometheus compatible but default separator (.) should be replaced with some other character:
// opts := tally.ScopeOptions{
Expand Down Expand Up @@ -510,7 +514,7 @@ type (
Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error

// CloseConnection closes underlying gRPC connection.
CloseConnection() error
CloseConnection()
}

// WorkflowIDReusePolicy defines workflow ID reuse behavior.
Expand Down Expand Up @@ -600,6 +604,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
namespace: options.Namespace,
registry: newRegistry(),
metricsScope: metrics.NewTaggedScope(options.MetricsScope),
logger: options.Logger,
identity: options.Identity,
dataConverter: options.DataConverter,
contextPropagators: options.ContextPropagators,
Expand Down Expand Up @@ -632,6 +637,7 @@ func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowSer
workflowService: workflowServiceClient,
connectionCloser: clientConn,
metricsScope: options.MetricsScope,
logger: options.Logger,
identity: options.Identity,
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskList string, options Worker
MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers,
Identity: client.identity,
MetricsScope: client.metricsScope,
Logger: options.Logger,
Logger: client.logger,
EnableLoggingInReplay: options.EnableLoggingInReplay,
UserContext: backgroundActivityContext,
UserContextCancel: backgroundActivityContextCancel,
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
dataConverter: &defaultDataConverter{},
contextPropagators: nil,
tracer: nil,
logger: zap.NewNop(),
}

options := WorkerOptions{
Expand All @@ -1473,7 +1474,6 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
WorkerActivitiesPerSecond: 99,
StickyScheduleToStartTimeout: 555 * time.Minute,
BackgroundActivityContext: context.Background(),
Logger: zap.NewNop(),
}

aggWorker := NewAggregatedWorker(client, taskList, options)
Expand All @@ -1495,7 +1495,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout,
DataConverter: client.dataConverter,
Tracer: client.tracer,
Logger: options.Logger,
Logger: client.logger,
MetricsScope: client.metricsScope,
Identity: client.identity,
}
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() {
}).Times(2)

options := WorkerOptions{
Logger: zap.NewNop(),
DisableActivityWorker: true,
}
clientOptions := ClientOptions{
Logger: zap.NewNop(),
Identity: "test-worker-identity",
}

Expand Down Expand Up @@ -460,10 +460,10 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
}).Times(1)

options := WorkerOptions{
Logger: zap.NewNop(),
DisableActivityWorker: true,
}
clientOptions := ClientOptions{
Logger: zap.NewNop(),
Identity: "test-worker-identity",
}

Expand Down
35 changes: 20 additions & 15 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/zap"

commonpb "go.temporal.io/temporal-proto/common"
eventpb "go.temporal.io/temporal-proto/event"
Expand Down Expand Up @@ -70,6 +71,7 @@ type (
connectionCloser io.Closer
namespace string
registry *registry
logger *zap.Logger
metricsScope *metrics.TaggedScope
identity string
dataConverter DataConverter
Expand All @@ -82,6 +84,7 @@ type (
workflowService workflowservice.WorkflowServiceClient
connectionCloser io.Closer
metricsScope tally.Scope
logger *zap.Logger
identity string
}

Expand Down Expand Up @@ -942,12 +945,13 @@ func (wc *WorkflowClient) DescribeTaskList(ctx context.Context, taskList string,
}

// CloseConnection closes underlying gRPC connection.
func (wc *WorkflowClient) CloseConnection() error {
func (wc *WorkflowClient) CloseConnection() {
if wc.connectionCloser == nil {
return nil
return
}
if err := wc.connectionCloser.Close(); err != nil {
wc.logger.Warn("unable to close connection", zap.Error(err))
}

return wc.connectionCloser.Close()
}

func (wc *WorkflowClient) getWorkflowHeader(ctx context.Context) *commonpb.Header {
Expand All @@ -966,13 +970,13 @@ func (wc *WorkflowClient) getWorkflowHeader(ctx context.Context) *commonpb.Heade
// - NamespaceAlreadyExistsError
// - BadRequestError
// - InternalServiceError
func (dc *namespaceClient) Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error {
func (nc *namespaceClient) Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error {
return backoff.Retry(ctx,
func() error {
tchCtx, cancel := newChannelContext(ctx)
defer cancel()
var err error
_, err = dc.workflowService.RegisterNamespace(tchCtx, request)
_, err = nc.workflowService.RegisterNamespace(tchCtx, request)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}
Expand All @@ -985,7 +989,7 @@ func (dc *namespaceClient) Register(ctx context.Context, request *workflowservic
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
func (dc *namespaceClient) Describe(ctx context.Context, name string) (*workflowservice.DescribeNamespaceResponse, error) {
func (nc *namespaceClient) Describe(ctx context.Context, name string) (*workflowservice.DescribeNamespaceResponse, error) {
request := &workflowservice.DescribeNamespaceRequest{
Name: name,
}
Expand All @@ -996,7 +1000,7 @@ func (dc *namespaceClient) Describe(ctx context.Context, name string) (*workflow
tchCtx, cancel := newChannelContext(ctx)
defer cancel()
var err error
response, err = dc.workflowService.DescribeNamespace(tchCtx, request)
response, err = nc.workflowService.DescribeNamespace(tchCtx, request)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
if err != nil {
Expand All @@ -1010,23 +1014,24 @@ func (dc *namespaceClient) Describe(ctx context.Context, name string) (*workflow
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
func (dc *namespaceClient) Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error {
func (nc *namespaceClient) Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error {
return backoff.Retry(ctx,
func() error {
tchCtx, cancel := newChannelContext(ctx)
defer cancel()
_, err := dc.workflowService.UpdateNamespace(tchCtx, request)
_, err := nc.workflowService.UpdateNamespace(tchCtx, request)
return err
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
}

// CloseConnection closes underlying gRPC connection.
func (dc *namespaceClient) CloseConnection() error {
if dc.connectionCloser == nil {
return nil
func (nc *namespaceClient) CloseConnection() {
if nc.connectionCloser == nil {
return
}
if err := nc.connectionCloser.Close(); err != nil {
nc.logger.Warn("unable to close connection", zap.Error(err))
}

return dc.connectionCloser.Close()
}

func (iter *historyEventIteratorImpl) HasNext() bool {
Expand Down
26 changes: 10 additions & 16 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,18 @@ import (
"sync"
"time"

decisionpb "go.temporal.io/temporal-proto/decision"
tasklistpb "go.temporal.io/temporal-proto/tasklist"

"github.com/facebookgo/clock"
"github.com/golang/mock/gomock"
"github.com/opentracing/opentracing-go"
"github.com/robfig/cron"
"github.com/stretchr/testify/mock"
"github.com/uber-go/tally"

commonpb "go.temporal.io/temporal-proto/common"
decisionpb "go.temporal.io/temporal-proto/decision"
eventpb "go.temporal.io/temporal-proto/event"
executionpb "go.temporal.io/temporal-proto/execution"
"go.temporal.io/temporal-proto/serviceerror"
tasklistpb "go.temporal.io/temporal-proto/tasklist"
"go.temporal.io/temporal-proto/workflowservice"
"go.temporal.io/temporal-proto/workflowservicemock"
"go.uber.org/zap"
Expand All @@ -65,9 +63,9 @@ const (
workflowTypeNotSpecified = "workflow-type-not-specified"

// These are copied from service implementation
reservedTaskListPrefix = "/__temporal_sys/"
maxIDLengthLimit = 1000
maxWorkflowTimeoutSeconds = 60 * 24 * 365 * 10
reservedTaskListPrefix = "/__temporal_sys/"
maxIDLengthLimit = 1000
maxWorkflowTimeout = 24 * time.Hour * 365 * 10
)

type (
Expand Down Expand Up @@ -245,7 +243,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist
WorkflowType: WorkflowType{Name: workflowTypeNotSpecified},
TaskListName: defaultTestTaskList,

WorkflowExecutionTimeoutSeconds: maxWorkflowTimeoutSeconds,
WorkflowExecutionTimeoutSeconds: common.Int32Ceil(maxWorkflowTimeout.Seconds()),
WorkflowTaskTimeoutSeconds: 1,
},
registry: r,
Expand All @@ -256,7 +254,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist
doneChannel: make(chan struct{}),
workerStopChannel: make(chan struct{}),
dataConverter: getDefaultDataConverter(),
runTimeout: maxWorkflowTimeoutSeconds * time.Second,
runTimeout: maxWorkflowTimeout,
}

// move forward the mock clock to start time.
Expand Down Expand Up @@ -313,10 +311,6 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist

env.service = mockService

if env.workerOptions.Logger == nil {
env.workerOptions.Logger = env.logger
}

return env
}

Expand Down Expand Up @@ -456,7 +450,7 @@ func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time.
wInfo.WorkflowRunTimeoutSeconds = common.Int32Ceil(env.runTimeout.Seconds())
}
if wInfo.WorkflowExecutionTimeoutSeconds == 0 {
wInfo.WorkflowExecutionTimeoutSeconds = maxWorkflowTimeoutSeconds
wInfo.WorkflowExecutionTimeoutSeconds = common.Int32Ceil(maxWorkflowTimeout.Seconds())
}
if wInfo.WorkflowTaskTimeoutSeconds == 0 {
wInfo.WorkflowTaskTimeoutSeconds = 1
Expand Down Expand Up @@ -1294,7 +1288,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteLocalActivity(params executeLocal
taskHandler := localActivityTaskHandler{
userContext: env.workerOptions.BackgroundActivityContext,
metricsScope: metrics.NewTaggedScope(env.metricsScope),
logger: env.workerOptions.Logger,
logger: env.logger,
dataConverter: env.dataConverter,
tracer: env.tracer,
contextPropagators: env.contextPropagators,
Expand Down Expand Up @@ -1719,7 +1713,7 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri
TaskList: taskList,
Identity: env.identity,
MetricsScope: env.metricsScope,
Logger: env.workerOptions.Logger,
Logger: env.logger,
UserContext: env.workerOptions.BackgroundActivityContext,
DataConverter: dataConverter,
WorkerStopChannel: env.workerStopChannel,
Expand Down
6 changes: 0 additions & 6 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ package internal
import (
"context"
"time"

"go.uber.org/zap"
)

type (
Expand Down Expand Up @@ -97,10 +95,6 @@ type (
// default: 2
MaxConcurrentDecisionTaskPollers int

// Optional: Logger framework can use to log.
// default: default logger provided.
Logger *zap.Logger

// Optional: Enable logging in replay.
// In the workflow code you can use workflow.GetLogger(ctx) to write logs. By default, the logger will skip log
// entry during replay mode so you won't see duplicate logs. This option will enable the logging in replay mode.
Expand Down
11 changes: 3 additions & 8 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 3 additions & 8 deletions mocks/NamespaceClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6313e48

Please sign in to comment.