From 3db2adb7576ed594759343b854fce6148797557d Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Sat, 28 Mar 2020 11:05:08 -0700 Subject: [PATCH] Rename domain to namespace. (#81) Due to circular runtime dependency with the service integration tests cannot pass with backward incompatible change like this one. --- client/client.go | 52 +++++----- evictiontest/workflow_cache_eviction_test.go | 2 +- go.mod | 2 +- go.sum | 4 +- internal/activity.go | 8 +- internal/client.go | 72 +++++++------- internal/common/metrics/interceptor.go | 2 +- internal/common/metrics/operationScope.go | 2 +- internal/common/metrics/service_wrapper.go | 40 ++++---- .../common/metrics/service_wrapper_test.go | 24 ++--- internal/internal_activity.go | 2 +- internal/internal_decision_state_machine.go | 10 +- .../internal_decision_state_machine_test.go | 24 ++--- internal/internal_event_handlers.go | 14 +-- internal/internal_logging_tags.go | 2 +- internal/internal_retry.go | 4 +- internal/internal_task_handlers.go | 36 +++---- internal/internal_task_handlers_test.go | 96 +++++++++---------- internal/internal_task_pollers.go | 32 +++---- internal/internal_worker.go | 62 ++++++------ internal/internal_worker_base.go | 6 +- internal/internal_worker_interfaces_test.go | 14 +-- internal/internal_worker_test.go | 64 ++++++------- internal/internal_workers_test.go | 22 ++--- internal/internal_workflow.go | 10 +- internal/internal_workflow_client.go | 96 +++++++++---------- internal/internal_workflow_client_test.go | 28 +++--- internal/internal_workflow_testsuite.go | 29 +++--- internal/internal_workflow_testsuite_test.go | 26 ++--- internal/session.go | 2 +- internal/testdata/parentWF.json | 10 +- internal/worker.go | 10 +- internal/workflow.go | 42 ++++---- internal/workflow_testsuite.go | 16 ++-- mocks/Client.go | 16 ++-- mocks/{DomainClient.go => NamespaceClient.go} | 22 ++--- mocks/verify.go | 2 +- test/activity_test.go | 6 +- test/integration_test.go | 30 +++--- test/workflow_test.go | 8 +- worker/worker.go | 4 +- workflow/doc.go | 4 +- workflow/workflow.go | 8 +- workflow/workflow_options.go | 6 +- 44 files changed, 485 insertions(+), 486 deletions(-) rename mocks/{DomainClient.go => NamespaceClient.go} (76%) diff --git a/client/client.go b/client/client.go index f26dd65c9..223d76ff6 100644 --- a/client/client.go +++ b/client/client.go @@ -39,8 +39,8 @@ const ( // DefaultHostPort is the host:port which is used if not passed with options. DefaultHostPort = internal.LocalHostPort - // DefaultDomainName is the domain name which is used if not passed with options. - DefaultDomainName = internal.DefaultDomainName + // DefaultNamespace is the namespace name which is used if not passed with options. + DefaultNamespace = internal.DefaultNamespace // QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call // stack of the workflow. The result will be a string encoded in the encoded.Value. @@ -92,7 +92,7 @@ type ( // or // ExecuteWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3) // The errors it can return: - // - EntityNotExistsError, if domain does not exists + // - EntityNotExistsError, if namespace does not exists // - BadRequestError // - InternalServiceError // @@ -144,7 +144,7 @@ type ( // Note: options.WorkflowIDReusePolicy is default to WorkflowIDReusePolicyAllowDuplicate in this API; // while in StartWorkflow/ExecuteWorkflow APIs it is default to WorkflowIdReusePolicyAllowDuplicateFailedOnly. // The errors it can return: - // - EntityNotExistsError, if domain does not exist + // - EntityNotExistsError, if namespace does not exist // - BadRequestError // - InternalServiceError SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, @@ -214,12 +214,12 @@ type ( // completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise, // activity task failed event will be reported. // An activity implementation should use activityID provided in ActivityOption to use for completion. - // domain name, workflowID, activityID are required, runID is optional. + // namespace name, workflowID, activityID are required, runID is optional. // The errors it can return: // - ErrorWithDetails // - TimeoutError // - CanceledError - CompleteActivityByID(ctx context.Context, domain, workflowID, runID, activityID string, result interface{}, err error) error + CompleteActivityByID(ctx context.Context, namespace, workflowID, runID, activityID string, result interface{}, err error) error // RecordActivityHeartbeat records heartbeat for an activity. // taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity. @@ -234,7 +234,7 @@ type ( // The errors it can return: // - EntityNotExistsError // - InternalServiceError - RecordActivityHeartbeatByID(ctx context.Context, domain, workflowID, runID, activityID string, details ...interface{}) error + RecordActivityHeartbeatByID(ctx context.Context, namespace, workflowID, runID, activityID string, details ...interface{}) error // ListClosedWorkflow gets closed workflow executions based on request filters. // Retrieved workflow executions are sorted by start time in descending order. @@ -270,9 +270,9 @@ type ( ListWorkflow(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*workflowservice.ListWorkflowExecutionsResponse, error) // ListArchivedWorkflow gets archived workflow executions based on query. This API will return BadRequest if Temporal - // cluster or target domain is not configured for visibility archival or read is not enabled. The query is basically the SQL WHERE clause. + // cluster or target namespace is not configured for visibility archival or read is not enabled. The query is basically the SQL WHERE clause. // However, different visibility archivers have different limitations on the query. Please check the documentation of the visibility archiver used - // by your domain to see what kind of queries are accept and whether retrieved workflow executions are ordered or not. + // by your namespace to see what kind of queries are accept and whether retrieved workflow executions are ordered or not. // The errors it can return: // - BadRequestError // - InternalServiceError @@ -352,32 +352,32 @@ type ( CloseConnection() error } - // DomainClient is the client for managing operations on the domain. - // CLI, tools, ... can use this layer to manager operations on domain. - DomainClient interface { - // Register a domain with temporal server + // NamespaceClient is the client for managing operations on the namespace. + // CLI, tools, ... can use this layer to manager operations on namespace. + NamespaceClient interface { + // Register a namespace with temporal server // The errors it can throw: - // - DomainAlreadyExistsError + // - NamespaceAlreadyExistsError // - BadRequestError // - InternalServiceError - Register(ctx context.Context, request *workflowservice.RegisterDomainRequest) error + Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error - // Describe a domain. The domain has 3 part of information - // DomainInfo - Which has Name, Status, Description, Owner Email - // DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. + // Describe a namespace. The namespace has 3 part of information + // NamespaceInfo - Which has Name, Status, Description, Owner Email + // NamespaceConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. // ReplicationConfiguration - replication config like clusters and active cluster name // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError - Describe(ctx context.Context, name string) (*workflowservice.DescribeDomainResponse, error) + Describe(ctx context.Context, name string) (*workflowservice.DescribeNamespaceResponse, error) - // Update a domain. + // Update a namespace. // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError - Update(ctx context.Context, request *workflowservice.UpdateDomainRequest) error + Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error // CloseConnection closes underlying gRPC connection. CloseConnection() error @@ -412,16 +412,16 @@ func NewClient(options Options) (Client, error) { return internal.NewClient(options) } -// NewDomainClient creates an instance of a domain client, to manage lifecycle of domains. -func NewDomainClient(options Options) (DomainClient, error) { - return internal.NewDomainClient(options) +// NewNamespaceClient creates an instance of a namespace client, to manage lifecycle of namespaces. +func NewNamespaceClient(options Options) (NamespaceClient, error) { + return internal.NewNamespaceClient(options) } // make sure if new methods are added to internal.Client they are also added to public Client. var _ Client = internal.Client(nil) var _ internal.Client = Client(nil) -var _ DomainClient = internal.DomainClient(nil) -var _ internal.DomainClient = DomainClient(nil) +var _ NamespaceClient = internal.NamespaceClient(nil) +var _ internal.NamespaceClient = NamespaceClient(nil) // NewValue creates a new encoded.Value which can be used to decode binary data returned by Temporal. For example: // User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution. diff --git a/evictiontest/workflow_cache_eviction_test.go b/evictiontest/workflow_cache_eviction_test.go index a6c229053..64962119b 100644 --- a/evictiontest/workflow_cache_eviction_test.go +++ b/evictiontest/workflow_cache_eviction_test.go @@ -139,7 +139,7 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() { cacheSize := 5 internal.SetStickyWorkflowCacheSize(cacheSize) // once for workflow worker because we disable activity worker - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) // feed our worker exactly *cacheSize* "legit" decision tasks // these are handcrafted decision tasks that are not blatantly obviously mocks // the goal is to trick our worker into thinking they are real so it diff --git a/go.mod b/go.mod index a73b5de63..713e25901 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/uber-go/tally v3.3.15+incompatible github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - go.temporal.io/temporal-proto v0.20.1 + go.temporal.io/temporal-proto v0.20.2 go.uber.org/atomic v1.6.0 go.uber.org/goleak v1.0.0 go.uber.org/zap v1.14.1 diff --git a/go.sum b/go.sum index 239178af2..3669871b5 100644 --- a/go.sum +++ b/go.sum @@ -91,8 +91,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.temporal.io/temporal-proto v0.20.1 h1:hiu9DbQVYOZDp3IVkn0wCahIN6iQR429ctVXcUCARBw= -go.temporal.io/temporal-proto v0.20.1/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= +go.temporal.io/temporal-proto v0.20.2 h1:ll+VJaxyR4xzZWClT+CIcm4fQdzEdcBGf4Yksq4EVxI= +go.temporal.io/temporal-proto v0.20.2/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= diff --git a/internal/activity.go b/internal/activity.go index feab7a706..72fda60e4 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -42,7 +42,7 @@ type ( ActivityInfo struct { TaskToken []byte WorkflowType *WorkflowType - WorkflowDomain string + WorkflowNamespace string WorkflowExecution WorkflowExecution ActivityID string ActivityType ActivityType @@ -134,7 +134,7 @@ func GetActivityInfo(ctx context.Context) ActivityInfo { TaskList: env.taskList, Attempt: env.attempt, WorkflowType: env.workflowType, - WorkflowDomain: env.workflowDomain, + WorkflowNamespace: env.workflowNamespace, } } @@ -215,7 +215,7 @@ type ServiceInvoker interface { // Returns ActivityTaskCanceledError if activity is cancelled Heartbeat(details []byte) error Close(flushBufferedHeartbeat bool) - GetClient(domain string, options ClientOptions) Client + GetClient(namespace string, options ClientOptions) Client } // WithActivityTask adds activity specific information into context. @@ -276,7 +276,7 @@ func WithActivityTask( workflowType: &WorkflowType{ Name: task.WorkflowType.Name, }, - workflowDomain: task.WorkflowDomain, + workflowNamespace: task.WorkflowNamespace, workerStopChannel: workerStopChannel, contextPropagators: contextPropagators, tracer: tracer, diff --git a/internal/client.go b/internal/client.go index b4abe636c..03593ed60 100644 --- a/internal/client.go +++ b/internal/client.go @@ -37,8 +37,8 @@ import ( ) const ( - // DefaultDomainName is the domain name which is used if not passed with options. - DefaultDomainName = "default" + // DefaultNamespace is the namespace name which is used if not passed with options. + DefaultNamespace = "default" // QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call // stack of the workflow. The result will be a string encoded in the EncodedValue. @@ -60,7 +60,7 @@ type ( // or // ExecuteWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3) // The errors it can return: - // - EntityNotExistsError, if domain does not exists + // - EntityNotExistsError, if namespace does not exists // - BadRequestError // - InternalServiceError // @@ -113,7 +113,7 @@ type ( // Note: options.WorkflowIDReusePolicy is default to WorkflowIDReusePolicyAllowDuplicate in this API; // while in StartWorkflow/ExecuteWorkflow APIs it is default to WorkflowIdReusePolicyAllowDuplicateFailedOnly. // The errors it can return: - // - EntityNotExistsError, if domain does not exist + // - EntityNotExistsError, if namespace does not exist // - BadRequestError // - InternalServiceError SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, @@ -181,12 +181,12 @@ type ( // completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise, // activity task failed event will be reported. // An activity implementation should use activityID provided in ActivityOption to use for completion. - // domain name, workflowID, activityID are required, runID is optional. + // namespace name, workflowID, activityID are required, runID is optional. // The errors it can return: // - ErrorWithDetails // - TimeoutError // - CanceledError - CompleteActivityByID(ctx context.Context, domain, workflowID, runID, activityID string, result interface{}, err error) error + CompleteActivityByID(ctx context.Context, namespace, workflowID, runID, activityID string, result interface{}, err error) error // RecordActivityHeartbeat records heartbeat for an activity. // details - is the progress you want to record along with heart beat for this activity. @@ -200,7 +200,7 @@ type ( // The errors it can return: // - EntityNotExistsError // - InternalServiceError - RecordActivityHeartbeatByID(ctx context.Context, domain, workflowID, runID, activityID string, details ...interface{}) error + RecordActivityHeartbeatByID(ctx context.Context, namespace, workflowID, runID, activityID string, details ...interface{}) error // ListClosedWorkflow gets closed workflow executions based on request filters // The errors it can return: @@ -230,9 +230,9 @@ type ( ListWorkflow(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*workflowservice.ListWorkflowExecutionsResponse, error) // ListArchivedWorkflow gets archived workflow executions based on query. This API will return BadRequest if Temporal - // cluster or target domain is not configured for visibility archival or read is not enabled. The query is basically the SQL WHERE clause. + // cluster or target namespace is not configured for visibility archival or read is not enabled. The query is basically the SQL WHERE clause. // However, different visibility archivers have different limitations on the query. Please check the documentation of the visibility archiver used - // by your domain to see what kind of queries are accept and whether retrieved workflow executions are ordered or not. + // by your namespace to see what kind of queries are accept and whether retrieved workflow executions are ordered or not. // The errors it can return: // - BadRequestError // - InternalServiceError @@ -317,9 +317,9 @@ type ( // default: localhost:7233 HostPort string - // Optional: To set the domain name for this client to work with. + // Optional: To set the namespace name for this client to work with. // default: default - DomainName string + Namespace string // Optional: Metrics to be reported. // To ensure metrics are compatible with prometheus make sure to create tally scope with sanitizer options set. @@ -443,7 +443,7 @@ type ( // history only when the activity completes or "finally" timeouts/fails. And the started event only records the last // started time. Because of that, to check an activity has started or not, you cannot rely on history events. Instead, // you can use CLI to describe the workflow to see the status of the activity: - // temporal --do wf desc -w + // temporal --do wf desc -w RetryPolicy struct { // Backoff interval for the first retry. If coefficient is 1.0 then it is used for all retries. // Required, no default value. @@ -476,32 +476,32 @@ type ( NonRetriableErrorReasons []string } - // DomainClient is the client for managing operations on the domain. - // CLI, tools, ... can use this layer to manager operations on domain. - DomainClient interface { - // Register a domain with temporal server + // NamespaceClient is the client for managing operations on the namespace. + // CLI, tools, ... can use this layer to manager operations on namespace. + NamespaceClient interface { + // Register a namespace with temporal server // The errors it can throw: - // - DomainAlreadyExistsError + // - NamespaceAlreadyExistsError // - BadRequestError // - InternalServiceError - Register(ctx context.Context, request *workflowservice.RegisterDomainRequest) error + Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error - // Describe a domain. The domain has 3 part of information - // DomainInfo - Which has Name, Status, Description, Owner Email - // DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. + // Describe a namespace. The namespace has 3 part of information + // NamespaceInfo - Which has Name, Status, Description, Owner Email + // NamespaceConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. // ReplicationConfiguration - replication config like clusters and active cluster name // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError - Describe(ctx context.Context, name string) (*workflowservice.DescribeDomainResponse, error) + Describe(ctx context.Context, name string) (*workflowservice.DescribeNamespaceResponse, error) - // Update a domain. + // Update a namespace. // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError - Update(ctx context.Context, request *workflowservice.UpdateDomainRequest) error + Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error // CloseConnection closes underlying gRPC connection. CloseConnection() error @@ -539,11 +539,11 @@ const ( // NewClient creates an instance of a workflow client func NewClient(options ClientOptions) (Client, error) { - if len(options.DomainName) == 0 { - options.DomainName = DefaultDomainName + if len(options.Namespace) == 0 { + options.Namespace = DefaultNamespace } - options.MetricsScope = tagScope(options.MetricsScope, tagDomain, options.DomainName, clientImplHeaderName, clientImplHeaderValue) + options.MetricsScope = tagScope(options.MetricsScope, tagNamespace, options.Namespace, clientImplHeaderName, clientImplHeaderValue) if len(options.HostPort) == 0 { options.HostPort = LocalHostPort @@ -568,9 +568,9 @@ func NewClient(options ClientOptions) (Client, error) { // NewServiceClient creates workflow client from workflowservice.WorkflowServiceClient. Must be used internally in unit tests only. func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, connectionCloser io.Closer, options ClientOptions) *WorkflowClient { - // DomainName can be empty in unit tests. - if len(options.DomainName) == 0 { - options.DomainName = DefaultDomainName + // Namespace can be empty in unit tests. + if len(options.Namespace) == 0 { + options.Namespace = DefaultNamespace } if len(options.Identity) == 0 { @@ -590,7 +590,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien return &WorkflowClient{ workflowService: workflowServiceClient, connectionCloser: connectionCloser, - domain: options.DomainName, + namespace: options.Namespace, registry: newRegistry(), metricsScope: metrics.NewTaggedScope(options.MetricsScope), identity: options.Identity, @@ -600,8 +600,8 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien } } -// NewDomainClient creates an instance of a domain client, to manager lifecycle of domains. -func NewDomainClient(options ClientOptions) (DomainClient, error) { +// NewNamespaceClient creates an instance of a namespace client, to manager lifecycle of namespaces. +func NewNamespaceClient(options ClientOptions) (NamespaceClient, error) { options.MetricsScope = tagScope(options.MetricsScope, clientImplHeaderName, clientImplHeaderValue) if len(options.HostPort) == 0 { @@ -622,15 +622,15 @@ func NewDomainClient(options ClientOptions) (DomainClient, error) { return nil, err } - return newDomainServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options), nil + return newNamespaceServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options), nil } -func newDomainServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, clientConn *grpc.ClientConn, options ClientOptions) DomainClient { +func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowServiceClient, clientConn *grpc.ClientConn, options ClientOptions) NamespaceClient { if len(options.Identity) == 0 { options.Identity = getWorkerIdentity("") } - return &domainClient{ + return &namespaceClient{ workflowService: workflowServiceClient, connectionCloser: clientConn, metricsScope: options.MetricsScope, diff --git a/internal/common/metrics/interceptor.go b/internal/common/metrics/interceptor.go index 2ee057b60..293393247 100644 --- a/internal/common/metrics/interceptor.go +++ b/internal/common/metrics/interceptor.go @@ -58,7 +58,7 @@ func (w *scopeInterceptor) getOperationScope(method string) *operationScope { } func (w *scopeInterceptor) convertMethodToScope(method string) string { - // method is something like "/workflowservice.WorkflowService/RegisterDomain" + // method is something like "/workflowservice.WorkflowService/RegisterNamespace" methodStart := strings.LastIndex(method, "/") + 1 return TemporalMetricsPrefix + method[methodStart:] } diff --git a/internal/common/metrics/operationScope.go b/internal/common/metrics/operationScope.go index 97705d8de..241c7f428 100644 --- a/internal/common/metrics/operationScope.go +++ b/internal/common/metrics/operationScope.go @@ -40,7 +40,7 @@ func (s *operationScope) handleError(err error) { switch err.(type) { case *serviceerror.NotFound, *serviceerror.InvalidArgument, - *serviceerror.DomainAlreadyExists, + *serviceerror.NamespaceAlreadyExists, *serviceerror.WorkflowExecutionAlreadyStarted, *serviceerror.QueryFailed: s.scope.Counter(TemporalInvalidRequest).Inc(1) diff --git a/internal/common/metrics/service_wrapper.go b/internal/common/metrics/service_wrapper.go index 4507c1e0f..a4948ff39 100644 --- a/internal/common/metrics/service_wrapper.go +++ b/internal/common/metrics/service_wrapper.go @@ -40,9 +40,9 @@ type ( ) const ( - scopeNameDeprecateDomain = TemporalMetricsPrefix + "DeprecateDomain" - scopeNameDescribeDomain = TemporalMetricsPrefix + "DescribeDomain" - scopeNameListDomains = TemporalMetricsPrefix + "ListDomains" + scopeNameDeprecateNamespace = TemporalMetricsPrefix + "DeprecateNamespace" + scopeNameDescribeNamespace = TemporalMetricsPrefix + "DescribeNamespace" + scopeNameListNamespaces = TemporalMetricsPrefix + "ListNamespaces" scopeNameGetWorkflowExecutionHistory = TemporalMetricsPrefix + "GetWorkflowExecutionHistory" scopeNamePollForWorkflowExecutionRawHistory = TemporalMetricsPrefix + "PollForWorkflowExecutionRawHistory" scopeNameListClosedWorkflowExecutions = TemporalMetricsPrefix + "ListClosedWorkflowExecutions" @@ -55,7 +55,7 @@ const ( scopeNamePollForDecisionTask = TemporalMetricsPrefix + "PollForDecisionTask" scopeNameRecordActivityTaskHeartbeat = TemporalMetricsPrefix + "RecordActivityTaskHeartbeat" scopeNameRecordActivityTaskHeartbeatByID = TemporalMetricsPrefix + "RecordActivityTaskHeartbeatByID" - scopeNameRegisterDomain = TemporalMetricsPrefix + "RegisterDomain" + scopeNameRegisterNamespace = TemporalMetricsPrefix + "RegisterNamespace" scopeNameRequestCancelWorkflowExecution = TemporalMetricsPrefix + "RequestCancelWorkflowExecution" scopeNameRespondActivityTaskCanceled = TemporalMetricsPrefix + "RespondActivityTaskCanceled" scopeNameRespondActivityTaskCompleted = TemporalMetricsPrefix + "RespondActivityTaskCompleted" @@ -70,7 +70,7 @@ const ( scopeNameStartWorkflowExecution = TemporalMetricsPrefix + "StartWorkflowExecution" scopeNameTerminateWorkflowExecution = TemporalMetricsPrefix + "TerminateWorkflowExecution" scopeNameResetWorkflowExecution = TemporalMetricsPrefix + "ResetWorkflowExecution" - scopeNameUpdateDomain = TemporalMetricsPrefix + "UpdateDomain" + scopeNameUpdateNamespace = TemporalMetricsPrefix + "UpdateNamespace" scopeNameQueryWorkflow = TemporalMetricsPrefix + "QueryWorkflow" scopeNameDescribeTaskList = TemporalMetricsPrefix + "DescribeTaskList" scopeNameRespondQueryTaskCompleted = TemporalMetricsPrefix + "RespondQueryTaskCompleted" @@ -111,23 +111,23 @@ func (w *workflowServiceMetricsWrapper) getOperationScope(scopeName string) *ope return &operationScope{scope: scope, startTime: time.Now()} } -func (w *workflowServiceMetricsWrapper) DeprecateDomain(ctx context.Context, request *workflowservice.DeprecateDomainRequest, opts ...grpc.CallOption) (*workflowservice.DeprecateDomainResponse, error) { - scope := w.getOperationScope(scopeNameDeprecateDomain) - result, err := w.service.DeprecateDomain(ctx, request, opts...) +func (w *workflowServiceMetricsWrapper) DeprecateNamespace(ctx context.Context, request *workflowservice.DeprecateNamespaceRequest, opts ...grpc.CallOption) (*workflowservice.DeprecateNamespaceResponse, error) { + scope := w.getOperationScope(scopeNameDeprecateNamespace) + result, err := w.service.DeprecateNamespace(ctx, request, opts...) scope.handleError(err) return result, err } -func (w *workflowServiceMetricsWrapper) ListDomains(ctx context.Context, request *workflowservice.ListDomainsRequest, opts ...grpc.CallOption) (*workflowservice.ListDomainsResponse, error) { - scope := w.getOperationScope(scopeNameListDomains) - result, err := w.service.ListDomains(ctx, request, opts...) +func (w *workflowServiceMetricsWrapper) ListNamespaces(ctx context.Context, request *workflowservice.ListNamespacesRequest, opts ...grpc.CallOption) (*workflowservice.ListNamespacesResponse, error) { + scope := w.getOperationScope(scopeNameListNamespaces) + result, err := w.service.ListNamespaces(ctx, request, opts...) scope.handleError(err) return result, err } -func (w *workflowServiceMetricsWrapper) DescribeDomain(ctx context.Context, request *workflowservice.DescribeDomainRequest, opts ...grpc.CallOption) (*workflowservice.DescribeDomainResponse, error) { - scope := w.getOperationScope(scopeNameDescribeDomain) - result, err := w.service.DescribeDomain(ctx, request, opts...) +func (w *workflowServiceMetricsWrapper) DescribeNamespace(ctx context.Context, request *workflowservice.DescribeNamespaceRequest, opts ...grpc.CallOption) (*workflowservice.DescribeNamespaceResponse, error) { + scope := w.getOperationScope(scopeNameDescribeNamespace) + result, err := w.service.DescribeNamespace(ctx, request, opts...) scope.handleError(err) return result, err } @@ -223,9 +223,9 @@ func (w *workflowServiceMetricsWrapper) RecordActivityTaskHeartbeatByID(ctx cont return result, err } -func (w *workflowServiceMetricsWrapper) RegisterDomain(ctx context.Context, request *workflowservice.RegisterDomainRequest, opts ...grpc.CallOption) (*workflowservice.RegisterDomainResponse, error) { - scope := w.getOperationScope(scopeNameRegisterDomain) - result, err := w.service.RegisterDomain(ctx, request, opts...) +func (w *workflowServiceMetricsWrapper) RegisterNamespace(ctx context.Context, request *workflowservice.RegisterNamespaceRequest, opts ...grpc.CallOption) (*workflowservice.RegisterNamespaceResponse, error) { + scope := w.getOperationScope(scopeNameRegisterNamespace) + result, err := w.service.RegisterNamespace(ctx, request, opts...) scope.handleError(err) return result, err } @@ -328,9 +328,9 @@ func (w *workflowServiceMetricsWrapper) ResetWorkflowExecution(ctx context.Conte return result, err } -func (w *workflowServiceMetricsWrapper) UpdateDomain(ctx context.Context, request *workflowservice.UpdateDomainRequest, opts ...grpc.CallOption) (*workflowservice.UpdateDomainResponse, error) { - scope := w.getOperationScope(scopeNameUpdateDomain) - result, err := w.service.UpdateDomain(ctx, request, opts...) +func (w *workflowServiceMetricsWrapper) UpdateNamespace(ctx context.Context, request *workflowservice.UpdateNamespaceRequest, opts ...grpc.CallOption) (*workflowservice.UpdateNamespaceResponse, error) { + scope := w.getOperationScope(scopeNameUpdateNamespace) + result, err := w.service.UpdateNamespace(ctx, request, opts...) scope.handleError(err) return result, err } diff --git a/internal/common/metrics/service_wrapper_test.go b/internal/common/metrics/service_wrapper_test.go index 23934c36f..ea370da25 100644 --- a/internal/common/metrics/service_wrapper_test.go +++ b/internal/common/metrics/service_wrapper_test.go @@ -69,15 +69,15 @@ func Test_Wrapper(t *testing.T) { ctx, _ := context.WithTimeout(context.Background(), time.Minute) tests := []testCase{ // one case for each service call - {"DeprecateDomain", []interface{}{ctx, &workflowservice.DeprecateDomainRequest{}}, []interface{}{&workflowservice.DeprecateDomainResponse{}, nil}, []string{TemporalRequest}}, - {"DescribeDomain", []interface{}{ctx, &workflowservice.DescribeDomainRequest{}}, []interface{}{&workflowservice.DescribeDomainResponse{}, nil}, []string{TemporalRequest}}, + {"DeprecateNamespace", []interface{}{ctx, &workflowservice.DeprecateNamespaceRequest{}}, []interface{}{&workflowservice.DeprecateNamespaceResponse{}, nil}, []string{TemporalRequest}}, + {"DescribeNamespace", []interface{}{ctx, &workflowservice.DescribeNamespaceRequest{}}, []interface{}{&workflowservice.DescribeNamespaceResponse{}, nil}, []string{TemporalRequest}}, {"GetWorkflowExecutionHistory", []interface{}{ctx, &workflowservice.GetWorkflowExecutionHistoryRequest{}}, []interface{}{&workflowservice.GetWorkflowExecutionHistoryResponse{}, nil}, []string{TemporalRequest}}, {"ListClosedWorkflowExecutions", []interface{}{ctx, &workflowservice.ListClosedWorkflowExecutionsRequest{}}, []interface{}{&workflowservice.ListClosedWorkflowExecutionsResponse{}, nil}, []string{TemporalRequest}}, {"ListOpenWorkflowExecutions", []interface{}{ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{}}, []interface{}{&workflowservice.ListOpenWorkflowExecutionsResponse{}, nil}, []string{TemporalRequest}}, {"PollForActivityTask", []interface{}{ctx, &workflowservice.PollForActivityTaskRequest{}}, []interface{}{&workflowservice.PollForActivityTaskResponse{}, nil}, []string{TemporalRequest}}, {"PollForDecisionTask", []interface{}{ctx, &workflowservice.PollForDecisionTaskRequest{}}, []interface{}{&workflowservice.PollForDecisionTaskResponse{}, nil}, []string{TemporalRequest}}, {"RecordActivityTaskHeartbeat", []interface{}{ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{}}, []interface{}{&workflowservice.RecordActivityTaskHeartbeatResponse{}, nil}, []string{TemporalRequest}}, - {"RegisterDomain", []interface{}{ctx, &workflowservice.RegisterDomainRequest{}}, []interface{}{&workflowservice.RegisterDomainResponse{}, nil}, []string{TemporalRequest}}, + {"RegisterNamespace", []interface{}{ctx, &workflowservice.RegisterNamespaceRequest{}}, []interface{}{&workflowservice.RegisterNamespaceResponse{}, nil}, []string{TemporalRequest}}, {"RequestCancelWorkflowExecution", []interface{}{ctx, &workflowservice.RequestCancelWorkflowExecutionRequest{}}, []interface{}{&workflowservice.RequestCancelWorkflowExecutionResponse{}, nil}, []string{TemporalRequest}}, {"RespondActivityTaskCanceled", []interface{}{ctx, &workflowservice.RespondActivityTaskCanceledRequest{}}, []interface{}{&workflowservice.RespondActivityTaskCanceledResponse{}, nil}, []string{TemporalRequest}}, {"RespondActivityTaskCompleted", []interface{}{ctx, &workflowservice.RespondActivityTaskCompletedRequest{}}, []interface{}{&workflowservice.RespondActivityTaskCompletedResponse{}, nil}, []string{TemporalRequest}}, @@ -90,7 +90,7 @@ func Test_Wrapper(t *testing.T) { {"StartWorkflowExecution", []interface{}{ctx, &workflowservice.StartWorkflowExecutionRequest{}}, []interface{}{&workflowservice.StartWorkflowExecutionResponse{}, nil}, []string{TemporalRequest}}, {"TerminateWorkflowExecution", []interface{}{ctx, &workflowservice.TerminateWorkflowExecutionRequest{}}, []interface{}{&workflowservice.TerminateWorkflowExecutionResponse{}, nil}, []string{TemporalRequest}}, {"ResetWorkflowExecution", []interface{}{ctx, &workflowservice.ResetWorkflowExecutionRequest{}}, []interface{}{&workflowservice.ResetWorkflowExecutionResponse{}, nil}, []string{TemporalRequest}}, - {"UpdateDomain", []interface{}{ctx, &workflowservice.UpdateDomainRequest{}}, []interface{}{&workflowservice.UpdateDomainResponse{}, nil}, []string{TemporalRequest}}, + {"UpdateNamespace", []interface{}{ctx, &workflowservice.UpdateNamespaceRequest{}}, []interface{}{&workflowservice.UpdateNamespaceResponse{}, nil}, []string{TemporalRequest}}, // one case of invalid request {"PollForActivityTask", []interface{}{ctx, &workflowservice.PollForActivityTaskRequest{}}, []interface{}{nil, serviceerror.NewNotFound("")}, []string{TemporalRequest, TemporalInvalidRequest}}, // one case of server error @@ -121,10 +121,10 @@ func runTest( mockService, wrapperService, closer, reporter := serviceFunc(t) switch test.serviceMethod { - case "DeprecateDomain": - mockService.EXPECT().DeprecateDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "DescribeDomain": - mockService.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) + case "DeprecateNamespace": + mockService.EXPECT().DeprecateNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) + case "DescribeNamespace": + mockService.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "GetWorkflowExecutionHistory": mockService.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "ListClosedWorkflowExecutions": @@ -139,8 +139,8 @@ func runTest( mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "RecordActivityTaskHeartbeatByID": mockService.EXPECT().RecordActivityTaskHeartbeatByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RegisterDomain": - mockService.EXPECT().RegisterDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) + case "RegisterNamespace": + mockService.EXPECT().RegisterNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "RequestCancelWorkflowExecution": mockService.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "RespondActivityTaskCanceled": @@ -167,8 +167,8 @@ func runTest( mockService.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "ResetWorkflowExecution": mockService.EXPECT().ResetWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "UpdateDomain": - mockService.EXPECT().UpdateDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) + case "UpdateNamespace": + mockService.EXPECT().UpdateNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "QueryWorkflow": mockService.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) case "RespondQueryTaskCompleted": diff --git a/internal/internal_activity.go b/internal/internal_activity.go index f988d5751..19fc2921e 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -127,7 +127,7 @@ type ( attempt int32 // starts from 0. heartbeatDetails []byte workflowType *WorkflowType - workflowDomain string + workflowNamespace string workerStopChannel <-chan struct{} contextPropagators []ContextPropagator tracer opentracing.Tracer diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index 5e97ccf8d..b10d8614f 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -515,7 +515,7 @@ func (d *childWorkflowDecisionStateMachine) getDecision() *commonproto.Decision case decisionStateCanceledAfterStarted: decision := createNewDecision(enums.DecisionTypeRequestCancelExternalWorkflowExecution) decision.Attributes = &commonproto.Decision_RequestCancelExternalWorkflowExecutionDecisionAttributes{RequestCancelExternalWorkflowExecutionDecisionAttributes: &commonproto.RequestCancelExternalWorkflowExecutionDecisionAttributes{ - Domain: d.attributes.Domain, + Namespace: d.attributes.Namespace, WorkflowId: d.attributes.WorkflowId, ChildWorkflowOnly: true, }} @@ -841,7 +841,7 @@ func (h *decisionsHelper) handleStartChildWorkflowExecutionFailed(workflowID str return decision } -func (h *decisionsHelper) requestCancelExternalWorkflowExecution(domain, workflowID, runID string, cancellationID string, childWorkflowOnly bool) decisionStateMachine { +func (h *decisionsHelper) requestCancelExternalWorkflowExecution(namespace, workflowID, runID string, cancellationID string, childWorkflowOnly bool) decisionStateMachine { if childWorkflowOnly { // For cancellation of child workflow only, we do not use cancellation ID // since the child workflow cancellation go through the existing child workflow @@ -874,7 +874,7 @@ func (h *decisionsHelper) requestCancelExternalWorkflowExecution(domain, workflo panic("cancellation on external workflow should use cancellation ID") } attributes := &commonproto.RequestCancelExternalWorkflowExecutionDecisionAttributes{ - Domain: domain, + Namespace: namespace, WorkflowId: workflowID, RunId: runID, Control: []byte(cancellationID), @@ -928,9 +928,9 @@ func (h *decisionsHelper) handleRequestCancelExternalWorkflowExecutionFailed(ini return isExternal, decision } -func (h *decisionsHelper) signalExternalWorkflowExecution(domain, workflowID, runID, signalName string, input []byte, signalID string, childWorkflowOnly bool) decisionStateMachine { +func (h *decisionsHelper) signalExternalWorkflowExecution(namespace, workflowID, runID, signalName string, input []byte, signalID string, childWorkflowOnly bool) decisionStateMachine { attributes := &commonproto.SignalExternalWorkflowExecutionDecisionAttributes{ - Domain: domain, + Namespace: namespace, Execution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, RunId: runID, diff --git a/internal/internal_decision_state_machine_test.go b/internal/internal_decision_state_machine_test.go index a4dc170a6..c25bd8896 100644 --- a/internal/internal_decision_state_machine_test.go +++ b/internal/internal_decision_state_machine_test.go @@ -339,7 +339,7 @@ func Test_ChildWorkflowStateMachine_Basic(t *testing.T) { func Test_ChildWorkflowStateMachine_CancelSucceed(t *testing.T) { t.Parallel() - domain := "test-domain" + namespace := "test-namespace" workflowID := "test-child-workflow" runID := "" cancellationID := "" @@ -359,7 +359,7 @@ func Test_ChildWorkflowStateMachine_CancelSucceed(t *testing.T) { h.handleChildWorkflowExecutionStarted(workflowID) // cancel child workflow - h.requestCancelExternalWorkflowExecution(domain, workflowID, runID, cancellationID, true) + h.requestCancelExternalWorkflowExecution(namespace, workflowID, runID, cancellationID, true) require.Equal(t, decisionStateCanceledAfterStarted, d.getState()) // send cancel request @@ -383,7 +383,7 @@ func Test_ChildWorkflowStateMachine_CancelSucceed(t *testing.T) { func Test_ChildWorkflowStateMachine_InvalidStates(t *testing.T) { t.Parallel() - domain := "test-domain" + namespace := "test-namespace" workflowID := "test-workflow-id" runID := "" attributes := &commonproto.StartChildWorkflowExecutionDecisionAttributes{ @@ -427,7 +427,7 @@ func Test_ChildWorkflowStateMachine_InvalidStates(t *testing.T) { require.NotNil(t, err) // cancel child workflow after child workflow is started - h.requestCancelExternalWorkflowExecution(domain, workflowID, runID, cancellationID, true) + h.requestCancelExternalWorkflowExecution(namespace, workflowID, runID, cancellationID, true) require.Equal(t, decisionStateCanceledAfterStarted, d.getState()) // send cancel request @@ -465,7 +465,7 @@ func Test_ChildWorkflowStateMachine_InvalidStates(t *testing.T) { func Test_ChildWorkflowStateMachine_CancelFailed(t *testing.T) { t.Parallel() - domain := "test-domain" + namespace := "test-namespace" workflowID := "test-workflow-id" runID := "" attributes := &commonproto.StartChildWorkflowExecutionDecisionAttributes{ @@ -484,7 +484,7 @@ func Test_ChildWorkflowStateMachine_CancelFailed(t *testing.T) { // child workflow started h.handleChildWorkflowExecutionStarted(workflowID) // cancel child workflow - h.requestCancelExternalWorkflowExecution(domain, workflowID, runID, cancellationID, true) + h.requestCancelExternalWorkflowExecution(namespace, workflowID, runID, cancellationID, true) // send cancel request h.getDecisions(true) // cancel request initiated @@ -530,7 +530,7 @@ func Test_UpsertSearchAttributesDecisionStateMachine(t *testing.T) { func Test_CancelExternalWorkflowStateMachine_Succeed(t *testing.T) { t.Parallel() - domain := "test-domain" + namespace := "test-namespace" workflowID := "test-workflow-id" runID := "test-run-id" cancellationID := "1" @@ -538,7 +538,7 @@ func Test_CancelExternalWorkflowStateMachine_Succeed(t *testing.T) { h := newDecisionsHelper() // request cancel external workflow - decision := h.requestCancelExternalWorkflowExecution(domain, workflowID, runID, cancellationID, false) + decision := h.requestCancelExternalWorkflowExecution(namespace, workflowID, runID, cancellationID, false) require.False(t, decision.isDone()) d := h.getDecision(makeDecisionID(decisionTypeCancellation, cancellationID)) require.Equal(t, decisionStateCreated, d.getState()) @@ -550,7 +550,7 @@ func Test_CancelExternalWorkflowStateMachine_Succeed(t *testing.T) { require.Equal( t, &commonproto.RequestCancelExternalWorkflowExecutionDecisionAttributes{ - Domain: domain, + Namespace: namespace, WorkflowId: workflowID, RunId: runID, Control: []byte(cancellationID), @@ -576,7 +576,7 @@ func Test_CancelExternalWorkflowStateMachine_Succeed(t *testing.T) { func Test_CancelExternalWorkflowStateMachine_Failed(t *testing.T) { t.Parallel() - domain := "test-domain" + namespace := "test-namespace" workflowID := "test-workflow-id" runID := "test-run-id" cancellationID := "2" @@ -584,7 +584,7 @@ func Test_CancelExternalWorkflowStateMachine_Failed(t *testing.T) { h := newDecisionsHelper() // request cancel external workflow - decision := h.requestCancelExternalWorkflowExecution(domain, workflowID, runID, cancellationID, false) + decision := h.requestCancelExternalWorkflowExecution(namespace, workflowID, runID, cancellationID, false) require.False(t, decision.isDone()) d := h.getDecision(makeDecisionID(decisionTypeCancellation, cancellationID)) require.Equal(t, decisionStateCreated, d.getState()) @@ -596,7 +596,7 @@ func Test_CancelExternalWorkflowStateMachine_Failed(t *testing.T) { require.Equal( t, &commonproto.RequestCancelExternalWorkflowExecutionDecisionAttributes{ - Domain: domain, + Namespace: namespace, WorkflowId: workflowID, RunId: runID, Control: []byte(cancellationID), diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index e55e5ad50..f807c8508 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -269,23 +269,23 @@ func (wc *workflowEnvironmentImpl) Complete(result []byte, err error) { wc.completeHandler(result, err) } -func (wc *workflowEnvironmentImpl) RequestCancelChildWorkflow(domainName string, workflowID string) { +func (wc *workflowEnvironmentImpl) RequestCancelChildWorkflow(namespace string, workflowID string) { // For cancellation of child workflow only, we do not use cancellation ID and run ID - wc.decisionsHelper.requestCancelExternalWorkflowExecution(domainName, workflowID, "", "", true) + wc.decisionsHelper.requestCancelExternalWorkflowExecution(namespace, workflowID, "", "", true) } -func (wc *workflowEnvironmentImpl) RequestCancelExternalWorkflow(domainName, workflowID, runID string, callback resultHandler) { +func (wc *workflowEnvironmentImpl) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback resultHandler) { // for cancellation of external workflow, we have to use cancellation ID and set isChildWorkflowOnly to false cancellationID := wc.GenerateSequenceID() - decision := wc.decisionsHelper.requestCancelExternalWorkflowExecution(domainName, workflowID, runID, cancellationID, false) + decision := wc.decisionsHelper.requestCancelExternalWorkflowExecution(namespace, workflowID, runID, cancellationID, false) decision.setData(&scheduledCancellation{callback: callback}) } -func (wc *workflowEnvironmentImpl) SignalExternalWorkflow(domainName, workflowID, runID, signalName string, +func (wc *workflowEnvironmentImpl) SignalExternalWorkflow(namespace, workflowID, runID, signalName string, input []byte, _ /* THIS IS FOR TEST FRAMEWORK. DO NOT USE HERE. */ interface{}, childWorkflowOnly bool, callback resultHandler) { signalID := wc.GenerateSequenceID() - decision := wc.decisionsHelper.signalExternalWorkflowExecution(domainName, workflowID, runID, signalName, input, signalID, childWorkflowOnly) + decision := wc.decisionsHelper.signalExternalWorkflowExecution(namespace, workflowID, runID, signalName, input, signalID, childWorkflowOnly) decision.setData(&scheduledSignal{callback: callback}) } @@ -361,7 +361,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( attributes := &commonproto.StartChildWorkflowExecutionDecisionAttributes{} - attributes.Domain = params.domain + attributes.Namespace = params.namespace attributes.TaskList = &commonproto.TaskList{Name: params.taskListName} attributes.WorkflowId = params.workflowID attributes.ExecutionStartToCloseTimeoutSeconds = params.executionStartToCloseTimeoutSeconds diff --git a/internal/internal_logging_tags.go b/internal/internal_logging_tags.go index 9bf3f3cbb..b9bf06cdf 100644 --- a/internal/internal_logging_tags.go +++ b/internal/internal_logging_tags.go @@ -23,7 +23,7 @@ package internal const ( tagActivityID = "ActivityID" tagActivityType = "ActivityType" - tagDomain = "Domain" + tagNamespace = "Namespace" tagEventID = "EventID" tagEventType = "EventType" tagRunID = "RunID" diff --git a/internal/internal_retry.go b/internal/internal_retry.go index 3389b85a8..8f563840b 100644 --- a/internal/internal_retry.go +++ b/internal/internal_retry.go @@ -68,9 +68,9 @@ func isServiceTransientError(err error) bool { case *serviceerror.InvalidArgument, *serviceerror.NotFound, *serviceerror.WorkflowExecutionAlreadyStarted, - *serviceerror.DomainAlreadyExists, + *serviceerror.NamespaceAlreadyExists, *serviceerror.QueryFailed, - *serviceerror.DomainNotActive, + *serviceerror.NamespaceNotActive, *serviceerror.CancellationAlreadyRequested: return false } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 2ffc91df7..2d5e0b6fa 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -113,7 +113,7 @@ type ( // workflowTaskHandlerImpl is the implementation of WorkflowTaskHandler workflowTaskHandlerImpl struct { - domain string + namespace string metricsScope *metrics.TaggedScope ppMgr pressurePointMgr logger *zap.Logger @@ -370,7 +370,7 @@ func isPreloadMarkerEvent(event *commonproto.HistoryEvent) bool { func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePointMgr, registry *registry) WorkflowTaskHandler { ensureRequiredParams(¶ms) return &workflowTaskHandlerImpl{ - domain: params.DomainName, + namespace: params.Namespace, logger: params.Logger, ppMgr: ppMgr, metricsScope: metrics.NewTaggedScope(params.MetricsScope), @@ -526,7 +526,7 @@ func (w *workflowExecutionContextImpl) IsDestroyed() bool { func (w *workflowExecutionContextImpl) queueResetStickinessTask() { var task resetStickinessTask task.task = &workflowservice.ResetStickyTaskListRequest{ - Domain: w.workflowInfo.Domain, + Namespace: w.workflowInfo.Namespace, Execution: &commonproto.WorkflowExecution{ WorkflowId: w.workflowInfo.WorkflowExecution.ID, RunId: w.workflowInfo.WorkflowExecution.RunID, @@ -613,12 +613,12 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. TaskListName: taskList.GetName(), ExecutionStartToCloseTimeoutSeconds: attributes.GetExecutionStartToCloseTimeoutSeconds(), TaskStartToCloseTimeoutSeconds: attributes.GetTaskStartToCloseTimeoutSeconds(), - Domain: wth.domain, + Namespace: wth.namespace, Attempt: attributes.GetAttempt(), lastCompletionResult: attributes.LastCompletionResult, CronSchedule: attributes.CronSchedule, ContinuedExecutionRunID: attributes.ContinuedExecutionRunId, - ParentWorkflowDomain: attributes.ParentWorkflowDomain, + ParentWorkflowNamespace: attributes.ParentWorkflowNamespace, ParentWorkflowExecution: parentWorkflowExecution, Memo: attributes.Memo, SearchAttributes: attributes.SearchAttributes, @@ -1338,7 +1338,7 @@ func isDecisionMatchEvent(d *commonproto.Decision, e *commonproto.HistoryEvent, } eventAttributes := e.GetRequestCancelExternalWorkflowExecutionInitiatedEventAttributes() decisionAttributes := d.GetRequestCancelExternalWorkflowExecutionDecisionAttributes() - if checkDomainsInDecisionAndEvent(eventAttributes.GetDomain(), decisionAttributes.GetDomain()) || + if checkNamespacesInDecisionAndEvent(eventAttributes.GetNamespace(), decisionAttributes.GetNamespace()) || eventAttributes.WorkflowExecution.GetWorkflowId() != decisionAttributes.GetWorkflowId() { return false } @@ -1351,7 +1351,7 @@ func isDecisionMatchEvent(d *commonproto.Decision, e *commonproto.HistoryEvent, } eventAttributes := e.GetSignalExternalWorkflowExecutionInitiatedEventAttributes() decisionAttributes := d.GetSignalExternalWorkflowExecutionDecisionAttributes() - if checkDomainsInDecisionAndEvent(eventAttributes.GetDomain(), decisionAttributes.GetDomain()) || + if checkNamespacesInDecisionAndEvent(eventAttributes.GetNamespace(), decisionAttributes.GetNamespace()) || eventAttributes.GetSignalName() != decisionAttributes.GetSignalName() || eventAttributes.WorkflowExecution.GetWorkflowId() != decisionAttributes.Execution.GetWorkflowId() { return false @@ -1386,7 +1386,7 @@ func isDecisionMatchEvent(d *commonproto.Decision, e *commonproto.HistoryEvent, eventAttributes := e.GetStartChildWorkflowExecutionInitiatedEventAttributes() decisionAttributes := d.GetStartChildWorkflowExecutionDecisionAttributes() if lastPartOfName(eventAttributes.WorkflowType.GetName()) != lastPartOfName(decisionAttributes.WorkflowType.GetName()) || - (strictMode && checkDomainsInDecisionAndEvent(eventAttributes.GetDomain(), decisionAttributes.GetDomain())) || + (strictMode && checkNamespacesInDecisionAndEvent(eventAttributes.GetNamespace(), decisionAttributes.GetNamespace())) || (strictMode && eventAttributes.TaskList.GetName() != decisionAttributes.TaskList.GetName()) { return false } @@ -1416,14 +1416,14 @@ func isSearchAttributesMatched(attrFromEvent, attrFromDecision *commonproto.Sear } // return true if the check fails: -// domain is not empty in decision -// and domain is not replayDomain -// and domains unmatch in decision and events -func checkDomainsInDecisionAndEvent(eventDomainName, decisionDomainName string) bool { - if decisionDomainName == "" || IsReplayDomain(decisionDomainName) { +// namespace is not empty in decision +// and namespace is not replayNamespace +// and namespaces unmatch in decision and events +func checkNamespacesInDecisionAndEvent(eventNamespace, decisionNamespace string) bool { + if decisionNamespace == "" || IsReplayNamespace(decisionNamespace) { return false } - return eventDomainName != decisionDomainName + return eventNamespace != decisionNamespace } func (wth *workflowTaskHandlerImpl) completeWorkflow( @@ -1691,7 +1691,7 @@ func (i *temporalInvoker) internalHeartBeat(details []byte) (bool, error) { i.cancelHandler() isActivityCancelled = true - case *serviceerror.NotFound, *serviceerror.DomainNotActive: + case *serviceerror.NotFound, *serviceerror.NamespaceNotActive: // We will pass these through as cancellation for now but something we can change // later when we have setter on cancel handler. i.cancelHandler() @@ -1716,7 +1716,7 @@ func (i *temporalInvoker) Close(flushBufferedHeartbeat bool) { } } -func (i *temporalInvoker) GetClient(domain string, options ClientOptions) Client { +func (i *temporalInvoker) GetClient(namespace string, options ClientOptions) Client { return NewServiceClient(i.service, nil, options) } @@ -1877,11 +1877,11 @@ func recordActivityHeartbeatByID( ctx context.Context, service workflowservice.WorkflowServiceClient, identity string, - domain, workflowID, runID, activityID string, + namespace, workflowID, runID, activityID string, details []byte, ) error { request := &workflowservice.RecordActivityTaskHeartbeatByIDRequest{ - Domain: domain, + Namespace: namespace, WorkflowID: workflowID, RunID: runID, ActivityID: activityID, diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 5bb4153ea..98a696e66 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -44,7 +44,7 @@ import ( ) const ( - testDomain = "test-domain" + testNamespace = "test-namespace" ) type ( @@ -350,10 +350,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() { } task := createWorkflowTask(testEvents, 8, "BinaryChecksumWorkflow") params := workerExecutionParameters{ - DomainName: testDomain, - TaskList: taskList, - Identity: "test-id-1", - Logger: t.logger, + Namespace: testNamespace, + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) @@ -391,10 +391,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { } task := createWorkflowTask(testEvents[0:3], 0, "HelloWorld_Workflow") params := workerExecutionParameters{ - DomainName: testDomain, - TaskList: taskList, - Identity: "test-id-1", - Logger: t.logger, + Namespace: testNamespace, + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) @@ -438,10 +438,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() { createTestEventActivityTaskCompleted(7, &commonproto.ActivityTaskCompletedEventAttributes{ScheduledEventId: 5}), } params := workerExecutionParameters{ - DomainName: testDomain, - TaskList: taskList, - Identity: "test-id-1", - Logger: t.logger, + Namespace: testNamespace, + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) @@ -484,10 +484,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() { createTestEventWorkflowExecutionSignaled(9, "test-signal"), } params := workerExecutionParameters{ - DomainName: testDomain, - TaskList: taskList, - Identity: "test-id-1", - Logger: t.logger, + Namespace: testNamespace, + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, } // query after first decision task (notice the previousStartEventID is always the last eventID for query task) @@ -551,7 +551,7 @@ func (t *TaskHandlersTestSuite) TestCacheEvictionWhenErrorOccurs() { }), } params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -586,7 +586,7 @@ func (t *TaskHandlersTestSuite) TestWithMissingHistoryEvents() { createTestEventDecisionTaskStarted(7), } params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -627,7 +627,7 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() { }), } params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -704,7 +704,7 @@ func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(disableSticky bool) { } params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -747,7 +747,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { task := createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") stopC := make(chan struct{}) params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -810,7 +810,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() { } task := createWorkflowTask(testEvents, 3, "ReturnPanicWorkflow") params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -839,7 +839,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() { } task := createWorkflowTask(testEvents, 3, "PanicWorkflow") params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -866,7 +866,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { WorkflowId: parentID, RunId: parentRunID, } - parentDomain := "parentDomain" + parentNamespace := "parentNamespace" var attempt int32 = 123 var executionTimeout int32 = 213456 var taskTimeout int32 = 21 @@ -879,7 +879,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { ParentWorkflowExecution: parentExecution, CronSchedule: cronSchedule, ContinuedExecutionRunId: continuedRunID, - ParentWorkflowDomain: parentDomain, + ParentWorkflowNamespace: parentNamespace, Attempt: attempt, ExecutionStartToCloseTimeoutSeconds: executionTimeout, TaskStartToCloseTimeoutSeconds: taskTimeout, @@ -892,7 +892,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { } task := createWorkflowTask(testEvents, 3, workflowType) params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -914,18 +914,18 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { t.EqualValues(parentRunID, result.ParentWorkflowExecution.RunID) t.EqualValues(cronSchedule, result.CronSchedule) t.EqualValues(continuedRunID, result.ContinuedExecutionRunID) - t.EqualValues(parentDomain, result.ParentWorkflowDomain) + t.EqualValues(parentNamespace, result.ParentWorkflowNamespace) t.EqualValues(attempt, result.Attempt) t.EqualValues(executionTimeout, result.ExecutionStartToCloseTimeoutSeconds) t.EqualValues(taskTimeout, result.TaskStartToCloseTimeoutSeconds) t.EqualValues(workflowType, result.WorkflowType.Name) - t.EqualValues(testDomain, result.Domain) + t.EqualValues(testNamespace, result.Namespace) } func (t *TaskHandlersTestSuite) TestConsistentQuery_InvalidQueryTask() { taskList := "taskList" params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: taskList, Identity: "test-id-1", Logger: zap.NewNop(), @@ -976,10 +976,10 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { task := createWorkflowTaskWithQueries(testEvents[0:3], 0, "QuerySignalWorkflow", queries) params := workerExecutionParameters{ - DomainName: testDomain, - TaskList: taskList, - Identity: "test-id-1", - Logger: t.logger, + Namespace: testNamespace, + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) @@ -1042,10 +1042,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_CancelActivityBeforeSent() { task := createWorkflowTask(testEvents, 0, "HelloWorld_WorkflowCancel") params := workerExecutionParameters{ - DomainName: testDomain, - TaskList: taskList, - Identity: "test-id-1", - Logger: t.logger, + Namespace: testNamespace, + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, } taskHandler := newWorkflowTaskHandler(params, nil, t.registry) request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) @@ -1068,10 +1068,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() { task.NextPageToken = []byte("token") params := workerExecutionParameters{ - DomainName: testDomain, - TaskList: taskList, - Identity: "test-id-1", - Logger: t.logger, + Namespace: testNamespace, + TaskList: taskList, + Identity: "test-id-1", + Logger: t.logger, } nextEvents := []*commonproto.HistoryEvent{ @@ -1133,7 +1133,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() { task := createWorkflowTask(testEvents, 0, "RetryLocalActivityWorkflow") stopCh := make(chan struct{}) params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, TaskList: testWorkflowTaskTasklist, Identity: "test-id-1", Logger: t.logger, @@ -1219,11 +1219,11 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() { t.IsType(&serviceerror.NotFound{}, heartbeatErr, "heartbeatErr must be of type NotFound.") } -func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithDomainNotActiveError() { +func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithNamespaceNotActiveError() { mockCtrl := gomock.NewController(t.T()) mockService := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) - mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewDomainNotActive("fake_domain", "current_cluster", "active_cluster")) + mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewNamespaceNotActive("fake_namespace", "current_cluster", "active_cluster")) called := false cancelHandler := func() { called = true } @@ -1238,7 +1238,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithDomainNotActiveErro heartbeatErr := temporalInvoker.Heartbeat(nil) t.NotNil(heartbeatErr) - t.IsType(&serviceerror.DomainNotActive{}, heartbeatErr, "heartbeatErr must be of type DomainNotActive.") + t.IsType(&serviceerror.NamespaceNotActive{}, heartbeatErr, "heartbeatErr must be of type NamespaceNotActive.") t.True(called) } @@ -1317,7 +1317,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() { WorkflowType: &commonproto.WorkflowType{ Name: "wType", }, - WorkflowDomain: "domain", + WorkflowNamespace: "namespace", } td := fmt.Sprintf("testIndex: %v, testDetails: %v", i, d) r, err := activityHandler.Execute(tasklist, pats) @@ -1372,7 +1372,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() { WorkflowType: &commonproto.WorkflowType{ Name: "wType", }, - WorkflowDomain: "domain", + WorkflowNamespace: "namespace", } close(workerStopCh) r, err := activityHandler.Execute(tasklist, pats) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 7a991e882..e5b9c349a 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -68,7 +68,7 @@ type ( // workflowTaskPoller implements polling/processing a workflow task workflowTaskPoller struct { basePoller - domain string + namespace string taskListName string identity string service workflowservice.WorkflowServiceClient @@ -89,7 +89,7 @@ type ( // activityTaskPoller implements polling/processing a workflow task activityTaskPoller struct { basePoller - domain string + namespace string taskListName string identity string service workflowservice.WorkflowServiceClient @@ -103,7 +103,7 @@ type ( iteratorFunc func(nextPageToken []byte) (*commonproto.History, []byte, error) execution *commonproto.WorkflowExecution nextPageToken []byte - domain string + namespace string service workflowservice.WorkflowServiceClient metricsScope tally.Scope maxEventID int64 @@ -214,7 +214,7 @@ func newWorkflowTaskPoller(taskHandler WorkflowTaskHandler, service workflowserv return &workflowTaskPoller{ basePoller: basePoller{shutdownC: params.WorkerStopChannel}, service: service, - domain: params.DomainName, + namespace: params.Namespace, taskListName: params.TaskList, identity: params.Identity, taskHandler: taskHandler, @@ -464,7 +464,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi ctx := context.WithValue(rootCtx, activityEnvContextKey, &activityEnvironment{ workflowType: &workflowTypeLocal, - workflowDomain: task.params.WorkflowInfo.Domain, + workflowNamespace: task.params.WorkflowInfo.Namespace, taskList: task.params.WorkflowInfo.TaskListName, activityType: ActivityType{Name: activityType}, activityID: fmt.Sprintf("%v", task.activityID), @@ -617,7 +617,7 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po Kind: taskListKind, } return &workflowservice.PollForDecisionTaskRequest{ - Domain: wtp.domain, + Namespace: wtp.namespace, TaskList: taskList, Identity: wtp.identity, BinaryChecksum: getBinaryChecksum(), @@ -680,7 +680,7 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *workflowservice.PollForD historyIterator := &historyIteratorImpl{ nextPageToken: response.NextPageToken, execution: response.WorkflowExecution, - domain: wtp.domain, + namespace: wtp.namespace, service: wtp.service, metricsScope: wtp.metricsScope, maxEventID: response.GetStartedEventId(), @@ -697,7 +697,7 @@ func (h *historyIteratorImpl) GetNextPage() (*commonproto.History, error) { h.iteratorFunc = newGetHistoryPageFunc( context.Background(), h.service, - h.domain, + h.namespace, h.execution, h.maxEventID, h.metricsScope) @@ -722,7 +722,7 @@ func (h *historyIteratorImpl) HasNextPage() bool { func newGetHistoryPageFunc( ctx context.Context, service workflowservice.WorkflowServiceClient, - domain string, + namespace string, execution *commonproto.WorkflowExecution, atDecisionTaskCompletedEventID int64, metricsScope tally.Scope, @@ -738,7 +738,7 @@ func newGetHistoryPageFunc( var err1 error resp, err1 = service.GetWorkflowExecutionHistory(tchCtx, &workflowservice.GetWorkflowExecutionHistoryRequest{ - Domain: domain, + Namespace: namespace, Execution: execution, NextPageToken: nextPageToken, }) @@ -772,7 +772,7 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv basePoller: basePoller{shutdownC: params.WorkerStopChannel}, taskHandler: taskHandler, service: service, - domain: params.DomainName, + namespace: params.Namespace, taskListName: params.TaskList, identity: params.Identity, logger: params.Logger, @@ -791,7 +791,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) { atp.logger.Debug("activityTaskPoller::Poll") }) request := &workflowservice.PollForActivityTaskRequest{ - Domain: atp.domain, + Namespace: atp.namespace, TaskList: &commonproto.TaskList{Name: atp.taskListName}, Identity: atp.identity, TaskListMetadata: &commonproto.TaskListMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}}, @@ -1012,7 +1012,7 @@ func convertActivityResultToRespondRequest(identity string, taskToken, result [] Identity: identity} } -func convertActivityResultToRespondRequestByID(identity, domain, workflowID, runID, activityID string, +func convertActivityResultToRespondRequestByID(identity, namespace, workflowID, runID, activityID string, result []byte, err error, dataConverter DataConverter) interface{} { if err == ErrActivityResultPending { // activity result is pending and will be completed asynchronously. @@ -1022,7 +1022,7 @@ func convertActivityResultToRespondRequestByID(identity, domain, workflowID, run if err == nil { return &workflowservice.RespondActivityTaskCompletedByIDRequest{ - Domain: domain, + Namespace: namespace, WorkflowID: workflowID, RunID: runID, ActivityID: activityID, @@ -1033,7 +1033,7 @@ func convertActivityResultToRespondRequestByID(identity, domain, workflowID, run reason, details := getErrorDetails(err, dataConverter) if _, ok := err.(*CanceledError); ok || err == context.Canceled { return &workflowservice.RespondActivityTaskCanceledByIDRequest{ - Domain: domain, + Namespace: namespace, WorkflowID: workflowID, RunID: runID, ActivityID: activityID, @@ -1042,7 +1042,7 @@ func convertActivityResultToRespondRequestByID(identity, domain, workflowID, run } return &workflowservice.RespondActivityTaskFailedByIDRequest{ - Domain: domain, + Namespace: namespace, WorkflowID: workflowID, RunID: runID, ActivityID: activityID, diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 6fc11b88d..c3e20e521 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -120,8 +120,8 @@ type ( // workerExecutionParameters defines worker configure/execution options. workerExecutionParameters struct { - // Domain name. - DomainName string + // Namespace name. + Namespace string // Task list name to poll. TaskList string @@ -225,36 +225,36 @@ func ensureRequiredParams(params *workerExecutionParameters) { } } -// verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry +// verifyNamespaceExist does a DescribeNamespace operation on the specified namespace with backoff/retry // It returns an error, if the server returns an EntityNotExist or BadRequest error // On any other transient error, this method will just return success -func verifyDomainExist(client workflowservice.WorkflowServiceClient, domain string, logger *zap.Logger) error { +func verifyNamespaceExist(client workflowservice.WorkflowServiceClient, namespace string, logger *zap.Logger) error { ctx := context.Background() - descDomainOp := func() error { + descNamespaceOp := func() error { tchCtx, cancel := newChannelContext(ctx) defer cancel() - _, err := client.DescribeDomain(tchCtx, &workflowservice.DescribeDomainRequest{Name: domain}) + _, err := client.DescribeNamespace(tchCtx, &workflowservice.DescribeNamespaceRequest{Name: namespace}) if err != nil { switch err.(type) { case *serviceerror.NotFound: - logger.Error("domain does not exist", zap.String("domain", domain), zap.Error(err)) + logger.Error("namespace does not exist", zap.String("namespace", namespace), zap.Error(err)) return err case *serviceerror.InvalidArgument: - logger.Error("domain does not exist", zap.String("domain", domain), zap.Error(err)) + logger.Error("namespace does not exist", zap.String("namespace", namespace), zap.Error(err)) return err } // on any other error, just return true - logger.Warn("unable to verify if domain exist", zap.String("domain", domain), zap.Error(err)) + logger.Warn("unable to verify if namespace exist", zap.String("namespace", namespace), zap.Error(err)) } return nil } - if len(domain) == 0 { - return errors.New("domain cannot be empty") + if len(namespace) == 0 { + return errors.New("namespace cannot be empty") } // exponential backoff retry for upto a minute - return backoff.Retry(ctx, descDomainOp, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) + return backoff.Retry(ctx, descNamespaceOp, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) } func newWorkflowWorkerInternal(service workflowservice.WorkflowServiceClient, params workerExecutionParameters, ppMgr pressurePointMgr, overrides *workerOverrides, registry *registry) *workflowWorker { @@ -327,7 +327,7 @@ func newWorkflowTaskWorkerInternal(taskHandler WorkflowTaskHandler, service work // Start the worker. func (ww *workflowWorker) Start() error { - err := verifyDomainExist(ww.workflowService, ww.executionParameters.DomainName, ww.worker.logger) + err := verifyNamespaceExist(ww.workflowService, ww.executionParameters.Namespace, ww.worker.logger) if err != nil { return err } @@ -337,7 +337,7 @@ func (ww *workflowWorker) Start() error { } func (ww *workflowWorker) Run() error { - err := verifyDomainExist(ww.workflowService, ww.executionParameters.DomainName, ww.worker.logger) + err := verifyNamespaceExist(ww.workflowService, ww.executionParameters.Namespace, ww.worker.logger) if err != nil { return err } @@ -454,7 +454,7 @@ func newActivityTaskWorker(taskHandler ActivityTaskHandler, service workflowserv // Start the worker. func (aw *activityWorker) Start() error { - err := verifyDomainExist(aw.workflowService, aw.executionParameters.DomainName, aw.worker.logger) + err := verifyNamespaceExist(aw.workflowService, aw.executionParameters.Namespace, aw.worker.logger) if err != nil { return err } @@ -464,7 +464,7 @@ func (aw *activityWorker) Start() error { // Run the worker. func (aw *activityWorker) Run() error { - err := verifyDomainExist(aw.workflowService, aw.executionParameters.DomainName, aw.worker.logger) + err := verifyNamespaceExist(aw.workflowService, aw.executionParameters.Namespace, aw.worker.logger) if err != nil { return err } @@ -1146,7 +1146,7 @@ func (aw *WorkflowReplayer) ReplayWorkflowHistory(logger *zap.Logger, history *c controller := gomock.NewController(testReporter) service := workflowservicemock.NewMockWorkflowServiceClient(controller) - return aw.replayWorkflowHistory(logger, service, ReplayDomainName, history) + return aw.replayWorkflowHistory(logger, service, ReplayNamespace, history) } // ReplayWorkflowHistoryFromJSONFile executes a single decision task for the given json history file. @@ -1175,17 +1175,17 @@ func (aw *WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSONFile(logger *zap controller := gomock.NewController(testReporter) service := workflowservicemock.NewMockWorkflowServiceClient(controller) - return aw.replayWorkflowHistory(logger, service, ReplayDomainName, history) + return aw.replayWorkflowHistory(logger, service, ReplayNamespace, history) } // ReplayWorkflowExecution replays workflow execution loading it from Temporal service. -func (aw *WorkflowReplayer) ReplayWorkflowExecution(ctx context.Context, service workflowservice.WorkflowServiceClient, logger *zap.Logger, domain string, execution WorkflowExecution) error { +func (aw *WorkflowReplayer) ReplayWorkflowExecution(ctx context.Context, service workflowservice.WorkflowServiceClient, logger *zap.Logger, namespace string, execution WorkflowExecution) error { sharedExecution := &commonproto.WorkflowExecution{ RunId: execution.RunID, WorkflowId: execution.ID, } request := &workflowservice.GetWorkflowExecutionHistoryRequest{ - Domain: domain, + Namespace: namespace, Execution: sharedExecution, } hResponse, err := service.GetWorkflowExecutionHistory(ctx, request) @@ -1193,10 +1193,10 @@ func (aw *WorkflowReplayer) ReplayWorkflowExecution(ctx context.Context, service return err } - return aw.replayWorkflowHistory(logger, service, domain, hResponse.History) + return aw.replayWorkflowHistory(logger, service, namespace, hResponse.History) } -func (aw *WorkflowReplayer) replayWorkflowHistory(logger *zap.Logger, service workflowservice.WorkflowServiceClient, domain string, history *commonproto.History) error { +func (aw *WorkflowReplayer) replayWorkflowHistory(logger *zap.Logger, service workflowservice.WorkflowServiceClient, namespace string, history *commonproto.History) error { taskList := "ReplayTaskList" events := history.Events if events == nil { @@ -1240,16 +1240,16 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger *zap.Logger, service wo iterator := &historyIteratorImpl{ nextPageToken: task.NextPageToken, execution: task.WorkflowExecution, - domain: ReplayDomainName, + namespace: ReplayNamespace, service: service, metricsScope: metricScope, maxEventID: task.GetStartedEventId(), } params := workerExecutionParameters{ - DomainName: domain, - TaskList: taskList, - Identity: "replayID", - Logger: logger, + Namespace: namespace, + TaskList: taskList, + Identity: "replayID", + Logger: logger, } taskHandler := newWorkflowTaskHandler(params, nil, aw.registry) resp, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task, historyIterator: iterator}, nil) @@ -1331,7 +1331,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskList string, options Worker backgroundActivityContext, backgroundActivityContextCancel := context.WithCancel(ctx) workerParams := workerExecutionParameters{ - DomainName: client.domain, + Namespace: client.namespace, TaskList: taskList, ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize, WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond, @@ -1359,7 +1359,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskList string, options Worker ensureRequiredParams(&workerParams) workerParams.Logger = workerParams.Logger.With( - zapcore.Field{Key: tagDomain, Type: zapcore.StringType, String: client.domain}, + zapcore.Field{Key: tagNamespace, Type: zapcore.StringType, String: client.namespace}, zapcore.Field{Key: tagTaskList, Type: zapcore.StringType, String: taskList}, zapcore.Field{Key: tagWorkerID, Type: zapcore.StringType, String: workerParams.Identity}, ) @@ -1537,8 +1537,8 @@ func setClientDefaults(client *WorkflowClient) { if client.dataConverter == nil { client.dataConverter = getDefaultDataConverter() } - if len(client.domain) == 0 { - client.domain = DefaultDomainName + if len(client.namespace) == 0 { + client.namespace = DefaultNamespace } if client.tracer == nil { client.tracer = opentracing.NoopTracer{} diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index ee2321d0b..1259a4b50 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -74,13 +74,13 @@ type ( WorkflowInfo() *WorkflowInfo Complete(result []byte, err error) RegisterCancelHandler(handler func()) - RequestCancelChildWorkflow(domainName, workflowID string) - RequestCancelExternalWorkflow(domainName, workflowID, runID string, callback resultHandler) + RequestCancelChildWorkflow(namespace, workflowID string) + RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback resultHandler) ExecuteChildWorkflow(params executeWorkflowParams, callback resultHandler, startedHandler func(r WorkflowExecution, e error)) error GetLogger() *zap.Logger GetMetricsScope() tally.Scope RegisterSignalHandler(handler func(name string, input []byte)) - SignalExternalWorkflow(domainName, workflowID, runID, signalName string, input []byte, arg interface{}, childWorkflowOnly bool, callback resultHandler) + SignalExternalWorkflow(namespace, workflowID, runID, signalName string, input []byte, arg interface{}, childWorkflowOnly bool, callback resultHandler) RegisterQueryHandler(handler func(queryType string, queryArgs []byte) ([]byte, error)) IsReplaying() bool MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) Value diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 05a1503e0..2b34c58df 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -174,7 +174,7 @@ func (s *InterfacesTestSuite) TearDownTest() { func (s *InterfacesTestSuite) TestInterface() { logger, _ := zap.NewDevelopment() - domain := "testDomain" + namespace := "testNamespace" // Workflow execution parameters. workflowExecutionParameters := workerExecutionParameters{ TaskList: "testTaskList", @@ -184,16 +184,16 @@ func (s *InterfacesTestSuite) TestInterface() { Tracer: opentracing.NoopTracer{}, } - domainStatus := enums.DomainStatusRegistered - domainDesc := &workflowservice.DescribeDomainResponse{ - DomainInfo: &commonproto.DomainInfo{ - Name: domain, - Status: domainStatus, + namespaceStatus := enums.NamespaceStatusRegistered + namespaceDesc := &workflowservice.DescribeNamespaceResponse{ + NamespaceInfo: &commonproto.NamespaceInfo{ + Name: namespace, + Status: namespaceStatus, }, } // mocks - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(domainDesc, nil).AnyTimes() + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(namespaceDesc, nil).AnyTimes() s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollForActivityTaskResponse{}, nil).AnyTimes() s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.RespondActivityTaskCompletedResponse{}, nil).AnyTimes() s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollForDecisionTaskResponse{}, nil).AnyTimes() diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index d6da2aaca..77fc4ea2c 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -400,16 +400,16 @@ func (s *internalWorkerTestSuite) testDecisionTaskHandlerHelper(params workerExe func (s *internalWorkerTestSuite) TestDecisionTaskHandler() { params := workerExecutionParameters{ - DomainName: testDomain, - Identity: "identity", - Logger: getLogger(), + Namespace: testNamespace, + Identity: "identity", + Logger: getLogger(), } s.testDecisionTaskHandlerHelper(params) } func (s *internalWorkerTestSuite) TestDecisionTaskHandler_WithDataConverter() { params := workerExecutionParameters{ - DomainName: testDomain, + Namespace: testNamespace, Identity: "identity", Logger: getLogger(), DataConverter: newTestDataConverter(), @@ -480,11 +480,11 @@ func (s *internalWorkerTestSuite) TestNoActivitiesOrWorkflows() { assert.NoError(t, w.Start()) } -func (s *internalWorkerTestSuite) TestWorkerStartFailsWithInvalidDomain() { +func (s *internalWorkerTestSuite) TestWorkerStartFailsWithInvalidNamespace() { t := s.T() testCases := []struct { - domainErr error - isErrFatal bool + namespaceErr error + isErrFatal bool }{ {serviceerror.NewNotFound(""), true}, {serviceerror.NewInvalidArgument(""), true}, @@ -496,22 +496,22 @@ func (s *internalWorkerTestSuite) TestWorkerStartFailsWithInvalidDomain() { for _, tc := range testCases { service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) - service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, tc.domainErr).Do( - func(ctx context.Context, request *workflowservice.DescribeDomainRequest, opts ...grpc.CallOption) { + service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, tc.namespaceErr).Do( + func(ctx context.Context, request *workflowservice.DescribeNamespaceRequest, opts ...grpc.CallOption) { // log }).Times(2) worker := createWorker(service) if tc.isErrFatal { err := worker.Start() - assert.Error(t, err, "worker.start() MUST fail when domain is invalid") + assert.Error(t, err, "worker.start() MUST fail when namespace is invalid") errC := make(chan error) go func() { errC <- worker.Run() }() select { case e := <-errC: - assert.Error(t, e, "worker.Run() MUST fail when domain is invalid") + assert.Error(t, e, "worker.Run() MUST fail when namespace is invalid") case <-time.After(time.Second): - assert.Fail(t, "worker.Run() MUST fail when domain is invalid") + assert.Fail(t, "worker.Run() MUST fail when namespace is invalid") } continue } @@ -553,17 +553,17 @@ func createWorker(service *workflowservicemock.MockWorkflowServiceClient) *Aggre func createWorkerWithThrottle( service *workflowservicemock.MockWorkflowServiceClient, activitiesPerSecond float64, dc DataConverter, ) *AggregatedWorker { - domain := "testDomain" - domainStatus := enums.DomainStatusRegistered - domainDesc := &workflowservice.DescribeDomainResponse{ - DomainInfo: &commonproto.DomainInfo{ - Name: domain, - Status: domainStatus, + namespace := "testNamespace" + namespaceStatus := enums.NamespaceStatusRegistered + namespaceDesc := &workflowservice.DescribeNamespaceResponse{ + NamespaceInfo: &commonproto.NamespaceInfo{ + Name: namespace, + Status: namespaceStatus, }, } // mocks - service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(domainDesc, nil).Do( - func(ctx context.Context, request *workflowservice.DescribeDomainRequest, opts ...grpc.CallOption) { + service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(namespaceDesc, nil).Do( + func(ctx context.Context, request *workflowservice.DescribeNamespaceRequest, opts ...grpc.CallOption) { // log }).AnyTimes() @@ -588,7 +588,7 @@ func createWorkerWithThrottle( EnableSessionWorker: true} clientOptions := ClientOptions{ - DomainName: domain, + Namespace: namespace, } if dc != nil { clientOptions.DataConverter = dc @@ -643,7 +643,7 @@ func (s *internalWorkerTestSuite) TestCompleteActivity_WithDataConverter() { func (s *internalWorkerTestSuite) TestCompleteActivityById() { t := s.T() mockService := s.service - wfClient := NewServiceClient(mockService, nil, ClientOptions{DomainName: "testDomain"}) + wfClient := NewServiceClient(mockService, nil, ClientOptions{Namespace: "testNamespace"}) var completedRequest, canceledRequest, failedRequest interface{} mockService.EXPECT().RespondActivityTaskCompletedByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.RespondActivityTaskCompletedByIDResponse{}, nil).Do( func(ctx context.Context, request *workflowservice.RespondActivityTaskCompletedByIDRequest, opts ...grpc.CallOption) { @@ -662,18 +662,18 @@ func (s *internalWorkerTestSuite) TestCompleteActivityById() { runID := "" activityID := "aid" - _ = wfClient.CompleteActivityByID(context.Background(), DefaultDomainName, workflowID, runID, activityID, nil, nil) + _ = wfClient.CompleteActivityByID(context.Background(), DefaultNamespace, workflowID, runID, activityID, nil, nil) require.NotNil(t, completedRequest) - _ = wfClient.CompleteActivityByID(context.Background(), DefaultDomainName, workflowID, runID, activityID, nil, NewCanceledError()) + _ = wfClient.CompleteActivityByID(context.Background(), DefaultNamespace, workflowID, runID, activityID, nil, NewCanceledError()) require.NotNil(t, canceledRequest) - _ = wfClient.CompleteActivityByID(context.Background(), DefaultDomainName, workflowID, runID, activityID, nil, errors.New("")) + _ = wfClient.CompleteActivityByID(context.Background(), DefaultNamespace, workflowID, runID, activityID, nil, errors.New("")) require.NotNil(t, failedRequest) } func (s *internalWorkerTestSuite) TestRecordActivityHeartbeat() { - wfClient := NewServiceClient(s.service, nil, ClientOptions{DomainName: "testDomain"}) + wfClient := NewServiceClient(s.service, nil, ClientOptions{Namespace: "testNamespace"}) var heartbeatRequest *workflowservice.RecordActivityTaskHeartbeatRequest heartbeatResponse := workflowservice.RecordActivityTaskHeartbeatResponse{CancelRequested: false} s.service.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(&heartbeatResponse, nil). @@ -689,7 +689,7 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeat() { func (s *internalWorkerTestSuite) TestRecordActivityHeartbeat_WithDataConverter() { t := s.T() dc := newTestDataConverter() - opt := ClientOptions{DomainName: "testDomain", DataConverter: dc} + opt := ClientOptions{Namespace: "testNamespace", DataConverter: dc} wfClient := NewServiceClient(s.service, nil, opt) var heartbeatRequest *workflowservice.RecordActivityTaskHeartbeatRequest heartbeatResponse := workflowservice.RecordActivityTaskHeartbeatResponse{CancelRequested: false} @@ -709,7 +709,7 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeat_WithDataConverter( } func (s *internalWorkerTestSuite) TestRecordActivityHeartbeatByID() { - wfClient := NewServiceClient(s.service, nil, ClientOptions{DomainName: "testDomain"}) + wfClient := NewServiceClient(s.service, nil, ClientOptions{Namespace: "testNamespace"}) var heartbeatRequest *workflowservice.RecordActivityTaskHeartbeatByIDRequest heartbeatResponse := workflowservice.RecordActivityTaskHeartbeatByIDResponse{CancelRequested: false} s.service.EXPECT().RecordActivityTaskHeartbeatByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(&heartbeatResponse, nil). @@ -717,8 +717,8 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeatByID() { heartbeatRequest = request }).Times(2) - _ = wfClient.RecordActivityHeartbeatByID(context.Background(), DefaultDomainName, "wid", "rid", "aid") - _ = wfClient.RecordActivityHeartbeatByID(context.Background(), DefaultDomainName, "wid", "rid", "aid", + _ = wfClient.RecordActivityHeartbeatByID(context.Background(), DefaultNamespace, "wid", "rid", "aid") + _ = wfClient.RecordActivityHeartbeatByID(context.Background(), DefaultNamespace, "wid", "rid", "aid", "testStack", "customerObjects", 4) require.NotNil(s.T(), heartbeatRequest) } @@ -1185,7 +1185,7 @@ func TestWorkerOptionDefaults(t *testing.T) { require.Nil(t, decisionWorker.executionParameters.ContextPropagators) expected := workerExecutionParameters{ - DomainName: DefaultDomainName, + Namespace: DefaultNamespace, TaskList: taskList, MaxConcurrentActivityPollers: defaultConcurrentPollRoutineSize, MaxConcurrentDecisionPollers: defaultConcurrentPollRoutineSize, @@ -1221,7 +1221,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) { client := &WorkflowClient{ workflowService: nil, connectionCloser: nil, - domain: "worker-options-test", + namespace: "worker-options-test", registry: nil, identity: "143@worker-options-test-1", dataConverter: &defaultDataConverter{}, diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 9701d4903..e63f076e7 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -89,13 +89,13 @@ func TestWorkersTestSuite(t *testing.T) { func (s *WorkersTestSuite) TestWorkflowWorker() { logger, _ := zap.NewDevelopment() - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollForDecisionTaskResponse{}, nil).AnyTimes() s.service.EXPECT().RespondDecisionTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() ctx, cancel := context.WithCancel(context.Background()) executionParameters := workerExecutionParameters{ - DomainName: DefaultDomainName, + Namespace: DefaultNamespace, TaskList: "testTaskList", MaxConcurrentDecisionPollers: 5, Logger: logger, @@ -113,12 +113,12 @@ func (s *WorkersTestSuite) TestWorkflowWorker() { func (s *WorkersTestSuite) TestActivityWorker() { logger, _ := zap.NewDevelopment() - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollForActivityTaskResponse{}, nil).AnyTimes() s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.RespondActivityTaskCompletedResponse{}, nil).AnyTimes() executionParameters := workerExecutionParameters{ - DomainName: DefaultDomainName, + Namespace: DefaultNamespace, TaskList: "testTaskList", MaxConcurrentActivityPollers: 5, Logger: logger, @@ -149,17 +149,17 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { WorkflowType: &commonproto.WorkflowType{ Name: "wType", }, - WorkflowDomain: "domain", + WorkflowNamespace: "namespace", } - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(pats, nil).AnyTimes() s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.RespondActivityTaskCompletedResponse{}, nil).AnyTimes() stopC := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) executionParameters := workerExecutionParameters{ - DomainName: DefaultDomainName, + Namespace: DefaultNamespace, TaskList: "testTaskList", MaxConcurrentActivityPollers: 5, ConcurrentActivityExecutionSize: 2, @@ -189,11 +189,11 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { } func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() { - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollForDecisionTaskResponse{}, serviceerror.NewInternal("")).AnyTimes() executionParameters := workerExecutionParameters{ - DomainName: DefaultDomainName, + Namespace: DefaultNamespace, TaskList: "testDecisionTaskList", MaxConcurrentDecisionPollers: 5, Logger: zap.NewNop(), @@ -271,7 +271,7 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() { createTestEventDecisionTaskStarted(11), } - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() task := &workflowservice.PollForDecisionTaskResponse{ TaskToken: []byte("test-token"), WorkflowExecution: &commonproto.WorkflowExecution{ @@ -411,7 +411,7 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() { createTestEventDecisionTaskStarted(11), } - s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() task := &workflowservice.PollForDecisionTaskResponse{ TaskToken: []byte("test-token"), WorkflowExecution: &commonproto.WorkflowExecution{ diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 2b2cc55ee..8368da424 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -168,7 +168,7 @@ type ( taskListName string executionStartToCloseTimeoutSeconds int32 taskStartToCloseTimeoutSeconds int32 - domain string + namespace string workflowID string waitForCancellation bool signalChannels map[string]Channel @@ -423,7 +423,7 @@ func newWorkflowContext(env workflowEnvironment, interceptors WorkflowIntercepto // Set default values for the workflow execution. wInfo := env.WorkflowInfo() - rootCtx = WithWorkflowDomain(rootCtx, wInfo.Domain) + rootCtx = WithWorkflowNamespace(rootCtx, wInfo.Namespace) rootCtx = WithWorkflowTaskList(rootCtx, wInfo.TaskListName) rootCtx = WithExecutionStartToCloseTimeout(rootCtx, time.Duration(wInfo.ExecutionStartToCloseTimeoutSeconds)*time.Second) rootCtx = WithWorkflowTaskStartToCloseTimeout(rootCtx, time.Duration(wInfo.TaskStartToCloseTimeoutSeconds)*time.Second) @@ -1153,9 +1153,9 @@ func getValidatedWorkflowOptions(ctx Context) (*workflowOptions, error) { return nil, errWorkflowOptionBadRequest } info := GetWorkflowInfo(ctx) - if p.domain == "" { - // default to use current workflow's domain - p.domain = info.Domain + if p.namespace == "" { + // default to use current workflow's namespace + p.namespace = info.Namespace } if p.taskListName == "" { // default to use current workflow's task list diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 55f7240fc..3c305a0ed 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -44,7 +44,7 @@ import ( // Assert that structs do indeed implement the interfaces var _ Client = (*WorkflowClient)(nil) -var _ DomainClient = (*domainClient)(nil) +var _ NamespaceClient = (*namespaceClient)(nil) const ( defaultDecisionTaskTimeoutInSecs = 10 @@ -60,7 +60,7 @@ type ( WorkflowClient struct { workflowService workflowservice.WorkflowServiceClient connectionCloser io.Closer - domain string + namespace string registry *registry metricsScope *metrics.TaggedScope identity string @@ -69,8 +69,8 @@ type ( tracer opentracing.Tracer } - // domainClient is the client for managing domains. - domainClient struct { + // namespaceClient is the client for managing namespaces. + namespaceClient struct { workflowService workflowservice.WorkflowServiceClient connectionCloser io.Closer metricsScope tally.Scope @@ -206,7 +206,7 @@ func (wc *WorkflowClient) StartWorkflow( // run propagators to extract information about tracing and other stuff, store in headers field startRequest := &workflowservice.StartWorkflowExecutionRequest{ - Domain: wc.domain, + Namespace: wc.namespace, RequestId: uuid.New(), WorkflowId: workflowID, WorkflowType: &commonproto.WorkflowType{Name: workflowType.Name}, @@ -322,7 +322,7 @@ func (wc *WorkflowClient) SignalWorkflow(ctx context.Context, workflowID string, } request := &workflowservice.SignalWorkflowExecutionRequest{ - Domain: wc.domain, + Namespace: wc.namespace, WorkflowExecution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, RunId: runID, @@ -396,7 +396,7 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI header := wc.getWorkflowHeader(ctx) signalWithStartRequest := &workflowservice.SignalWithStartWorkflowExecutionRequest{ - Domain: wc.domain, + Namespace: wc.namespace, RequestId: uuid.New(), WorkflowId: workflowID, WorkflowType: &commonproto.WorkflowType{Name: workflowType.Name}, @@ -448,7 +448,7 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI // If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID. func (wc *WorkflowClient) CancelWorkflow(ctx context.Context, workflowID string, runID string) error { request := &workflowservice.RequestCancelWorkflowExecutionRequest{ - Domain: wc.domain, + Namespace: wc.namespace, WorkflowExecution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, RunId: runID, @@ -470,7 +470,7 @@ func (wc *WorkflowClient) CancelWorkflow(ctx context.Context, workflowID string, // If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID. func (wc *WorkflowClient) TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error { request := &workflowservice.TerminateWorkflowExecutionRequest{ - Domain: wc.domain, + Namespace: wc.namespace, WorkflowExecution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, RunId: runID, @@ -495,10 +495,10 @@ func (wc *WorkflowClient) TerminateWorkflow(ctx context.Context, workflowID stri func (wc *WorkflowClient) GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType enums.HistoryEventFilterType) HistoryEventIterator { - domain := wc.domain + namespace := wc.namespace paginate := func(nexttoken []byte) (*workflowservice.GetWorkflowExecutionHistoryResponse, error) { request := &workflowservice.GetWorkflowExecutionHistoryRequest{ - Domain: domain, + Namespace: namespace, Execution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, RunId: runID, @@ -565,12 +565,12 @@ func (wc *WorkflowClient) CompleteActivity(ctx context.Context, taskToken []byte } // CompleteActivityByID reports activity completed. Similar to CompleteActivity -// It takes domain name, workflowID, runID, activityID as arguments. -func (wc *WorkflowClient) CompleteActivityByID(ctx context.Context, domain, workflowID, runID, activityID string, +// It takes namespace name, workflowID, runID, activityID as arguments. +func (wc *WorkflowClient) CompleteActivityByID(ctx context.Context, namespace, workflowID, runID, activityID string, result interface{}, err error) error { - if activityID == "" || workflowID == "" || domain == "" { - return errors.New("empty activity or workflow id or domainName") + if activityID == "" || workflowID == "" || namespace == "" { + return errors.New("empty activity or workflow id or namespace") } var data []byte @@ -582,7 +582,7 @@ func (wc *WorkflowClient) CompleteActivityByID(ctx context.Context, domain, work } } - request := convertActivityResultToRespondRequestByID(wc.identity, domain, workflowID, runID, activityID, data, err, wc.dataConverter) + request := convertActivityResultToRespondRequestByID(wc.identity, namespace, workflowID, runID, activityID, data, err, wc.dataConverter) return reportActivityCompleteByID(ctx, wc.workflowService, request, wc.metricsScope) } @@ -597,12 +597,12 @@ func (wc *WorkflowClient) RecordActivityHeartbeat(ctx context.Context, taskToken // RecordActivityHeartbeatByID records heartbeat for an activity. func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context, - domain, workflowID, runID, activityID string, details ...interface{}) error { + namespace, workflowID, runID, activityID string, details ...interface{}) error { data, err := encodeArgs(wc.dataConverter, details) if err != nil { return err } - return recordActivityHeartbeatByID(ctx, wc.workflowService, wc.identity, domain, workflowID, runID, activityID, data) + return recordActivityHeartbeatByID(ctx, wc.workflowService, wc.identity, namespace, workflowID, runID, activityID, data) } // ListClosedWorkflow gets closed workflow executions based on request filters @@ -611,8 +611,8 @@ func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context, // - InternalServiceError // - EntityNotExistError func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (*workflowservice.ListClosedWorkflowExecutionsResponse, error) { - if len(request.GetDomain()) == 0 { - request.Domain = wc.domain + if len(request.GetNamespace()) == 0 { + request.Namespace = wc.namespace } var response *workflowservice.ListClosedWorkflowExecutionsResponse err := backoff.Retry(ctx, @@ -635,8 +635,8 @@ func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workf // - InternalServiceError // - EntityNotExistError func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) { - if len(request.GetDomain()) == 0 { - request.Domain = wc.domain + if len(request.GetNamespace()) == 0 { + request.Namespace = wc.namespace } var response *workflowservice.ListOpenWorkflowExecutionsResponse err := backoff.Retry(ctx, @@ -655,8 +655,8 @@ func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflo // ListWorkflow implementation func (wc *WorkflowClient) ListWorkflow(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*workflowservice.ListWorkflowExecutionsResponse, error) { - if len(request.GetDomain()) == 0 { - request.Domain = wc.domain + if len(request.GetNamespace()) == 0 { + request.Namespace = wc.namespace } var response *workflowservice.ListWorkflowExecutionsResponse err := backoff.Retry(ctx, @@ -675,8 +675,8 @@ func (wc *WorkflowClient) ListWorkflow(ctx context.Context, request *workflowser // ListArchivedWorkflow implementation func (wc *WorkflowClient) ListArchivedWorkflow(ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest) (*workflowservice.ListArchivedWorkflowExecutionsResponse, error) { - if len(request.GetDomain()) == 0 { - request.Domain = wc.domain + if len(request.GetNamespace()) == 0 { + request.Namespace = wc.namespace } var response *workflowservice.ListArchivedWorkflowExecutionsResponse err := backoff.Retry(ctx, @@ -707,8 +707,8 @@ func (wc *WorkflowClient) ListArchivedWorkflow(ctx context.Context, request *wor // ScanWorkflow implementation func (wc *WorkflowClient) ScanWorkflow(ctx context.Context, request *workflowservice.ScanWorkflowExecutionsRequest) (*workflowservice.ScanWorkflowExecutionsResponse, error) { - if len(request.GetDomain()) == 0 { - request.Domain = wc.domain + if len(request.GetNamespace()) == 0 { + request.Namespace = wc.namespace } var response *workflowservice.ScanWorkflowExecutionsResponse err := backoff.Retry(ctx, @@ -727,8 +727,8 @@ func (wc *WorkflowClient) ScanWorkflow(ctx context.Context, request *workflowser // CountWorkflow implementation func (wc *WorkflowClient) CountWorkflow(ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest) (*workflowservice.CountWorkflowExecutionsResponse, error) { - if len(request.GetDomain()) == 0 { - request.Domain = wc.domain + if len(request.GetNamespace()) == 0 { + request.Namespace = wc.namespace } var response *workflowservice.CountWorkflowExecutionsResponse err := backoff.Retry(ctx, @@ -769,7 +769,7 @@ func (wc *WorkflowClient) GetSearchAttributes(ctx context.Context) (*workflowser // - EntityNotExistError func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*workflowservice.DescribeWorkflowExecutionResponse, error) { request := &workflowservice.DescribeWorkflowExecutionRequest{ - Domain: wc.domain, + Namespace: wc.namespace, Execution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, RunId: runID, @@ -871,7 +871,7 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request } } req := &workflowservice.QueryWorkflowRequest{ - Domain: wc.domain, + Namespace: wc.namespace, Execution: &commonproto.WorkflowExecution{ WorkflowId: request.WorkflowID, RunId: request.RunID, @@ -919,7 +919,7 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request // - EntityNotExistError func (wc *WorkflowClient) DescribeTaskList(ctx context.Context, taskList string, taskListType enums.TaskListType) (*workflowservice.DescribeTaskListResponse, error) { request := &workflowservice.DescribeTaskListRequest{ - Domain: wc.domain, + Namespace: wc.namespace, TaskList: &commonproto.TaskList{Name: taskList}, TaskListType: taskListType, } @@ -960,42 +960,42 @@ func (wc *WorkflowClient) getWorkflowHeader(ctx context.Context) *commonproto.He return header } -// Register a domain with temporal server +// Register a namespace with temporal server // The errors it can throw: -// - DomainAlreadyExistsError +// - NamespaceAlreadyExistsError // - BadRequestError // - InternalServiceError -func (dc *domainClient) Register(ctx context.Context, request *workflowservice.RegisterDomainRequest) error { +func (dc *namespaceClient) Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error { return backoff.Retry(ctx, func() error { tchCtx, cancel := newChannelContext(ctx) defer cancel() var err error - _, err = dc.workflowService.RegisterDomain(tchCtx, request) + _, err = dc.workflowService.RegisterNamespace(tchCtx, request) return err }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) } -// Describe a domain. The domain has 3 part of information -// DomainInfo - Which has Name, Status, Description, Owner Email -// DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. +// Describe a namespace. The namespace has 3 part of information +// NamespaceInfo - Which has Name, Status, Description, Owner Email +// NamespaceConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. // ReplicationConfiguration - replication config like clusters and active cluster name // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError -func (dc *domainClient) Describe(ctx context.Context, name string) (*workflowservice.DescribeDomainResponse, error) { - request := &workflowservice.DescribeDomainRequest{ +func (dc *namespaceClient) Describe(ctx context.Context, name string) (*workflowservice.DescribeNamespaceResponse, error) { + request := &workflowservice.DescribeNamespaceRequest{ Name: name, } - var response *workflowservice.DescribeDomainResponse + var response *workflowservice.DescribeNamespaceResponse err := backoff.Retry(ctx, func() error { tchCtx, cancel := newChannelContext(ctx) defer cancel() var err error - response, err = dc.workflowService.DescribeDomain(tchCtx, request) + response, err = dc.workflowService.DescribeNamespace(tchCtx, request) return err }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) if err != nil { @@ -1004,23 +1004,23 @@ func (dc *domainClient) Describe(ctx context.Context, name string) (*workflowser return response, nil } -// Update a domain. +// Update a namespace. // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError -func (dc *domainClient) Update(ctx context.Context, request *workflowservice.UpdateDomainRequest) error { +func (dc *namespaceClient) Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error { return backoff.Retry(ctx, func() error { tchCtx, cancel := newChannelContext(ctx) defer cancel() - _, err := dc.workflowService.UpdateDomain(tchCtx, request) + _, err := dc.workflowService.UpdateNamespace(tchCtx, request) return err }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) } // CloseConnection closes underlying gRPC connection. -func (dc *domainClient) CloseConnection() error { +func (dc *namespaceClient) CloseConnection() error { if dc.connectionCloser == nil { return nil } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 5ffc6deb1..d161d93b2 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -146,7 +146,7 @@ func (s *historyEventIteratorSuite) SetupTest() { s.wfClient = &WorkflowClient{ workflowService: s.workflowServiceClient, - domain: DefaultDomainName, + namespace: DefaultNamespace, } } @@ -725,7 +725,7 @@ func (s *workflowRunSuite) TestGetWorkflow() { func getGetWorkflowExecutionHistoryRequest(filterType enums.HistoryEventFilterType) *workflowservice.GetWorkflowExecutionHistoryRequest { request := &workflowservice.GetWorkflowExecutionHistoryRequest{ - Domain: DefaultDomainName, + Namespace: DefaultNamespace, Execution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, RunId: runID, @@ -1036,16 +1036,16 @@ func (s *workflowClientTestSuite) TestListWorkflow() { response := &workflowservice.ListWorkflowExecutionsResponse{} s.service.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(response, nil). Do(func(_ interface{}, req *workflowservice.ListWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal(DefaultDomainName, request.GetDomain()) + s.Equal(DefaultNamespace, request.GetNamespace()) }) resp, err := s.client.ListWorkflow(context.Background(), request) s.Nil(err) s.Equal(response, resp) - request.Domain = "another" + request.Namespace = "another" s.service.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewInvalidArgument("")). Do(func(_ interface{}, req *workflowservice.ListWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal("another", request.GetDomain()) + s.Equal("another", request.GetNamespace()) }) _, err = s.client.ListWorkflow(context.Background(), request) s.IsType(&serviceerror.InvalidArgument{}, err) @@ -1056,7 +1056,7 @@ func (s *workflowClientTestSuite) TestListArchivedWorkflow() { response := &workflowservice.ListArchivedWorkflowExecutionsResponse{} s.service.EXPECT().ListArchivedWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(response, nil). Do(func(_ interface{}, req *workflowservice.ListArchivedWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal(DefaultDomainName, request.GetDomain()) + s.Equal(DefaultNamespace, request.GetNamespace()) }) ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() @@ -1064,10 +1064,10 @@ func (s *workflowClientTestSuite) TestListArchivedWorkflow() { s.Nil(err) s.Equal(response, resp) - request.Domain = "another" + request.Namespace = "another" s.service.EXPECT().ListArchivedWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewInvalidArgument("")). Do(func(_ interface{}, req *workflowservice.ListArchivedWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal("another", request.GetDomain()) + s.Equal("another", request.GetNamespace()) }) _, err = s.client.ListArchivedWorkflow(ctxWithTimeout, request) s.IsType(&serviceerror.InvalidArgument{}, err) @@ -1078,16 +1078,16 @@ func (s *workflowClientTestSuite) TestScanWorkflow() { response := &workflowservice.ScanWorkflowExecutionsResponse{} s.service.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(response, nil). Do(func(_ interface{}, req *workflowservice.ScanWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal(DefaultDomainName, request.GetDomain()) + s.Equal(DefaultNamespace, request.GetNamespace()) }) resp, err := s.client.ScanWorkflow(context.Background(), request) s.Nil(err) s.Equal(response, resp) - request.Domain = "another" + request.Namespace = "another" s.service.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewInvalidArgument("")). Do(func(_ interface{}, req *workflowservice.ScanWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal("another", request.GetDomain()) + s.Equal("another", request.GetNamespace()) }) _, err = s.client.ScanWorkflow(context.Background(), request) s.IsType(&serviceerror.InvalidArgument{}, err) @@ -1098,16 +1098,16 @@ func (s *workflowClientTestSuite) TestCountWorkflow() { response := &workflowservice.CountWorkflowExecutionsResponse{} s.service.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(response, nil). Do(func(_ interface{}, req *workflowservice.CountWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal(DefaultDomainName, request.GetDomain()) + s.Equal(DefaultNamespace, request.GetNamespace()) }) resp, err := s.client.CountWorkflow(context.Background(), request) s.Nil(err) s.Equal(response, resp) - request.Domain = "another" + request.Namespace = "another" s.service.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewInvalidArgument("")). Do(func(_ interface{}, req *workflowservice.CountWorkflowExecutionsRequest, _ ...interface{}) { - s.Equal("another", request.GetDomain()) + s.Equal("another", request.GetNamespace()) }) _, err = s.client.CountWorkflow(context.Background(), request) s.IsType(&serviceerror.InvalidArgument{}, err) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 1fd4e345d..56b474166 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -48,12 +48,11 @@ import ( ) const ( - defaultTestDomain = "default-test-domain" + defaultTestNamespace = "default-test-namespace" defaultTestTaskList = "default-test-tasklist" defaultTestWorkflowID = "default-test-workflow-id" defaultTestRunID = "default-test-run-id" defaultTestWorkflowTypeName = "default-test-workflow-type-name" - defaultTestDomainName = "default-test-domain-name" workflowTypeNotSpecified = "workflow-type-not-specified" ) @@ -231,7 +230,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist }, workflowInfo: &WorkflowInfo{ - Domain: defaultTestDomain, + Namespace: defaultTestNamespace, WorkflowExecution: WorkflowExecution{ ID: defaultTestWorkflowID, RunID: defaultTestRunID, @@ -344,13 +343,13 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param childEnv.workflowInfo.Attempt = params.attempt childEnv.workflowInfo.WorkflowExecution.ID = params.workflowID childEnv.workflowInfo.WorkflowExecution.RunID = params.workflowID + "_RunID" - childEnv.workflowInfo.Domain = params.domain + childEnv.workflowInfo.Namespace = params.namespace childEnv.workflowInfo.TaskListName = params.taskListName childEnv.workflowInfo.ExecutionStartToCloseTimeoutSeconds = params.executionStartToCloseTimeoutSeconds childEnv.workflowInfo.TaskStartToCloseTimeoutSeconds = params.taskStartToCloseTimeoutSeconds childEnv.workflowInfo.lastCompletionResult = params.lastCompletionResult childEnv.workflowInfo.CronSchedule = cronSchedule - childEnv.workflowInfo.ParentWorkflowDomain = env.workflowInfo.Domain + childEnv.workflowInfo.ParentWorkflowNamespace = env.workflowInfo.Namespace childEnv.workflowInfo.ParentWorkflowExecution = &env.workflowInfo.WorkflowExecution childEnv.executionTimeout = time.Duration(params.executionStartToCloseTimeoutSeconds) * time.Second if workflowHandler, ok := env.runningWorkflows[params.workflowID]; ok { @@ -517,7 +516,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( defaultTestRunID, "0", defaultTestWorkflowTypeName, - defaultTestDomainName, + defaultTestNamespace, params, ) @@ -933,7 +932,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters executeActivi defaultTestRunID, activityInfo.activityID, defaultTestWorkflowTypeName, - defaultTestDomainName, + defaultTestNamespace, parameters, ) @@ -1556,7 +1555,7 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri return taskHandler } -func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, domainName string, params executeActivityParams) *workflowservice.PollForActivityTaskResponse { +func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, namespace string, params executeActivityParams) *workflowservice.PollForActivityTaskResponse { task := &workflowservice.PollForActivityTaskResponse{ WorkflowExecution: &commonproto.WorkflowExecution{ WorkflowId: workflowID, @@ -1574,8 +1573,8 @@ func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, domain WorkflowType: &commonproto.WorkflowType{ Name: workflowTypeName, }, - WorkflowDomain: domainName, - Header: params.Header, + WorkflowNamespace: namespace, + Header: params.Header, } return task } @@ -1656,7 +1655,7 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelChildWorkflow(_, workflowID } } -func (env *testWorkflowEnvironmentImpl) RequestCancelExternalWorkflow(domainName, workflowID, runID string, callback resultHandler) { +func (env *testWorkflowEnvironmentImpl) RequestCancelExternalWorkflow(namespace, workflowID, runID string, callback resultHandler) { if env.workflowInfo.WorkflowExecution.ID == workflowID { // cancel current workflow env.workflowCancelHandler() @@ -1685,7 +1684,7 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelExternalWorkflow(domainName // configured to delay, it will block the main loop which stops the world. env.runningCount++ go func() { - args := []interface{}{domainName, workflowID, runID} + args := []interface{}{namespace, workflowID, runID} // below call will panic if mock is not properly setup. mockRet := env.mock.MethodCalled(mockMethodForRequestCancelExternalWorkflow, args...) m := &mockWrapper{name: mockMethodForRequestCancelExternalWorkflow, fn: mockFnRequestCancelExternalWorkflow} @@ -1708,7 +1707,7 @@ func (env *testWorkflowEnvironmentImpl) IsReplaying() bool { return false } -func (env *testWorkflowEnvironmentImpl) SignalExternalWorkflow(domainName, workflowID, runID, signalName string, input []byte, arg interface{}, childWorkflowOnly bool, callback resultHandler) { +func (env *testWorkflowEnvironmentImpl) SignalExternalWorkflow(namespace, workflowID, runID, signalName string, input []byte, arg interface{}, childWorkflowOnly bool, callback resultHandler) { // check if target workflow is a known workflow if childHandle, ok := env.runningWorkflows[workflowID]; ok { // target workflow is a child @@ -1737,7 +1736,7 @@ func (env *testWorkflowEnvironmentImpl) SignalExternalWorkflow(domainName, workf // configured to delay, it will block the main loop which stops the world. env.runningCount++ go func() { - args := []interface{}{domainName, workflowID, runID, signalName, arg} + args := []interface{}{namespace, workflowID, runID, signalName, arg} // below call will panic if mock is not properly setup. mockRet := env.mock.MethodCalled(mockMethodForSignalExternalWorkflow, args...) m := &mockWrapper{name: mockMethodForSignalExternalWorkflow, fn: mockFnSignalExternalWorkflow} @@ -1895,7 +1894,7 @@ func (env *testWorkflowEnvironmentImpl) cancelWorkflow(callback resultHandler) { env.postCallback(func() { // RequestCancelWorkflow needs to be run in main thread env.RequestCancelExternalWorkflow( - env.workflowInfo.Domain, + env.workflowInfo.Namespace, env.workflowInfo.WorkflowExecution.ID, env.workflowInfo.WorkflowExecution.RunID, callback, diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index b375a99d0..174fe8ba2 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -1780,7 +1780,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalChildWorkflow() { workflowFn := func(ctx Context) error { cwo := ChildWorkflowOptions{ ExecutionStartToCloseTimeout: time.Minute, - Domain: "test-domain", + Namespace: "test-namespace", } ctx = WithChildWorkflowOptions(ctx, cwo) childFuture := ExecuteChildWorkflow(ctx, childWorkflowFn, GetWorkflowInfo(ctx).WorkflowExecution) @@ -1818,8 +1818,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalExternalWorkflow() { signalName := "test-signal-name" signalData := "test-signal-data" workflowFn := func(ctx Context) error { - // set domain to be more specific - ctx = WithWorkflowDomain(ctx, "test-domain") + // set namespace to be more specific + ctx = WithWorkflowNamespace(ctx, "test-namespace") f1 := SignalExternalWorkflow(ctx, "test-workflow-id1", "test-runid1", signalName, signalData) f2 := SignalExternalWorkflow(ctx, "test-workflow-id2", "test-runid2", signalName, signalData) f3 := SignalExternalWorkflow(ctx, "test-workflow-id3", "test-runid3", signalName, signalData) @@ -1862,16 +1862,16 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalExternalWorkflow() { env.RegisterWorkflow(workflowFn) // signal1 should succeed - env.OnSignalExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1", signalName, signalData).Return(nil).Once() + env.OnSignalExternalWorkflow("test-namespace", "test-workflow-id1", "test-runid1", signalName, signalData).Return(nil).Once() // signal2 should fail - env.OnSignalExternalWorkflow("test-domain", "test-workflow-id2", "test-runid2", signalName, signalData).Return( - func(domainName, workflowID, runID, signalName string, arg interface{}) error { + env.OnSignalExternalWorkflow("test-namespace", "test-workflow-id2", "test-runid2", signalName, signalData).Return( + func(namespace, workflowID, runID, signalName string, arg interface{}) error { return errors.New("unknown external workflow") }).Once() // signal3 should succeed with delay, mock match exactly the parameters - env.OnSignalExternalWorkflow("test-domain", "test-workflow-id3", "test-runid3", signalName, signalData).After(time.Minute).Return(nil).Once() + env.OnSignalExternalWorkflow("test-namespace", "test-workflow-id3", "test-runid3", signalName, signalData).After(time.Minute).Return(nil).Once() env.ExecuteWorkflow(workflowFn) env.AssertExpectations(s.T()) @@ -1897,7 +1897,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_CancelChildWorkflow() { workflowFn := func(ctx Context) error { cwo := ChildWorkflowOptions{ - Domain: "test-domain", + Namespace: "test-namespace", ExecutionStartToCloseTimeout: time.Minute, } @@ -1924,8 +1924,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_CancelChildWorkflow() { func (s *WorkflowTestSuiteUnitTest) Test_CancelExternalWorkflow() { workflowFn := func(ctx Context) error { - // set domain to be more specific - ctx = WithWorkflowDomain(ctx, "test-domain") + // set namespace to be more specific + ctx = WithWorkflowNamespace(ctx, "test-namespace") f1 := RequestCancelExternalWorkflow(ctx, "test-workflow-id1", "test-runid1") f2 := RequestCancelExternalWorkflow(ctx, "test-workflow-id2", "test-runid2") @@ -1948,11 +1948,11 @@ func (s *WorkflowTestSuiteUnitTest) Test_CancelExternalWorkflow() { env.RegisterWorkflow(workflowFn) // cancellation 1 should succeed - env.OnRequestCancelExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1").Return(nil).Once() + env.OnRequestCancelExternalWorkflow("test-namespace", "test-workflow-id1", "test-runid1").Return(nil).Once() // cancellation 2 should fail - env.OnRequestCancelExternalWorkflow("test-domain", "test-workflow-id2", "test-runid2").Return( - func(domainName, workflowID, runID string) error { + env.OnRequestCancelExternalWorkflow("test-namespace", "test-workflow-id2", "test-runid2").Return( + func(namespace, workflowID, runID string) error { return errors.New("unknown external workflow") }).Once() diff --git a/internal/session.go b/internal/session.go index ce997aa52..133615962 100644 --- a/internal/session.go +++ b/internal/session.go @@ -514,7 +514,7 @@ func (env *sessionEnvironmentImpl) AddSessionToken() { func (env *sessionEnvironmentImpl) SignalCreationResponse(ctx context.Context, sessionID string) error { activityEnv := getActivityEnv(ctx) - client := activityEnv.serviceInvoker.GetClient(activityEnv.workflowDomain, ClientOptions{}) + client := activityEnv.serviceInvoker.GetClient(activityEnv.workflowNamespace, ClientOptions{}) return client.SignalWorkflow(ctx, activityEnv.workflowExecution.ID, activityEnv.workflowExecution.RunID, sessionID, env.getCreationResponse()) } diff --git a/internal/testdata/parentWF.json b/internal/testdata/parentWF.json index 57c69eed9..654117d89 100644 --- a/internal/testdata/parentWF.json +++ b/internal/testdata/parentWF.json @@ -69,7 +69,7 @@ "version": -24, "taskId": 50331658, "startChildWorkflowExecutionInitiatedEventAttributes": { - "domain": "samples-domain", + "namespace": "samples-namespace", "workflowId": "child_workflow:0ea65eda-a0db-4a59-bef3-dce48e8484f8", "workflowType": { "name": "testReplayWorkflowFromFile" @@ -92,7 +92,7 @@ "version": -24, "taskId": 50331660, "childWorkflowExecutionStartedEventAttributes": { - "domain": "samples-domain", + "namespace": "samples-namespace", "initiatedEventId": 5, "workflowExecution": { "workflowId": "child_workflow:0ea65eda-a0db-4a59-bef3-dce48e8484f8", @@ -151,7 +151,7 @@ "taskId": 50331671, "signalExternalWorkflowExecutionInitiatedEventAttributes": { "decisionTaskCompletedEventId": 9, - "domain": "samples-domain", + "namespace": "samples-namespace", "workflowExecution": { "workflowId": "child_workflow:0ea65eda-a0db-4a59-bef3-dce48e8484f8", "runId": "" @@ -170,7 +170,7 @@ "taskId": 50331673, "externalWorkflowExecutionSignaledEventAttributes": { "initiatedEventId": 10, - "domain": "2896806f-7a10-47d0-b963-ad80dc5df011", + "namespace": "2896806f-7a10-47d0-b963-ad80dc5df011", "workflowExecution": { "workflowId": "child_workflow:0ea65eda-a0db-4a59-bef3-dce48e8484f8", "runId": "" @@ -225,7 +225,7 @@ "taskId": 50331684, "childWorkflowExecutionCompletedEventAttributes": { "result": "IkNoaWxkIHdvcmtmbG93IGV4ZWN1dGlvbiBjb21wbGV0ZWQgYWZ0ZXIgNSBydW5zIgo=", - "domain": "samples-domain", + "namespace": "samples-namespace", "workflowExecution": { "workflowId": "child_workflow:0ea65eda-a0db-4a59-bef3-dce48e8484f8", "runId": "1c4e88cc-37fa-4cd1-82b5-997c4c746678" diff --git a/internal/worker.go b/internal/worker.go index 7eb2ec536..386150e3e 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -180,13 +180,13 @@ const ( // to fail the workflow execution. NonDeterministicWorkflowPolicyFailWorkflow - // ReplayDomainName is domainName for replay because startEvent doesn't contain it - ReplayDomainName = "ReplayDomain" + // ReplayNamespace is namespace for replay because startEvent doesn't contain it + ReplayNamespace = "ReplayNamespace" ) -// IsReplayDomain checks if the domainName is from replay -func IsReplayDomain(dn string) bool { - return ReplayDomainName == dn +// IsReplayNamespace checks if the namespace is from replay +func IsReplayNamespace(dn string) bool { + return ReplayNamespace == dn } // NewWorker creates an instance of worker for managing workflow and activity executions. diff --git a/internal/workflow.go b/internal/workflow.go index b19762794..29c20a548 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -35,7 +35,7 @@ import ( ) var ( - errDomainNotSet = errors.New("domain is not set") + errNamespaceNotSet = errors.New("namespace is not set") errWorkflowIDNotSet = errors.New("workflowId is not set") errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions") errActivityParamsBadRequest = errors.New("missing activity parameters through context, check ActivityOptions") @@ -159,9 +159,9 @@ type ( // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. ChildWorkflowOptions struct { - // Domain of the child workflow. - // Optional: the current workflow (parent)'s domain will be used if this is not provided. - Domain string + // Namespace of the child workflow. + // Optional: the current workflow (parent)'s namespace will be used if this is not provided. + Namespace string // WorkflowID of the child workflow to be scheduled. // Optional: an auto generated workflowID will be used if this is not provided. @@ -678,7 +678,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil cancellationCallback.fn = func(v interface{}, more bool) bool { if ctx.Err() == ErrCanceled && childWorkflowExecution != nil && !mainFuture.IsReady() { // child workflow started, and ctx cancelled - getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(options.domain, childWorkflowExecution.ID) + getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(options.namespace, childWorkflowExecution.ID) } return false } @@ -709,12 +709,12 @@ type WorkflowInfo struct { TaskListName string ExecutionStartToCloseTimeoutSeconds int32 TaskStartToCloseTimeoutSeconds int32 - Domain string + Namespace string Attempt int32 // Attempt starts from 0 and increased by 1 for every retry if retry policy is specified. lastCompletionResult []byte CronSchedule string ContinuedExecutionRunID string - ParentWorkflowDomain string + ParentWorkflowNamespace string ParentWorkflowExecution *WorkflowExecution Memo *commonproto.Memo // Value can be decoded using data converter (DefaultDataConverter, or custom one if set). SearchAttributes *commonproto.SearchAttributes // Value can be decoded using DefaultDataConverter. @@ -828,9 +828,9 @@ func (wc *workflowEnvironmentInterceptor) Sleep(ctx Context, d time.Duration) (e // Input workflowID is the workflow ID of target workflow. // Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, // then the currently running instance of that workflowID will be used. -// By default, the current workflow's domain will be used as target domain. However, you can specify a different domain +// By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace // of the target workflow using the context like: -// ctx := WithWorkflowDomain(ctx, "domain-name") +// ctx := WithWorkflowNamespace(ctx, "namespace") // RequestCancelExternalWorkflow return Future with failure or empty success result. func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future { i := getWorkflowInterceptor(ctx) @@ -842,8 +842,8 @@ func (wc *workflowEnvironmentInterceptor) RequestCancelExternalWorkflow(ctx Cont options := getWorkflowEnvOptions(ctx1) future, settable := NewFuture(ctx1) - if options.domain == "" { - settable.Set(nil, errDomainNotSet) + if options.namespace == "" { + settable.Set(nil, errNamespaceNotSet) return future } @@ -857,7 +857,7 @@ func (wc *workflowEnvironmentInterceptor) RequestCancelExternalWorkflow(ctx Cont } wc.env.RequestCancelExternalWorkflow( - options.domain, + options.namespace, workflowID, runID, resultCallback, @@ -870,9 +870,9 @@ func (wc *workflowEnvironmentInterceptor) RequestCancelExternalWorkflow(ctx Cont // Input workflowID is the workflow ID of target workflow. // Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, // then the currently running instance of that workflowID will be used. -// By default, the current workflow's domain will be used as target domain. However, you can specify a different domain +// By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace // of the target workflow using the context like: -// ctx := WithWorkflowDomain(ctx, "domain-name") +// ctx := WithWorkflowNamespace(ctx, "namespace") // SignalExternalWorkflow return Future with failure or empty success result. func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future { i := getWorkflowInterceptor(ctx) @@ -890,8 +890,8 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a options := getWorkflowEnvOptions(ctx1) future, settable := NewFuture(ctx1) - if options.domain == "" { - settable.Set(nil, errDomainNotSet) + if options.namespace == "" { + settable.Set(nil, errNamespaceNotSet) return future } @@ -910,7 +910,7 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a settable.Set(result, err) } env.SignalExternalWorkflow( - options.domain, + options.namespace, workflowID, runID, signalName, @@ -967,7 +967,7 @@ func (wc *workflowEnvironmentInterceptor) UpsertSearchAttributes(ctx Context, at func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) wfOptions := getWorkflowEnvOptions(ctx1) - wfOptions.domain = cwo.Domain + wfOptions.namespace = cwo.Namespace wfOptions.taskListName = cwo.TaskList wfOptions.workflowID = cwo.WorkflowID wfOptions.executionStartToCloseTimeoutSeconds = common.Int32Ceil(cwo.ExecutionStartToCloseTimeout.Seconds()) @@ -983,10 +983,10 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { return ctx1 } -// WithWorkflowDomain adds a domain to the context. -func WithWorkflowDomain(ctx Context, name string) Context { +// WithWorkflowNamespace adds a namespace to the context. +func WithWorkflowNamespace(ctx Context, name string) Context { ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) - getWorkflowEnvOptions(ctx1).domain = name + getWorkflowEnvOptions(ctx1).namespace = name return ctx1 } diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index bbcd3bd5e..086093f38 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -349,19 +349,19 @@ const mockMethodForUpsertSearchAttributes = "workflow.UpsertSearchAttributes" // Some examples of how to setup mock: // // * mock for specific target workflow that matches specific signal name and signal data -// env.OnSignalExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1", "test-signal", "test-data").Return(nil).Once() +// env.OnSignalExternalWorkflow("test-namespace", "test-workflow-id1", "test-runid1", "test-signal", "test-data").Return(nil).Once() // * mock for anything and succeed the send // env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() // * mock for anything and fail the send // env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("unknown external workflow")).Once() // * mock function for SignalExternalWorkflow // env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( -// func(domainName, workflowID, runID, signalName string, arg interface{}) error { +// func(namespace, workflowID, runID, signalName string, arg interface{}) error { // // you can do differently based on the parameters // return nil // }) -func (t *TestWorkflowEnvironment) OnSignalExternalWorkflow(domainName, workflowID, runID, signalName, arg interface{}) *MockCallWrapper { - call := t.Mock.On(mockMethodForSignalExternalWorkflow, domainName, workflowID, runID, signalName, arg) +func (t *TestWorkflowEnvironment) OnSignalExternalWorkflow(namespace, workflowID, runID, signalName, arg interface{}) *MockCallWrapper { + call := t.Mock.On(mockMethodForSignalExternalWorkflow, namespace, workflowID, runID, signalName, arg) return t.wrapCall(call) } @@ -373,19 +373,19 @@ func (t *TestWorkflowEnvironment) OnSignalExternalWorkflow(domainName, workflowI // Some examples of how to setup mock: // // * mock for specific target workflow that matches specific workflow ID and run ID -// env.OnRequestCancelExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1").Return(nil).Once() +// env.OnRequestCancelExternalWorkflow("test-namespace", "test-workflow-id1", "test-runid1").Return(nil).Once() // * mock for anything and succeed the cancellation // env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() // * mock for anything and fail the cancellation // env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("unknown external workflow")).Once() // * mock function for RequestCancelExternalWorkflow // env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return( -// func(domainName, workflowID, runID) error { +// func(namespace, workflowID, runID) error { // // you can do differently based on the parameters // return nil // }) -func (t *TestWorkflowEnvironment) OnRequestCancelExternalWorkflow(domainName, workflowID, runID string) *MockCallWrapper { - call := t.Mock.On(mockMethodForRequestCancelExternalWorkflow, domainName, workflowID, runID) +func (t *TestWorkflowEnvironment) OnRequestCancelExternalWorkflow(namespace, workflowID, runID string) *MockCallWrapper { + call := t.Mock.On(mockMethodForRequestCancelExternalWorkflow, namespace, workflowID, runID) return t.wrapCall(call) } diff --git a/mocks/Client.go b/mocks/Client.go index eebac4aec..163939887 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -70,13 +70,13 @@ func (_m *Client) CompleteActivity(ctx context.Context, taskToken []byte, result return r0 } -// CompleteActivityByID provides a mock function with given fields: ctx, domain, workflowID, runID, activityID, result, err -func (_m *Client) CompleteActivityByID(ctx context.Context, domain string, workflowID string, runID string, activityID string, result interface{}, err error) error { - ret := _m.Called(ctx, domain, workflowID, runID, activityID, result, err) +// CompleteActivityByID provides a mock function with given fields: ctx, namespace, workflowID, runID, activityID, result, err +func (_m *Client) CompleteActivityByID(ctx context.Context, namespace string, workflowID string, runID string, activityID string, result interface{}, err error) error { + ret := _m.Called(ctx, namespace, workflowID, runID, activityID, result, err) var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, interface{}, error) error); ok { - r0 = rf(ctx, domain, workflowID, runID, activityID, result, err) + r0 = rf(ctx, namespace, workflowID, runID, activityID, result, err) } else { r0 = ret.Error(0) } @@ -394,16 +394,16 @@ func (_m *Client) RecordActivityHeartbeat(ctx context.Context, taskToken []byte, return r0 } -// RecordActivityHeartbeatByID provides a mock function with given fields: ctx, domain, workflowID, runID, activityID, details -func (_m *Client) RecordActivityHeartbeatByID(ctx context.Context, domain string, workflowID string, runID string, activityID string, details ...interface{}) error { +// RecordActivityHeartbeatByID provides a mock function with given fields: ctx, namespace, workflowID, runID, activityID, details +func (_m *Client) RecordActivityHeartbeatByID(ctx context.Context, namespace string, workflowID string, runID string, activityID string, details ...interface{}) error { var _ca []interface{} - _ca = append(_ca, ctx, domain, workflowID, runID, activityID) + _ca = append(_ca, ctx, namespace, workflowID, runID, activityID) _ca = append(_ca, details...) ret := _m.Called(_ca...) var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, ...interface{}) error); ok { - r0 = rf(ctx, domain, workflowID, runID, activityID, details...) + r0 = rf(ctx, namespace, workflowID, runID, activityID, details...) } else { r0 = ret.Error(0) } diff --git a/mocks/DomainClient.go b/mocks/NamespaceClient.go similarity index 76% rename from mocks/DomainClient.go rename to mocks/NamespaceClient.go index f2ea88e50..e9e5e2f0d 100644 --- a/mocks/DomainClient.go +++ b/mocks/NamespaceClient.go @@ -29,21 +29,21 @@ import ( "go.temporal.io/temporal-proto/workflowservice" ) -// DomainClient is an autogenerated mock type for the DomainClient type -type DomainClient struct { +// NamespaceClient is an autogenerated mock type for the NamespaceClient type +type NamespaceClient struct { mock.Mock } // Describe provides a mock function with given fields: ctx, name -func (_m *DomainClient) Describe(ctx context.Context, name string) (*workflowservice.DescribeDomainResponse, error) { +func (_m *NamespaceClient) Describe(ctx context.Context, name string) (*workflowservice.DescribeNamespaceResponse, error) { ret := _m.Called(ctx, name) - var r0 *workflowservice.DescribeDomainResponse - if rf, ok := ret.Get(0).(func(context.Context, string) *workflowservice.DescribeDomainResponse); ok { + var r0 *workflowservice.DescribeNamespaceResponse + if rf, ok := ret.Get(0).(func(context.Context, string) *workflowservice.DescribeNamespaceResponse); ok { r0 = rf(ctx, name) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*workflowservice.DescribeDomainResponse) + r0 = ret.Get(0).(*workflowservice.DescribeNamespaceResponse) } } @@ -58,11 +58,11 @@ func (_m *DomainClient) Describe(ctx context.Context, name string) (*workflowser } // Register provides a mock function with given fields: ctx, request -func (_m *DomainClient) Register(ctx context.Context, request *workflowservice.RegisterDomainRequest) error { +func (_m *NamespaceClient) Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error { ret := _m.Called(ctx, request) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *workflowservice.RegisterDomainRequest) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *workflowservice.RegisterNamespaceRequest) error); ok { r0 = rf(ctx, request) } else { r0 = ret.Error(0) @@ -72,11 +72,11 @@ func (_m *DomainClient) Register(ctx context.Context, request *workflowservice.R } // Update provides a mock function with given fields: ctx, request -func (_m *DomainClient) Update(ctx context.Context, request *workflowservice.UpdateDomainRequest) error { +func (_m *NamespaceClient) Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error { ret := _m.Called(ctx) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *workflowservice.UpdateDomainRequest) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *workflowservice.UpdateNamespaceRequest) error); ok { r0 = rf(ctx, request) } else { r0 = ret.Error(0) @@ -86,7 +86,7 @@ func (_m *DomainClient) Update(ctx context.Context, request *workflowservice.Upd } // CloseConnection provides a mock function without given fields -func (_m *DomainClient) CloseConnection() error { +func (_m *NamespaceClient) CloseConnection() error { ret := _m.Called() var r0 error diff --git a/mocks/verify.go b/mocks/verify.go index 1133502f4..f3a24bfdd 100644 --- a/mocks/verify.go +++ b/mocks/verify.go @@ -26,4 +26,4 @@ import ( // make sure mocks are in sync with interfaces var _ client.Client = (*Client)(nil) -var _ client.DomainClient = (*DomainClient)(nil) +var _ client.NamespaceClient = (*NamespaceClient)(nil) diff --git a/test/activity_test.go b/test/activity_test.go index 2b5ede52d..357b48901 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -86,11 +86,11 @@ func (a *Activities) fail(_ context.Context) error { return errFailOnPurpose } -func (a *Activities) InspectActivityInfo(ctx context.Context, domain, taskList, wfType string) error { +func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskList, wfType string) error { a.append("inspectActivityInfo") info := activity.GetInfo(ctx) - if info.WorkflowDomain != domain { - return fmt.Errorf("expected domainName %v but got %v", domain, info.WorkflowDomain) + if info.WorkflowNamespace != namespace { + return fmt.Errorf("expected namespace %v but got %v", namespace, info.WorkflowNamespace) } if info.WorkflowType == nil || info.WorkflowType.Name != wfType { return fmt.Errorf("expected workflowType %v but got %v", wfType, info.WorkflowType) diff --git a/test/integration_test.go b/test/integration_test.go index 9792ed429..0c0ed4638 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -59,9 +59,9 @@ type IntegrationTestSuite struct { } const ( - ctxTimeout = 15 * time.Second - domainName = "integration-test-domain" - domainCacheRefreshInterval = 20 * time.Second + ctxTimeout = 15 * time.Second + namespace = "integration-test-namespace" + namespaceCacheRefreshInterval = 20 * time.Second ) func TestIntegrationSuite(t *testing.T) { @@ -96,9 +96,9 @@ func (ts *IntegrationTestSuite) SetupSuite() { ts.workflows = &Workflows{} ts.NoError(waitForTCP(time.Minute, ts.config.ServiceAddr)) var err error - ts.client, err = client.NewClient(client.Options{HostPort: ts.config.ServiceAddr, DomainName: domainName}) + ts.client, err = client.NewClient(client.Options{HostPort: ts.config.ServiceAddr, Namespace: namespace}) ts.NoError(err) - ts.registerDomain() + ts.registerNamespace() } func (ts *IntegrationTestSuite) TearDownSuite() { @@ -441,31 +441,31 @@ func (ts *IntegrationTestSuite) TestInspectLocalActivityInfo() { ts.Nil(err) } -func (ts *IntegrationTestSuite) registerDomain() { - client, err := client.NewDomainClient(client.Options{HostPort: ts.config.ServiceAddr}) +func (ts *IntegrationTestSuite) registerNamespace() { + client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr}) ts.NoError(err) ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() - name := domainName + name := namespace retention := int32(1) - err = client.Register(ctx, &workflowservice.RegisterDomainRequest{ + err = client.Register(ctx, &workflowservice.RegisterNamespaceRequest{ Name: name, WorkflowExecutionRetentionPeriodInDays: retention, }) _ = client.CloseConnection() - if _, ok := err.(*serviceerror.DomainAlreadyExists); ok { + if _, ok := err.(*serviceerror.NamespaceAlreadyExists); ok { return } ts.NoError(err) - time.Sleep(domainCacheRefreshInterval) // wait for domain cache refresh on temporal-server - // bellow is used to guarantee domain is ready + time.Sleep(namespaceCacheRefreshInterval) // wait for namespace cache refresh on temporal-server + // bellow is used to guarantee namespace is ready var dummyReturn string - err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn) + err = ts.executeWorkflow("test-namespace-exist", ts.workflows.SimplestWorkflow, &dummyReturn) numOfRetry := 20 for err != nil && numOfRetry >= 0 { if _, ok := err.(*serviceerror.NotFound); ok { - time.Sleep(domainCacheRefreshInterval) - err = ts.executeWorkflow("test-domain-exist", ts.workflows.SimplestWorkflow, &dummyReturn) + time.Sleep(namespaceCacheRefreshInterval) + err = ts.executeWorkflow("test-namespace-exist", ts.workflows.SimplestWorkflow, &dummyReturn) } else { break } diff --git a/test/workflow_test.go b/test/workflow_test.go index 966245f18..2f00cce41 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -489,22 +489,22 @@ func (w *Workflows) sleep(ctx workflow.Context, d time.Duration) error { func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error { info := workflow.GetInfo(ctx) - domain := info.Domain + namespace := info.Namespace wfType := info.WorkflowType.Name taskList := info.TaskListName ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) - return workflow.ExecuteActivity(ctx, "inspectActivityInfo", domain, taskList, wfType).Get(ctx, nil) + return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskList, wfType).Get(ctx, nil) } func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { info := workflow.GetInfo(ctx) - domain := info.Domain + namespace := info.Namespace wfType := info.WorkflowType.Name taskList := info.TaskListName ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions()) activites := Activities{} return workflow.ExecuteLocalActivity( - ctx, activites.InspectActivityInfo, domain, taskList, wfType).Get(ctx, nil) + ctx, activites.InspectActivityInfo, namespace, taskList, wfType).Get(ctx, nil) } func (w *Workflows) register(worker worker.Worker) { diff --git a/worker/worker.go b/worker/worker.go index 8e58b1d48..b30eef8f7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -155,7 +155,7 @@ type ( // ReplayWorkflowExecution loads a workflow execution history from the Temporal service and executes a single decision task for it. // Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. // The logger is the only optional parameter. Defaults to the noop logger. - ReplayWorkflowExecution(ctx context.Context, service workflowservice.WorkflowServiceClient, logger *zap.Logger, domain string, execution workflow.Execution) error + ReplayWorkflowExecution(ctx context.Context, service workflowservice.WorkflowServiceClient, logger *zap.Logger, namespace string, execution workflow.Execution) error } // Options is used to configure a worker instance. @@ -182,7 +182,7 @@ const ( ) // New creates an instance of worker for managing workflow and activity executions. -// domain - the name of the temporal domain +// namespace - the name of the temporal namespace // taskList - is the task list name you use to identify your client worker, also // identifies group of workflow and activity implementations that are // hosted by a single worker process diff --git a/workflow/doc.go b/workflow/doc.go index 6b34fc19c..626fc1a35 100644 --- a/workflow/doc.go +++ b/workflow/doc.go @@ -396,7 +396,7 @@ Query API A workflow execution could be stuck at some state for longer than expected period. Temporal provide facilities to query the current call stack of a workflow execution. You can use tctl to do the query, for example: - tctl --domain samples-domain workflow query -w my_workflow_id -r my_run_id -qt __stack_trace + tctl --namespace samples-namespace workflow query -w my_workflow_id -r my_run_id -qt __stack_trace The above cli command uses __stack_trace as the query type. The __stack_trace is a built-in query type that is supported by temporal client library. You can also add your own custom query types to support thing like query current @@ -432,7 +432,7 @@ query handler using workflow.SetQueryHandler in your workflow code: The above sample code sets up a query handler to handle query type "state". With that, you should be able to query with cli: - tctl --domain samples-domain workflow query -w my_workflow_id -r my_run_id -qt state + tctl --namespace samples-namespace workflow query -w my_workflow_id -r my_run_id -qt state Besides using tctl, you can also issue query from code using QueryWorkflow() API on temporal Client object. diff --git a/workflow/workflow.go b/workflow/workflow.go index 1d8dec146..46e9eae48 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -159,9 +159,9 @@ func GetMetricsScope(ctx Context) tally.Scope { // Input workflowID is the workflow ID of target workflow. // Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, // then the currently running instance of that workflowID will be used. -// By default, the current workflow's domain will be used as target domain. However, you can specify a different domain +// By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace // of the target workflow using the context like: -// ctx := WithWorkflowDomain(ctx, "domain-name") +// ctx := WithWorkflowNamespace(ctx, "namespace") // RequestCancelExternalWorkflow return Future with failure or empty success result. func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future { return internal.RequestCancelExternalWorkflow(ctx, workflowID, runID) @@ -171,9 +171,9 @@ func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future // Input workflowID is the workflow ID of target workflow. // Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, // then the currently running instance of that workflowID will be used. -// By default, the current workflow's domain will be used as target domain. However, you can specify a different domain +// By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace // of the target workflow using the context like: -// ctx := WithWorkflowDomain(ctx, "domain-name") +// ctx := WithWorkflowNamespace(ctx, "namespace") // SignalExternalWorkflow return Future with failure or empty success result. func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future { return internal.SignalExternalWorkflow(ctx, workflowID, runID, signalName, arg) diff --git a/workflow/workflow_options.go b/workflow/workflow_options.go index 5f55234db..cb233f614 100644 --- a/workflow/workflow_options.go +++ b/workflow/workflow_options.go @@ -32,9 +32,9 @@ func WithChildOptions(ctx Context, cwo ChildWorkflowOptions) Context { return internal.WithChildWorkflowOptions(ctx, cwo) } -// WithWorkflowDomain adds a domain to the context. -func WithWorkflowDomain(ctx Context, name string) Context { - return internal.WithWorkflowDomain(ctx, name) +// WithWorkflowNamespace adds a namespace to the context. +func WithWorkflowNamespace(ctx Context, name string) Context { + return internal.WithWorkflowNamespace(ctx, name) } // WithWorkflowTaskList adds a task list to the context.