Skip to content

Commit

Permalink
Support executing local activities by name (#397)
Browse files Browse the repository at this point in the history
The change replicates the behavior of ExecuteActivity for ExecuteLocalActivity to allow executing local activities by supplying funcs or func names.

In the process, ExecuteLocalActivity is using the activity registry to lookup the activity function by name when needed.
  • Loading branch information
lminaudier authored Apr 1, 2021
1 parent d46c109 commit fee5fb6
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 8 deletions.
39 changes: 31 additions & 8 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,27 +546,50 @@ func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{}
return i.ExecuteLocalActivity(ctx, activityType, args...)
}

func (wc *workflowEnvironmentInterceptor) ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future {
func (wc *workflowEnvironmentInterceptor) ExecuteLocalActivity(ctx Context, typeName string, args ...interface{}) Future {
header := getHeadersFromContext(ctx)
activityFn := ctx.Value(localActivityFnContextKey)
if activityFn == nil {
future, settable := newDecodeFuture(ctx, typeName)

var activityFn interface{}
activityFnOrName := ctx.Value(localActivityFnContextKey)
if activityFnOrName == nil {
panic("ExecuteLocalActivity: Expected context key " + localActivityFnContextKey + " is missing")
}

future, settable := newDecodeFuture(ctx, activityFn)
if err := validateFunctionArgs(activityFn, args, false); err != nil {
settable.Set(nil, err)
return future
if activityName, ok := activityFnOrName.(string); ok {
registry := getRegistryFromWorkflowContext(ctx)
activityType, err := getValidatedActivityFunction(typeName, args, registry)
if err != nil {
settable.Set(nil, err)
return future
}

activity, ok := registry.GetActivity(activityName)
if !ok {
settable.Set(nil, fmt.Errorf("local activity %s is not registered by the worker", activityType.Name))
return future
}

activityFn = activity.GetFunction()
} else {
if err := validateFunctionArgs(activityFnOrName, args, false); err != nil {
settable.Set(nil, err)
return future
}

activityFn = activityFnOrName
}

options, err := getValidatedLocalActivityOptions(ctx)
if err != nil {
settable.Set(nil, err)
return future
}

params := &ExecuteLocalActivityParams{
ExecuteLocalActivityOptions: *options,
ActivityFn: activityFn,
ActivityType: activityType,
ActivityType: typeName,
InputArgs: args,
WorkflowInfo: GetWorkflowInfo(ctx),
DataConverter: getDataConverterFromWorkflowContext(ctx),
Expand Down
73 changes: 73 additions & 0 deletions internal/workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,76 @@ func TestUnregisteredActivity(t *testing.T) {

require.True(t, strings.HasPrefix(err1.Error(), "unable to find activityType=unregistered"), err1.Error())
}

func namedActivity(ctx context.Context, arg string) (string, error) {
return arg + " World!", nil
}

func TestLocalActivityExecutionByActivityName(t *testing.T) {
testSuite := &WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

env.RegisterActivity(namedActivity)
env.ExecuteWorkflow(func(ctx Context, arg1 string) (string, error) {
ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{
ScheduleToCloseTimeout: time.Hour,
StartToCloseTimeout: time.Hour,
})
var result string
err := ExecuteLocalActivity(ctx, "namedActivity", arg1).Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}, "Hello")
require.NoError(t, env.GetWorkflowError())
var result string
err := env.GetWorkflowResult(&result)
require.NoError(t, err)
require.Equal(t, "Hello World!", result)
}

func TestLocalActivityExecutionByActivityNameAlias(t *testing.T) {
testSuite := &WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

env.RegisterActivityWithOptions(namedActivity, RegisterActivityOptions{
Name: "localActivity",
})
env.ExecuteWorkflow(func(ctx Context, arg1 string) (string, error) {
ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{
ScheduleToCloseTimeout: time.Hour,
StartToCloseTimeout: time.Hour,
})
var result string
err := ExecuteLocalActivity(ctx, "localActivity", arg1).Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}, "Hello")
require.NoError(t, env.GetWorkflowError())
var result string
err := env.GetWorkflowResult(&result)
require.NoError(t, err)
require.Equal(t, "Hello World!", result)
}

func TestLocalActivityExecutionByActivityNameAliasMissingRegistration(t *testing.T) {
testSuite := &WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

env.ExecuteWorkflow(func(ctx Context, arg1 string) (string, error) {
ctx = WithLocalActivityOptions(ctx, LocalActivityOptions{
ScheduleToCloseTimeout: time.Hour,
StartToCloseTimeout: time.Hour,
})
var result string
err := ExecuteLocalActivity(ctx, "localActivity", arg1).Get(ctx, &result)
if err != nil {
return "", err
}
return result, nil
}, "Hello")
require.NotNil(t, env.GetWorkflowError())
}

0 comments on commit fee5fb6

Please sign in to comment.