Skip to content

Commit

Permalink
WorkflowInterceptor refactoring (#169)
Browse files Browse the repository at this point in the history
* renamed interceptors

* Refactored interceptors to match Java SDK

* Added newCoroutine do WorkflowOutboundCallsInterceptor

* Renamed NewCoroutine to Go
  • Loading branch information
mfateev committed Jun 24, 2020
1 parent 4f0fb49 commit 15b671e
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 201 deletions.
28 changes: 20 additions & 8 deletions interceptors/workflow_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,31 @@ import (
)

type (
// WorkflowInterceptorFactory is used to create a single link in the interceptor chain
WorkflowInterceptorFactory = internal.WorkflowInterceptorFactory
// WorkflowInterceptor is used to create a single link in the interceptor chain. Called once per workflow execution replay.
WorkflowInterceptor = internal.WorkflowInterceptor

// WorkflowInterceptor is an interface that can be implemented to intercept calls to the workflow function
// as well calls done by the workflow code.
// Use worker.WorkflowInterceptorBase as a base struct for implementations that do not want to implement every method.
// WorkflowInboundCallsInterceptor is an interface that can be implemented to intercept calls to the workflow.
// Use WorkflowInboundCallsInterceptorBase as a base struct for implementations that do not want to implement every method.
// Interceptor implementation must forward calls to the next in the interceptor chain.
// All code in the interceptor is executed in the workflow.Context of a workflow. So all the rules and restrictions
// that apply to the workflow code should be obeyed by the interceptor implementation.
// Use workflow.IsReplaying(ctx) to filter out duplicated calls.
WorkflowInterceptor = internal.WorkflowInterceptor
WorkflowInboundCallsInterceptor = internal.WorkflowInboundCallsInterceptor

// WorkflowOutboundCallsInterceptor is an interface that can be implemented to intercept calls to the SDK APIs done
// by the workflow code.
// Use worker.WorkflowOutboundCallsInterceptorBase as a base struct for implementations that do not want to implement every method.
// Interceptor implementation must forward calls to the next in the interceptor chain.
// All code in the interceptor is executed in the workflow.Context of a workflow. So all the rules and restrictions
// that apply to the workflow code should be obeyed by the interceptor implementation.
// Use workflow.IsReplaying(ctx) to filter out duplicated calls.
WorkflowOutboundCallsInterceptor = internal.WorkflowOutboundCallsInterceptor

// WorkflowInboundCallsInterceptorBase is a noop implementation of WorkflowInboundCallsInterceptor that just forwards requests
// to the next link in an interceptor chain. To be used as base implementation of interceptors.
WorkflowInboundCallsInterceptorBase = internal.WorkflowInboundCallsInterceptorBase

// WorkflowInterceptorBase is a noop implementation of WorkflowInterceptor that just forwards requests
// WorkflowOutboundCallsInterceptorBase is a noop implementation of WorkflowOutboundCallsInterceptor that just forwards requests
// to the next link in an interceptor chain. To be used as base implementation of interceptors.
WorkflowInterceptorBase = internal.WorkflowInterceptorBase
WorkflowOutboundCallsInterceptorBase = internal.WorkflowOutboundCallsInterceptorBase
)
103 changes: 67 additions & 36 deletions internal/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,39 @@ import (
"go.uber.org/zap"
)

// WorkflowInterceptorFactory is used to create a single link in the interceptor chain
type WorkflowInterceptorFactory interface {
// NewInterceptor creates an interceptor instance. The created instance must delegate every call to
// WorkflowInterceptor is used to create a single link in the interceptor chain
type WorkflowInterceptor interface {
// InterceptWorkflow creates an interceptor instance. The created instance must delegate every call to
// the next parameter for workflow code function correctly.
NewInterceptor(info *WorkflowInfo, next WorkflowInterceptor) WorkflowInterceptor
InterceptWorkflow(info *WorkflowInfo, next WorkflowInboundCallsInterceptor) WorkflowInboundCallsInterceptor
}

// WorkflowInterceptor is an interface that can be implemented to intercept calls to the workflow function
// as well calls done by the workflow code.
// Use worker.WorkflowInterceptorBase as a base struct for implementations that do not want to implement every method.
// WorkflowInboundCallsInterceptor is an interface that can be implemented to intercept calls to the workflow.
// Use WorkflowInboundCallsInterceptorBase as a base struct for implementations that do not want to implement every method.
// Interceptor implementation must forward calls to the next in the interceptor chain.
// All code in the interceptor is executed in the context of a workflow. So all the rules and restrictions
// All code in the interceptor is executed in the workflow.Context of a workflow. So all the rules and restrictions
// that apply to the workflow code should be obeyed by the interceptor implementation.
// Use workflow.IsReplaying(ctx) to filter out duplicated calls.
type WorkflowInterceptor interface {
type WorkflowInboundCallsInterceptor interface {
Init(outbound WorkflowOutboundCallsInterceptor) error

// ExecuteWorkflow intercepts workflow function invocation. As calls to other intercepted functions are done from
// a workflow function this function is the first to be called and completes workflow as soon as it returns.
// WorkflowType argument is for information purposes only and should not be mutated.
ExecuteWorkflow(ctx Context, workflowType string, args ...interface{}) []interface{}

//TODO(maxim): ProcessSignal(ctx Context, signalName string, arg interface{}) error
}

// WorkflowOutboundCallsInterceptor is an interface that can be implemented to intercept calls to the SDK APIs done
// by the workflow code.
// Use worker.WorkflowOutboundCallsInterceptorBase as a base struct for implementations that do not want to implement every method.
// Interceptor implementation must forward calls to the next in the interceptor chain.
// All code in the interceptor is executed in the workflow.Context of a workflow. So all the rules and restrictions
// that apply to the workflow code should be obeyed by the interceptor implementation.
// Use workflow.IsReplaying(ctx) to filter out duplicated calls.
type WorkflowOutboundCallsInterceptor interface {
Go(ctx Context, name string, f func(ctx Context)) Context
ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future
ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future
ExecuteChildWorkflow(ctx Context, childWorkflowType string, args ...interface{}) ChildWorkflowFuture
Expand All @@ -73,114 +86,132 @@ type WorkflowInterceptor interface {
GetLastCompletionResult(ctx Context, d ...interface{}) error
}

var _ WorkflowInterceptor = (*WorkflowInterceptorBase)(nil)
var _ WorkflowOutboundCallsInterceptor = (*WorkflowOutboundCallsInterceptorBase)(nil)
var _ WorkflowInboundCallsInterceptor = (*WorkflowInboundCallsInterceptorBase)(nil)

// WorkflowInboundCallsInterceptorBase is a noop implementation of WorkflowInboundCallsInterceptor that just forwards requests
// to the next link in an interceptor chain. To be used as base implementation of interceptors.
type WorkflowInboundCallsInterceptorBase struct {
Next WorkflowInboundCallsInterceptor
}

// Init called before the workflow function is invoked
func (w WorkflowInboundCallsInterceptorBase) Init(outbound WorkflowOutboundCallsInterceptor) error {
return w.Next.Init(outbound)
}

// ExecuteWorkflow intercepts invocation of the workflow function
func (w WorkflowInboundCallsInterceptorBase) ExecuteWorkflow(ctx Context, workflowType string, args ...interface{}) []interface{} {
return w.Next.ExecuteWorkflow(ctx, workflowType, args...)
}

// WorkflowInterceptorBase is a helper type that can simplify creation of WorkflowInterceptors
type WorkflowInterceptorBase struct {
Next WorkflowInterceptor
// WorkflowOutboundCallsInterceptorBase is a noop implementation of WorkflowOutboundCallsInterceptor that just forwards requests
// to the next link in an interceptor chain. To be used as base implementation of interceptors.
type WorkflowOutboundCallsInterceptorBase struct {
Next WorkflowOutboundCallsInterceptor
}

// ExecuteWorkflow forwards to t.Next
func (t *WorkflowInterceptorBase) ExecuteWorkflow(ctx Context, workflowType string, args ...interface{}) []interface{} {
return t.Next.ExecuteWorkflow(ctx, workflowType, args...)
// Go forwards to t.Next
func (t *WorkflowOutboundCallsInterceptorBase) Go(ctx Context, name string, f func(ctx Context)) Context {
return t.Next.Go(ctx, name, f)
}

// ExecuteActivity forwards to t.Next
func (t *WorkflowInterceptorBase) ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future {
func (t *WorkflowOutboundCallsInterceptorBase) ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future {
return t.Next.ExecuteActivity(ctx, activityType, args...)
}

// ExecuteLocalActivity forwards to t.Next
func (t *WorkflowInterceptorBase) ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future {
func (t *WorkflowOutboundCallsInterceptorBase) ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future {
return t.Next.ExecuteLocalActivity(ctx, activityType, args...)
}

// ExecuteChildWorkflow forwards to t.Next
func (t *WorkflowInterceptorBase) ExecuteChildWorkflow(ctx Context, childWorkflowType string, args ...interface{}) ChildWorkflowFuture {
func (t *WorkflowOutboundCallsInterceptorBase) ExecuteChildWorkflow(ctx Context, childWorkflowType string, args ...interface{}) ChildWorkflowFuture {
return t.Next.ExecuteChildWorkflow(ctx, childWorkflowType, args...)
}

// GetWorkflowInfo forwards to t.Next
func (t *WorkflowInterceptorBase) GetWorkflowInfo(ctx Context) *WorkflowInfo {
func (t *WorkflowOutboundCallsInterceptorBase) GetWorkflowInfo(ctx Context) *WorkflowInfo {
return t.Next.GetWorkflowInfo(ctx)
}

// GetLogger forwards to t.Next
func (t *WorkflowInterceptorBase) GetLogger(ctx Context) *zap.Logger {
func (t *WorkflowOutboundCallsInterceptorBase) GetLogger(ctx Context) *zap.Logger {
return t.Next.GetLogger(ctx)
}

// GetMetricsScope forwards to t.Next
func (t *WorkflowInterceptorBase) GetMetricsScope(ctx Context) tally.Scope {
func (t *WorkflowOutboundCallsInterceptorBase) GetMetricsScope(ctx Context) tally.Scope {
return t.Next.GetMetricsScope(ctx)
}

// Now forwards to t.Next
func (t *WorkflowInterceptorBase) Now(ctx Context) time.Time {
func (t *WorkflowOutboundCallsInterceptorBase) Now(ctx Context) time.Time {
return t.Next.Now(ctx)
}

// NewTimer forwards to t.Next
func (t *WorkflowInterceptorBase) NewTimer(ctx Context, d time.Duration) Future {
func (t *WorkflowOutboundCallsInterceptorBase) NewTimer(ctx Context, d time.Duration) Future {
return t.Next.NewTimer(ctx, d)
}

// Sleep forwards to t.Next
func (t *WorkflowInterceptorBase) Sleep(ctx Context, d time.Duration) (err error) {
func (t *WorkflowOutboundCallsInterceptorBase) Sleep(ctx Context, d time.Duration) (err error) {
return t.Next.Sleep(ctx, d)
}

// RequestCancelExternalWorkflow forwards to t.Next
func (t *WorkflowInterceptorBase) RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future {
func (t *WorkflowOutboundCallsInterceptorBase) RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future {
return t.Next.RequestCancelExternalWorkflow(ctx, workflowID, runID)
}

// SignalExternalWorkflow forwards to t.Next
func (t *WorkflowInterceptorBase) SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future {
func (t *WorkflowOutboundCallsInterceptorBase) SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future {
return t.Next.SignalExternalWorkflow(ctx, workflowID, runID, signalName, arg)
}

// UpsertSearchAttributes forwards to t.Next
func (t *WorkflowInterceptorBase) UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error {
func (t *WorkflowOutboundCallsInterceptorBase) UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error {
return t.Next.UpsertSearchAttributes(ctx, attributes)
}

// GetSignalChannel forwards to t.Next
func (t *WorkflowInterceptorBase) GetSignalChannel(ctx Context, signalName string) ReceiveChannel {
func (t *WorkflowOutboundCallsInterceptorBase) GetSignalChannel(ctx Context, signalName string) ReceiveChannel {
return t.Next.GetSignalChannel(ctx, signalName)
}

// SideEffect forwards to t.Next
func (t *WorkflowInterceptorBase) SideEffect(ctx Context, f func(ctx Context) interface{}) Value {
func (t *WorkflowOutboundCallsInterceptorBase) SideEffect(ctx Context, f func(ctx Context) interface{}) Value {
return t.Next.SideEffect(ctx, f)
}

// MutableSideEffect forwards to t.Next
func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value {
func (t *WorkflowOutboundCallsInterceptorBase) MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value {
return t.Next.MutableSideEffect(ctx, id, f, equals)
}

// GetVersion forwards to t.Next
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
func (t *WorkflowOutboundCallsInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported)
}

// SetQueryHandler forwards to t.Next
func (t *WorkflowInterceptorBase) SetQueryHandler(ctx Context, queryType string, handler interface{}) error {
func (t *WorkflowOutboundCallsInterceptorBase) SetQueryHandler(ctx Context, queryType string, handler interface{}) error {
return t.Next.SetQueryHandler(ctx, queryType, handler)
}

// IsReplaying forwards to t.Next
func (t *WorkflowInterceptorBase) IsReplaying(ctx Context) bool {
func (t *WorkflowOutboundCallsInterceptorBase) IsReplaying(ctx Context) bool {
return t.Next.IsReplaying(ctx)
}

// HasLastCompletionResult forwards to t.Next
func (t *WorkflowInterceptorBase) HasLastCompletionResult(ctx Context) bool {
func (t *WorkflowOutboundCallsInterceptorBase) HasLastCompletionResult(ctx Context) bool {
return t.Next.HasLastCompletionResult(ctx)
}

// GetLastCompletionResult forwards to t.Next
func (t *WorkflowInterceptorBase) GetLastCompletionResult(ctx Context, d ...interface{}) error {
func (t *WorkflowOutboundCallsInterceptorBase) GetLastCompletionResult(ctx Context, d ...interface{}) error {
return t.Next.GetLastCompletionResult(ctx, d...)
}
Loading

0 comments on commit 15b671e

Please sign in to comment.