From fee5fb686021eefb7de527f45245d5b8a38bdcd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Minaudier?= Date: Fri, 2 Apr 2021 00:05:01 +0200 Subject: [PATCH] Support executing local activities by name (#397) The change replicates the behavior of ExecuteActivity for ExecuteLocalActivity to allow executing local activities by supplying funcs or func names. In the process, ExecuteLocalActivity is using the activity registry to lookup the activity function by name when needed. --- internal/workflow.go | 39 +++++++++++---- internal/workflow_testsuite_test.go | 73 +++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 8 deletions(-) diff --git a/internal/workflow.go b/internal/workflow.go index 7cf5c3cd8..2620b6bbe 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -546,27 +546,50 @@ func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{} return i.ExecuteLocalActivity(ctx, activityType, args...) } -func (wc *workflowEnvironmentInterceptor) ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future { +func (wc *workflowEnvironmentInterceptor) ExecuteLocalActivity(ctx Context, typeName string, args ...interface{}) Future { header := getHeadersFromContext(ctx) - activityFn := ctx.Value(localActivityFnContextKey) - if activityFn == nil { + future, settable := newDecodeFuture(ctx, typeName) + + var activityFn interface{} + activityFnOrName := ctx.Value(localActivityFnContextKey) + if activityFnOrName == nil { panic("ExecuteLocalActivity: Expected context key " + localActivityFnContextKey + " is missing") } - future, settable := newDecodeFuture(ctx, activityFn) - if err := validateFunctionArgs(activityFn, args, false); err != nil { - settable.Set(nil, err) - return future + if activityName, ok := activityFnOrName.(string); ok { + registry := getRegistryFromWorkflowContext(ctx) + activityType, err := getValidatedActivityFunction(typeName, args, registry) + if err != nil { + settable.Set(nil, err) + return future + } + + activity, ok := registry.GetActivity(activityName) + if !ok { + settable.Set(nil, fmt.Errorf("local activity %s is not registered by the worker", activityType.Name)) + return future + } + + activityFn = activity.GetFunction() + } else { + if err := validateFunctionArgs(activityFnOrName, args, false); err != nil { + settable.Set(nil, err) + return future + } + + activityFn = activityFnOrName } + options, err := getValidatedLocalActivityOptions(ctx) if err != nil { settable.Set(nil, err) return future } + params := &ExecuteLocalActivityParams{ ExecuteLocalActivityOptions: *options, ActivityFn: activityFn, - ActivityType: activityType, + ActivityType: typeName, InputArgs: args, WorkflowInfo: GetWorkflowInfo(ctx), DataConverter: getDataConverterFromWorkflowContext(ctx), diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 6a5e2a0ef..f2462beb4 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -128,3 +128,76 @@ func TestUnregisteredActivity(t *testing.T) { require.True(t, strings.HasPrefix(err1.Error(), "unable to find activityType=unregistered"), err1.Error()) } + +func namedActivity(ctx context.Context, arg string) (string, error) { + return arg + " World!", nil +} + +func TestLocalActivityExecutionByActivityName(t *testing.T) { + testSuite := &WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + env.RegisterActivity(namedActivity) + env.ExecuteWorkflow(func(ctx Context, arg1 string) (string, error) { + ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{ + ScheduleToCloseTimeout: time.Hour, + StartToCloseTimeout: time.Hour, + }) + var result string + err := ExecuteLocalActivity(ctx, "namedActivity", arg1).Get(ctx, &result) + if err != nil { + return "", err + } + return result, nil + }, "Hello") + require.NoError(t, env.GetWorkflowError()) + var result string + err := env.GetWorkflowResult(&result) + require.NoError(t, err) + require.Equal(t, "Hello World!", result) +} + +func TestLocalActivityExecutionByActivityNameAlias(t *testing.T) { + testSuite := &WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + env.RegisterActivityWithOptions(namedActivity, RegisterActivityOptions{ + Name: "localActivity", + }) + env.ExecuteWorkflow(func(ctx Context, arg1 string) (string, error) { + ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{ + ScheduleToCloseTimeout: time.Hour, + StartToCloseTimeout: time.Hour, + }) + var result string + err := ExecuteLocalActivity(ctx, "localActivity", arg1).Get(ctx, &result) + if err != nil { + return "", err + } + return result, nil + }, "Hello") + require.NoError(t, env.GetWorkflowError()) + var result string + err := env.GetWorkflowResult(&result) + require.NoError(t, err) + require.Equal(t, "Hello World!", result) +} + +func TestLocalActivityExecutionByActivityNameAliasMissingRegistration(t *testing.T) { + testSuite := &WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + env.ExecuteWorkflow(func(ctx Context, arg1 string) (string, error) { + ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{ + ScheduleToCloseTimeout: time.Hour, + StartToCloseTimeout: time.Hour, + }) + var result string + err := ExecuteLocalActivity(ctx, "localActivity", arg1).Get(ctx, &result) + if err != nil { + return "", err + } + return result, nil + }, "Hello") + require.NotNil(t, env.GetWorkflowError()) +}