Skip to content

Commit

Permalink
Merge from cadence at 8bcf4b1
Browse files Browse the repository at this point in the history
Merge from cadence at 8bcf4b1 (03/23/2020)
  • Loading branch information
alexshtin authored Mar 23, 2020
2 parents c8be5c5 + 69f88d2 commit b46ce50
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 53 deletions.
8 changes: 8 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package temporal

import (
"go.temporal.io/temporal-proto/serviceerror"

"go.temporal.io/temporal/internal"
"go.temporal.io/temporal/workflow"
)
Expand Down Expand Up @@ -54,6 +56,12 @@ func IsCustomError(err error) bool {
return ok
}

// IsWorkflowExecutionAlreadyStartedError return if the err is a WorkflowExecutionAlreadyStartedError
func IsWorkflowExecutionAlreadyStartedError(err error) bool {
_, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted)
return ok
}

// IsCanceledError return if the err is a CanceledError
func IsCanceledError(err error) bool {
_, ok := err.(*CanceledError)
Expand Down
1 change: 1 addition & 0 deletions idls
Submodule idls added at 56ca0b
2 changes: 1 addition & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *workflowservice.
if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded {
return nil, ctx.Err()
}
if err != nil {
if err != nil && err != ErrActivityResultPending {
ath.logger.Error("Activity error.",
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
Expand Down
6 changes: 6 additions & 0 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,12 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
rootCtx = context.Background()
}

workflowTypeLocal := task.params.WorkflowInfo.WorkflowType

ctx := context.WithValue(rootCtx, activityEnvContextKey, &activityEnvironment{
workflowType: &workflowTypeLocal,
workflowDomain: task.params.WorkflowInfo.Domain,
taskList: task.params.WorkflowInfo.TaskListName,
activityType: ActivityType{Name: activityType},
activityID: fmt.Sprintf("%v", task.activityID),
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
Expand Down Expand Up @@ -501,6 +506,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
}

ctx, cancel := context.WithDeadline(ctx, deadline)
task.Lock()
if task.canceled {
Expand Down
37 changes: 25 additions & 12 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"go.uber.org/zap/zapcore"

"go.temporal.io/temporal/internal/common/backoff"
"go.temporal.io/temporal/internal/common/metrics"
)

const (
Expand Down Expand Up @@ -125,21 +126,24 @@ type (
// Task list name to poll.
TaskList string

// Defines how many concurrent poll requests for the task list by this worker.
ConcurrentPollRoutineSize int

// Defines how many concurrent activity executions by this worker.
ConcurrentActivityExecutionSize int

// Defines rate limiting on number of activity tasks that can be executed per second per worker.
WorkerActivitiesPerSecond float64

// MaxConcurrentActivityPollers is the max number of pollers for activity task list
MaxConcurrentActivityPollers int

// Defines how many concurrent decision task executions by this worker.
ConcurrentDecisionTaskExecutionSize int

// Defines rate limiting on number of decision tasks that can be executed per second per worker.
WorkerDecisionTasksPerSecond float64

// MaxConcurrentDecisionPollers is the max number of pollers for decision task list
MaxConcurrentDecisionPollers int

// Defines how many concurrent local activity executions by this worker.
ConcurrentLocalActivityExecutionSize int

Expand Down Expand Up @@ -271,7 +275,7 @@ func newWorkflowTaskWorkerInternal(taskHandler WorkflowTaskHandler, service work
ensureRequiredParams(&params)
poller := newWorkflowTaskPoller(taskHandler, service, params)
worker := newBaseWorker(baseWorkerOptions{
pollerCount: params.ConcurrentPollRoutineSize,
pollerCount: params.MaxConcurrentDecisionPollers,
pollerRate: defaultPollerRate,
maxConcurrentTask: params.ConcurrentDecisionTaskExecutionSize,
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
Expand Down Expand Up @@ -365,7 +369,7 @@ func newSessionWorker(service workflowservice.WorkflowServiceClient, params work
params.TaskList = sessionEnvironment.GetResourceSpecificTasklist()
activityWorker := newActivityWorker(service, params, overrides, env, nil)

params.ConcurrentPollRoutineSize = 1
params.MaxConcurrentActivityPollers = 1
params.TaskList = creationTasklist
creationWorker := newActivityWorker(service, params, overrides, env, sessionEnvironment.GetTokenBucket())

Expand Down Expand Up @@ -424,7 +428,7 @@ func newActivityTaskWorker(taskHandler ActivityTaskHandler, service workflowserv

base := newBaseWorker(
baseWorkerOptions{
pollerCount: workerParams.ConcurrentPollRoutineSize,
pollerCount: workerParams.MaxConcurrentActivityPollers,
pollerRate: defaultPollerRate,
maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize,
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
Expand Down Expand Up @@ -1316,9 +1320,7 @@ func extractHistoryFromFile(jsonfileName string, lastEventID int64) (*commonprot
return history, nil
}

// NewAggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as
// poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms.
// With 2 poller, we could achieve around 300~400 RPS.
// NewAggregatedWorker returns an instance to manage both activity and decision workers
func NewAggregatedWorker(client *WorkflowClient, taskList string, options WorkerOptions) *AggregatedWorker {
setClientDefaults(client)
setWorkerOptionsDefaults(&options)
Expand All @@ -1331,13 +1333,14 @@ func NewAggregatedWorker(client *WorkflowClient, taskList string, options Worker
workerParams := workerExecutionParameters{
DomainName: client.domain,
TaskList: taskList,
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize,
WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond,
MaxConcurrentActivityPollers: options.MaxConcurrentActivityTaskPollers,
ConcurrentLocalActivityExecutionSize: options.MaxConcurrentLocalActivityExecutionSize,
WorkerLocalActivitiesPerSecond: options.WorkerLocalActivitiesPerSecond,
ConcurrentDecisionTaskExecutionSize: options.MaxConcurrentDecisionTaskExecutionSize,
WorkerDecisionTasksPerSecond: options.WorkerDecisionTasksPerSecond,
MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers,
Identity: client.identity,
MetricsScope: client.metricsScope,
Logger: options.Logger,
Expand Down Expand Up @@ -1431,7 +1434,8 @@ func processTestTags(wOptions *WorkerOptions, ep *workerExecutionParameters) {
switch key {
case workerOptionsConfigConcurrentPollRoutineSize:
if size, err := strconv.Atoi(val); err == nil {
ep.ConcurrentPollRoutineSize = size
ep.MaxConcurrentActivityPollers = size
ep.MaxConcurrentDecisionPollers = size
}
}
}
Expand Down Expand Up @@ -1499,12 +1503,18 @@ func setWorkerOptionsDefaults(options *WorkerOptions) {
if options.WorkerActivitiesPerSecond == 0 {
options.WorkerActivitiesPerSecond = defaultWorkerActivitiesPerSecond
}
if options.MaxConcurrentActivityTaskPollers <= 0 {
options.MaxConcurrentActivityTaskPollers = defaultConcurrentPollRoutineSize
}
if options.MaxConcurrentDecisionTaskExecutionSize == 0 {
options.MaxConcurrentDecisionTaskExecutionSize = defaultMaxConcurrentTaskExecutionSize
}
if options.WorkerDecisionTasksPerSecond == 0 {
options.WorkerDecisionTasksPerSecond = defaultWorkerTaskExecutionRate
}
if options.MaxConcurrentDecisionTaskPollers <= 0 {
options.MaxConcurrentDecisionTaskPollers = defaultConcurrentPollRoutineSize
}
if options.MaxConcurrentLocalActivityExecutionSize == 0 {
options.MaxConcurrentLocalActivityExecutionSize = defaultMaxConcurrentLocalActivityExecutionSize
}
Expand All @@ -1522,8 +1532,8 @@ func setWorkerOptionsDefaults(options *WorkerOptions) {
}
}

// setClientDefaults should be needed only in unit tests.
func setClientDefaults(client *WorkflowClient) {
// This should be needed only in unit tests.
if client.dataConverter == nil {
client.dataConverter = getDefaultDataConverter()
}
Expand All @@ -1533,6 +1543,9 @@ func setClientDefaults(client *WorkflowClient) {
if client.tracer == nil {
client.tracer = opentracing.NoopTracer{}
}
if client.metricsScope == nil {
client.metricsScope = metrics.NewTaggedScope(nil)
}
}

// getTestTags returns the test tags in the context.
Expand Down
18 changes: 10 additions & 8 deletions internal/internal_worker_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ func (s *InterfacesTestSuite) TestInterface() {
domain := "testDomain"
// Workflow execution parameters.
workflowExecutionParameters := workerExecutionParameters{
TaskList: "testTaskList",
ConcurrentPollRoutineSize: 4,
Logger: logger,
Tracer: opentracing.NoopTracer{},
TaskList: "testTaskList",
MaxConcurrentActivityPollers: 4,
MaxConcurrentDecisionPollers: 4,
Logger: logger,
Tracer: opentracing.NoopTracer{},
}

domainStatus := enums.DomainStatusRegistered
Expand All @@ -207,10 +208,11 @@ func (s *InterfacesTestSuite) TestInterface() {

// Create activity execution parameters.
activityExecutionParameters := workerExecutionParameters{
TaskList: "testTaskList",
ConcurrentPollRoutineSize: 10,
Logger: logger,
Tracer: opentracing.NoopTracer{},
TaskList: "testTaskList",
MaxConcurrentActivityPollers: 10,
MaxConcurrentDecisionPollers: 10,
Logger: logger,
Tracer: opentracing.NoopTracer{},
}

// Register activity instances and launch the worker.
Expand Down
127 changes: 125 additions & 2 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -1172,14 +1173,136 @@ func TestActivityNilArgs(t *testing.T) {
require.Equal(t, nilErr, reflectResults[0].Interface())
}

func TestWorkerOptionDefaults(t *testing.T) {
client := &WorkflowClient{}
taskList := "worker-options-tl"
aggWorker := NewAggregatedWorker(client, taskList, WorkerOptions{})

decisionWorker := aggWorker.workflowWorker
require.True(t, decisionWorker.executionParameters.Identity != "")
require.NotNil(t, decisionWorker.executionParameters.Logger)
require.NotNil(t, decisionWorker.executionParameters.MetricsScope)
require.Nil(t, decisionWorker.executionParameters.ContextPropagators)

expected := workerExecutionParameters{
DomainName: DefaultDomainName,
TaskList: taskList,
MaxConcurrentActivityPollers: defaultConcurrentPollRoutineSize,
MaxConcurrentDecisionPollers: defaultConcurrentPollRoutineSize,
ConcurrentLocalActivityExecutionSize: defaultMaxConcurrentLocalActivityExecutionSize,
ConcurrentActivityExecutionSize: defaultMaxConcurrentActivityExecutionSize,
ConcurrentDecisionTaskExecutionSize: defaultMaxConcurrentTaskExecutionSize,
WorkerActivitiesPerSecond: defaultTaskListActivitiesPerSecond,
WorkerDecisionTasksPerSecond: defaultWorkerTaskExecutionRate,
TaskListActivitiesPerSecond: defaultTaskListActivitiesPerSecond,
WorkerLocalActivitiesPerSecond: defaultWorkerLocalActivitiesPerSecond,
StickyScheduleToStartTimeout: stickyDecisionScheduleToStartTimeoutSeconds * time.Second,
DataConverter: getDefaultDataConverter(),
Tracer: opentracing.NoopTracer{},
Logger: decisionWorker.executionParameters.Logger,
MetricsScope: decisionWorker.executionParameters.MetricsScope,
Identity: decisionWorker.executionParameters.Identity,
UserContext: decisionWorker.executionParameters.UserContext,
}

assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters)

activityWorker := aggWorker.activityWorker
require.True(t, activityWorker.executionParameters.Identity != "")
require.NotNil(t, activityWorker.executionParameters.Logger)
require.NotNil(t, activityWorker.executionParameters.MetricsScope)
require.Nil(t, activityWorker.executionParameters.ContextPropagators)
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
}

func TestWorkerOptionNonDefaults(t *testing.T) {
taskList := "worker-options-tl"

client := &WorkflowClient{
workflowService: nil,
connectionCloser: nil,
domain: "worker-options-test",
registry: nil,
identity: "143@worker-options-test-1",
dataConverter: &defaultDataConverter{},
contextPropagators: nil,
tracer: nil,
}

options := WorkerOptions{
TaskListActivitiesPerSecond: 8888,
MaxConcurrentSessionExecutionSize: 3333,
MaxConcurrentDecisionTaskExecutionSize: 2222,
MaxConcurrentActivityExecutionSize: 1111,
MaxConcurrentLocalActivityExecutionSize: 101,
MaxConcurrentDecisionTaskPollers: 11,
MaxConcurrentActivityTaskPollers: 12,
WorkerLocalActivitiesPerSecond: 222,
WorkerDecisionTasksPerSecond: 111,
WorkerActivitiesPerSecond: 99,
StickyScheduleToStartTimeout: 555 * time.Minute,
BackgroundActivityContext: context.Background(),
Logger: zap.NewNop(),
}

aggWorker := NewAggregatedWorker(client, taskList, options)

decisionWorker := aggWorker.workflowWorker
require.Len(t, decisionWorker.executionParameters.ContextPropagators, 0)

expected := workerExecutionParameters{
TaskList: taskList,
MaxConcurrentActivityPollers: options.MaxConcurrentActivityTaskPollers,
MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers,
ConcurrentLocalActivityExecutionSize: options.MaxConcurrentLocalActivityExecutionSize,
ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize,
ConcurrentDecisionTaskExecutionSize: options.MaxConcurrentDecisionTaskExecutionSize,
WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond,
WorkerDecisionTasksPerSecond: options.WorkerDecisionTasksPerSecond,
TaskListActivitiesPerSecond: options.TaskListActivitiesPerSecond,
WorkerLocalActivitiesPerSecond: options.WorkerLocalActivitiesPerSecond,
StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout,
DataConverter: client.dataConverter,
Tracer: client.tracer,
Logger: options.Logger,
MetricsScope: client.metricsScope,
Identity: client.identity,
}

assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters)

activityWorker := aggWorker.activityWorker
require.Len(t, activityWorker.executionParameters.ContextPropagators, 0)
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
}

func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) {
require.Equal(t, paramsA.TaskList, paramsA.TaskList)
require.Equal(t, paramsA.Identity, paramsB.Identity)
require.Equal(t, paramsA.DataConverter, paramsB.DataConverter)
require.Equal(t, paramsA.Tracer, paramsB.Tracer)
require.Equal(t, paramsA.ConcurrentLocalActivityExecutionSize, paramsB.ConcurrentLocalActivityExecutionSize)
require.Equal(t, paramsA.ConcurrentActivityExecutionSize, paramsB.ConcurrentActivityExecutionSize)
require.Equal(t, paramsA.ConcurrentDecisionTaskExecutionSize, paramsB.ConcurrentDecisionTaskExecutionSize)
require.Equal(t, paramsA.WorkerActivitiesPerSecond, paramsB.WorkerActivitiesPerSecond)
require.Equal(t, paramsA.WorkerDecisionTasksPerSecond, paramsB.WorkerDecisionTasksPerSecond)
require.Equal(t, paramsA.TaskListActivitiesPerSecond, paramsB.TaskListActivitiesPerSecond)
require.Equal(t, paramsA.StickyScheduleToStartTimeout, paramsB.StickyScheduleToStartTimeout)
require.Equal(t, paramsA.MaxConcurrentDecisionPollers, paramsB.MaxConcurrentDecisionPollers)
require.Equal(t, paramsA.MaxConcurrentActivityPollers, paramsB.MaxConcurrentActivityPollers)
require.Equal(t, paramsA.NonDeterministicWorkflowPolicy, paramsB.NonDeterministicWorkflowPolicy)
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
}

/*
type encodingTest struct {
encoding encoding
input []interface{}
}
var testWorkflowID1 = s.WorkflowExecution{WorkflowId: "testWID", RunId: "runID"}
var testWorkflowID2 = s.WorkflowExecution{WorkflowId: "testWID2", RunId: "runID2"}
var testWorkflowID1 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID"), RunId: common.StringPtr("runID")}
var testWorkflowID2 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID2"), RunId: common.StringPtr("runID2")}
var thriftEncodingTests = []encodingTest{
{&thriftEncoding{}, []interface{}{&testWorkflowID1}},
{&thriftEncoding{}, []interface{}{&testWorkflowID1, &testWorkflowID2}},
Expand Down
Loading

0 comments on commit b46ce50

Please sign in to comment.