Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ var Module = fx.Options(
fx.Provide(AuthorizationInterceptorProvider),
fx.Provide(NamespaceCheckerProvider),
fx.Provide(func(so GrpcServerOptions) *grpc.Server { return grpc.NewServer(so.Options...) }),
fx.Provide(CallbackValidatorProvider),
fx.Provide(HandlerProvider),
fx.Provide(AdminHandlerProvider),
fx.Provide(OperatorHandlerProvider),
Expand Down Expand Up @@ -700,6 +701,11 @@ func OperatorHandlerProvider(
return NewOperatorHandlerImpl(args)
}

func CallbackValidatorProvider() CallbackValidator {
// By default, no custom validation is performed.
return nil
}

func HandlerProvider(
cfg *config.Config,
serviceName primitives.ServiceName,
Expand Down Expand Up @@ -731,6 +737,7 @@ func HandlerProvider(
membershipMonitor membership.Monitor,
healthInterceptor *interceptor.HealthInterceptor,
scheduleSpecBuilder *scheduler.SpecBuilder,
callbackValidator CallbackValidator,
) Handler {
wfHandler := NewWorkflowHandler(
serviceConfig,
Expand Down Expand Up @@ -758,6 +765,7 @@ func HandlerProvider(
healthInterceptor,
scheduleSpecBuilder,
httpEnabled(cfg, serviceName),
callbackValidator,
)
return wfHandler
}
Expand Down
23 changes: 20 additions & 3 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ const (
)

type (
// CallbackValidator is an interface for validating callbacks.
// It may be optionally provided to the WorkflowHandler.
//
// NOTE: Experimental API, subject to change or removal without notice.
CallbackValidator interface {
// ValidateCallback validates the callback for the given namespace.
// It is called after the default validation logic is run, which means that this can only make validation stricter
// than the default behavior.
ValidateCallback(ns namespace.Name, callback *commonpb.Callback) error
}

// WorkflowHandler - gRPC handler interface for workflowservice
WorkflowHandler struct {
workflowservice.UnimplementedWorkflowServiceServer
Expand Down Expand Up @@ -142,6 +153,7 @@ type (
scheduleSpecBuilder *scheduler.SpecBuilder
outstandingPollers collection.SyncMap[string, collection.SyncMap[string, context.CancelFunc]]
httpEnabled bool
callbackValidator CallbackValidator
}
)

Expand Down Expand Up @@ -172,6 +184,7 @@ func NewWorkflowHandler(
healthInterceptor *interceptor.HealthInterceptor,
scheduleSpecBuilder *scheduler.SpecBuilder,
httpEnabled bool,
callbackValidator CallbackValidator,
) *WorkflowHandler {
handler := &WorkflowHandler{
status: common.DaemonStatusInitialized,
Expand Down Expand Up @@ -225,6 +238,7 @@ func NewWorkflowHandler(
scheduleSpecBuilder: scheduleSpecBuilder,
outstandingPollers: collection.NewSyncMap[string, collection.SyncMap[string, context.CancelFunc]](),
httpEnabled: httpEnabled,
callbackValidator: callbackValidator,
}

return handler
Expand Down Expand Up @@ -5229,9 +5243,12 @@ func (wh *WorkflowHandler) validateWorkflowCompletionCallbacks(
),
)
}
case *commonpb.Callback_Internal_:
// TODO(Tianyu): For now, there is nothing to validate given that this is an internal field.
continue
if wh.callbackValidator == nil {
return nil
}
if err := wh.callbackValidator.ValidateCallback(ns, callback); err != nil {
return err
}
default:
return status.Error(codes.Unimplemented, fmt.Sprintf("unknown callback variant: %T", cb))
}
Expand Down
1 change: 1 addition & 0 deletions service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (s *WorkflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl
healthInterceptor,
scheduler.NewSpecBuilder(),
true,
nil,
)
}

Expand Down
Loading