From 6d109bfebcf1176b1c10590428aad69dc15595a4 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 13 Apr 2020 18:33:40 -0700 Subject: [PATCH] Broke channel into SendChannel and ReceiveChannel interfaces (#100) --- internal/context.go | 4 +- internal/interceptors.go | 4 +- internal/internal_coroutines_test.go | 46 ++++++++++++++++---- internal/internal_task_handlers_test.go | 1 + internal/internal_worker.go | 31 +++++++------ internal/internal_workflow.go | 10 ++--- internal/internal_workflow_test.go | 16 +++---- internal/internal_workflow_testsuite_test.go | 6 +-- internal/session.go | 2 +- internal/workflow.go | 41 ++++++++++------- workflow/deterministic_wrappers.go | 8 ++++ workflow/workflow.go | 2 +- 12 files changed, 109 insertions(+), 62 deletions(-) diff --git a/internal/context.go b/internal/context.go index ef770a3c1..b251fdb26 100644 --- a/internal/context.go +++ b/internal/context.go @@ -238,10 +238,10 @@ func propagateCancel(parent Context, child canceler) { } else { go func() { s := NewSelector(parent) - s.AddReceive(parent.Done(), func(c Channel, more bool) { + s.AddReceive(parent.Done(), func(c ReceiveChannel, more bool) { child.cancel(false, parent.Err()) }) - s.AddReceive(child.Done(), func(c Channel, more bool) {}) + s.AddReceive(child.Done(), func(c ReceiveChannel, more bool) {}) s.Select(parent) }() } diff --git a/internal/interceptors.go b/internal/interceptors.go index 1d93aae48..dfc17ebfc 100644 --- a/internal/interceptors.go +++ b/internal/interceptors.go @@ -63,7 +63,7 @@ type WorkflowInterceptor interface { RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error - GetSignalChannel(ctx Context, signalName string) Channel + GetSignalChannel(ctx Context, signalName string) ReceiveChannel SideEffect(ctx Context, f func(ctx Context) interface{}) Value MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version @@ -146,7 +146,7 @@ func (t *WorkflowInterceptorBase) UpsertSearchAttributes(ctx Context, attributes } // GetSignalChannel forwards to t.Next -func (t *WorkflowInterceptorBase) GetSignalChannel(ctx Context, signalName string) Channel { +func (t *WorkflowInterceptorBase) GetSignalChannel(ctx Context, signalName string) ReceiveChannel { return t.Next.GetSignalChannel(ctx, signalName) } diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 8fce8ed34..1465a1868 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -52,6 +52,7 @@ func requireNoExecuteErr(t *testing.T, err error) { func TestDispatcher(t *testing.T) { value := "foo" d, _ := newDispatcher(createRootTestContext(), func(ctx Context) { value = "bar" }) + defer d.Close() require.Equal(t, "foo", value) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -69,6 +70,7 @@ func TestNonBlockingChildren(t *testing.T) { } history = append(history, "root") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -92,6 +94,7 @@ func TestNonbufferedChannel(t *testing.T) { history = append(history, "root-after-channel-put") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -138,6 +141,7 @@ func TestNonbufferedChannelBlockedReceive(t *testing.T) { history = append(history, "root-after-channel-put") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) c2.SendAsync("value21") @@ -169,6 +173,7 @@ func TestBufferedChannelPut(t *testing.T) { c1.Send(ctx, "value2") history = append(history, "root-after-channel-put2") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -219,6 +224,7 @@ func TestBufferedChannelGet(t *testing.T) { c1.Send(ctx, "value2") history = append(history, "root-after-channel-put2") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone(), strings.Join(history, "\n")+"\n\n"+d.StackTrace()) @@ -246,13 +252,13 @@ func TestNotBlockingSelect(t *testing.T) { c2 := NewBufferedChannel(ctx, 1) s := NewSelector(ctx) s. - AddReceive(c1, func(c Channel, more bool) { + AddReceive(c1, func(c ReceiveChannel, more bool) { require.True(t, more) var v string c.Receive(ctx, &v) history = append(history, fmt.Sprintf("c1-%v", v)) }). - AddReceive(c2, func(c Channel, more bool) { + AddReceive(c2, func(c ReceiveChannel, more bool) { require.True(t, more) var v string c.Receive(ctx, &v) @@ -265,6 +271,7 @@ func TestNotBlockingSelect(t *testing.T) { s.Select(ctx) s.Select(ctx) }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -295,13 +302,13 @@ func TestBlockingSelect(t *testing.T) { s := NewSelector(ctx) s. - AddReceive(c1, func(c Channel, more bool) { + AddReceive(c1, func(c ReceiveChannel, more bool) { require.True(t, more) var v string c.Receive(ctx, &v) history = append(history, fmt.Sprintf("c1-%v", v)) }). - AddReceive(c2, func(c Channel, more bool) { + AddReceive(c2, func(c ReceiveChannel, more bool) { var v string c.Receive(ctx, &v) history = append(history, fmt.Sprintf("c2-%v", v)) @@ -312,6 +319,7 @@ func TestBlockingSelect(t *testing.T) { s.Select(ctx) history = append(history, "done") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone(), strings.Join(history, "\n")) @@ -336,7 +344,7 @@ func TestBlockingSelectAsyncSend(t *testing.T) { c1 := NewChannel(ctx) s := NewSelector(ctx) s. - AddReceive(c1, func(c Channel, more bool) { + AddReceive(c1, func(c ReceiveChannel, more bool) { require.True(t, more) var v int c.Receive(ctx, &v) @@ -353,6 +361,7 @@ func TestBlockingSelectAsyncSend(t *testing.T) { } history = append(history, "done") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone(), strings.Join(history, "\n")) @@ -380,7 +389,7 @@ func TestSelectOnClosedChannel(t *testing.T) { selector := NewNamedSelector(ctx, "waiting for channel") - selector.AddReceive(c, func(f Channel, more bool) { + selector.AddReceive(c, func(f ReceiveChannel, more bool) { var n int if !more { @@ -401,6 +410,7 @@ func TestSelectOnClosedChannel(t *testing.T) { selector.Select(ctx) selector.Select(ctx) }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone(), strings.Join(history, "\n")) @@ -420,13 +430,13 @@ func TestBlockingSelectAsyncSend2(t *testing.T) { c2 := NewBufferedChannel(ctx, 100) s := NewSelector(ctx) s. - AddReceive(c1, func(c Channel, more bool) { + AddReceive(c1, func(c ReceiveChannel, more bool) { require.True(t, more) var v string c.Receive(ctx, &v) history = append(history, fmt.Sprintf("c1-%v", v)) }). - AddReceive(c2, func(c Channel, more bool) { + AddReceive(c2, func(c ReceiveChannel, more bool) { require.True(t, more) var v string c.Receive(ctx, &v) @@ -443,6 +453,7 @@ func TestBlockingSelectAsyncSend2(t *testing.T) { s.Select(ctx) history = append(history, "done") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone(), strings.Join(history, "\n")) @@ -483,6 +494,7 @@ func TestSendSelect(t *testing.T) { s.Select(ctx) history = append(history, "done") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -525,6 +537,7 @@ func TestSendSelectWithAsyncReceive(t *testing.T) { s.Select(ctx) history = append(history, "done") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone(), strings.Join(history, "\n")) @@ -570,6 +583,7 @@ func TestChannelClose(t *testing.T) { history = append(history, "done") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone(), d.StackTrace()) @@ -599,6 +613,7 @@ func TestSendClosedChannel(t *testing.T) { }) c.Send(ctx, "baz") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) } @@ -613,6 +628,7 @@ func TestBlockedSendClosedChannel(t *testing.T) { c.Close() c.Send(ctx, "baz") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) } @@ -627,6 +643,7 @@ func TestAsyncSendClosedChannel(t *testing.T) { c.Close() _ = c.SendAsync("baz") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) } @@ -683,6 +700,7 @@ func TestPanic(t *testing.T) { history = append(history, "root") c.Receive(ctx, nil) // blocked forever }) + defer d.Close() require.EqualValues(t, 0, len(history)) err := d.ExecuteUntilAllBlocked() require.Error(t, err) @@ -699,6 +717,7 @@ func TestAwait(t *testing.T) { d, _ := newDispatcher(createRootTestContext(), func(ctx Context) { _ = Await(ctx, func() bool { return flag }) }) + defer d.Close() err := d.ExecuteUntilAllBlocked() require.NoError(t, err) require.False(t, d.IsDone()) @@ -718,6 +737,7 @@ func TestAwaitCancellation(t *testing.T) { d, _ := newDispatcher(ctx, func(ctx Context) { awaitError = Await(ctx, func() bool { return false }) }) + defer d.Close() err := d.ExecuteUntilAllBlocked() require.NoError(t, err) require.False(t, d.IsDone()) @@ -737,6 +757,7 @@ func TestAwaitWithTimeoutNoTimeout(t *testing.T) { d, _ := newDispatcher(createRootTestContext(), func(ctx Context) { awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return flag }) }) + defer d.Close() err := d.ExecuteUntilAllBlocked() require.NoError(t, err) require.False(t, d.IsDone()) @@ -760,6 +781,7 @@ func TestAwaitWithTimeoutCancellation(t *testing.T) { d, _ := newDispatcher(ctx, func(ctx Context) { awaitOk, awaitWithTimeoutError = AwaitWithTimeout(ctx, time.Hour, func() bool { return false }) }) + defer d.Close() err := d.ExecuteUntilAllBlocked() require.NoError(t, err) require.False(t, d.IsDone()) @@ -797,6 +819,7 @@ func TestFutureSetValue(t *testing.T) { history = append(history, "root-end") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.False(t, d.IsDone(), fmt.Sprintf("%v", d.StackTrace())) @@ -841,6 +864,7 @@ func TestFutureFail(t *testing.T) { history = append(history, "root-end") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.False(t, d.IsDone(), fmt.Sprintf("%v", d.StackTrace())) @@ -898,6 +922,7 @@ func TestFutureSet(t *testing.T) { }) history = append(history, "root-end") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) @@ -971,6 +996,7 @@ func TestFutureChain(t *testing.T) { history = append(history, "root-end") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.False(t, d.IsDone(), fmt.Sprintf("%v", d.StackTrace())) @@ -1036,6 +1062,7 @@ func TestSelectFuture(t *testing.T) { s.Select(ctx) history = append(history, "done") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -1085,6 +1112,7 @@ func TestSelectDecodeFuture(t *testing.T) { s.Select(ctx) history = append(history, "done") }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) @@ -1143,6 +1171,7 @@ func TestDecodeFutureChain(t *testing.T) { }) history = append(history, "root-end") }) + defer d.Close() require.EqualValues(t, 0, len(history)) requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) // set f1 @@ -1211,6 +1240,7 @@ func TestSelectFuture_WithBatchSets(t *testing.T) { s.Select(ctx) s.Select(ctx) }) + defer d.Close() requireNoExecuteErr(t, d.ExecuteUntilAllBlocked()) require.True(t, d.IsDone()) diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index bf93a7fdd..e887cc560 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -1364,6 +1364,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() { UserContext: ctx, UserContextCancel: cancel, WorkerStopChannel: workerStopCh, + Tracer: opentracing.NoopTracer{}, } activityHandler := newActivityTaskHandler(mockService, wep, registry) pats := &workflowservice.PollForActivityTaskResponse{ diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 49831bc6b..ed43657a3 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -971,30 +971,29 @@ func (aw *AggregatedWorker) Start() error { if !isInterfaceNil(aw.workflowWorker) { if len(aw.registry.getRegisteredWorkflowTypes()) == 0 { - aw.logger.Warn( - "Starting worker without any workflows. Workflows must be registered before start.", - ) - } - if err := aw.workflowWorker.Start(); err != nil { - return err + aw.logger.Info("No workflows registered. Skipping workflow worker start") + } else { + if err := aw.workflowWorker.Start(); err != nil { + return err + } } } if !isInterfaceNil(aw.activityWorker) { if len(aw.registry.getRegisteredActivities()) == 0 { - aw.logger.Warn( - "Starting worker without any activities. Activities must be registered before start.", - ) - } - if err := aw.activityWorker.Start(); err != nil { - // stop workflow worker. - if !isInterfaceNil(aw.workflowWorker) { - aw.workflowWorker.Stop() + aw.logger.Info("No activities registered. Skipping activity worker start") + } else { + if err := aw.activityWorker.Start(); err != nil { + // stop workflow worker. + if !isInterfaceNil(aw.workflowWorker) && len(aw.registry.getRegisteredWorkflowTypes()) > 0 { + aw.workflowWorker.Stop() + } + return err } - return err } } - if !isInterfaceNil(aw.sessionWorker) { + if !isInterfaceNil(aw.sessionWorker) && len(aw.registry.getRegisteredActivities()) > 0 { + aw.logger.Info("Starting session worker") if err := aw.sessionWorker.Start(); err != nil { // stop workflow worker and activity worker. if !isInterfaceNil(aw.workflowWorker) { diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 11d30f87c..508159bc2 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -124,8 +124,8 @@ type ( // Single case statement of the Select selectCase struct { - channel *channelImpl // Channel of this case. - receiveFunc *func(c Channel, more bool) // function to call when channel has a message. nil for send case. + channel *channelImpl // Channel of this case. + receiveFunc *func(c ReceiveChannel, more bool) // function to call when channel has a message. nil for send case. sendFunc *func() // function to call when channel accepted a message. nil for receive case. sendValue *interface{} // value to send to the channel. Used only for send case. @@ -971,12 +971,12 @@ func (d *dispatcherImpl) StackTrace() string { return result } -func (s *selectorImpl) AddReceive(c Channel, f func(c Channel, more bool)) Selector { +func (s *selectorImpl) AddReceive(c ReceiveChannel, f func(c ReceiveChannel, more bool)) Selector { s.cases = append(s.cases, &selectCase{channel: c.(*channelImpl), receiveFunc: &f}) return s } -func (s *selectorImpl) AddSend(c Channel, v interface{}, f func()) Selector { +func (s *selectorImpl) AddSend(c SendChannel, v interface{}, f func()) Selector { s.cases = append(s.cases, &selectCase{channel: c.(*channelImpl), sendFunc: &f, sendValue: &v}) return s } @@ -1246,7 +1246,7 @@ func getHeadersFromContext(ctx Context) *commonpb.Header { } // getSignalChannel finds the associated channel for the signal. -func (w *workflowOptions) getSignalChannel(ctx Context, signalName string) Channel { +func (w *workflowOptions) getSignalChannel(ctx Context, signalName string) ReceiveChannel { if ch, ok := w.signalChannels[signalName]; ok { return ch } diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index e7538e9eb..e6134f920 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -146,7 +146,7 @@ func splitJoinActivityWorkflow(ctx Context, testPanic bool) (result string, err c1.Receive(ctx, nil) // Use selector to test it selected := false - NewSelector(ctx).AddReceive(c2, func(c Channel, more bool) { + NewSelector(ctx).AddReceive(c2, func(c ReceiveChannel, more bool) { if !more { panic("more should be true") } @@ -494,7 +494,7 @@ func signalWorkflowTest(ctx Context) ([]byte, error) { // Read on a selector. ch2 := GetSignalChannel(ctx, "testSig2") s := NewSelector(ctx) - s.AddReceive(ch2, func(c Channel, more bool) { + s.AddReceive(ch2, func(c ReceiveChannel, more bool) { c.Receive(ctx, &v) result += v }) @@ -505,7 +505,7 @@ func signalWorkflowTest(ctx Context) ([]byte, error) { // Read on a selector inside the callback, multiple times. ch2 = GetSignalChannel(ctx, "testSig2") s = NewSelector(ctx) - s.AddReceive(ch2, func(c Channel, more bool) { + s.AddReceive(ch2, func(c ReceiveChannel, more bool) { for i := 0; i < 4; i++ { c.Receive(ctx, &v) result += v @@ -589,7 +589,7 @@ func receiveCorruptSignalOnClosedChannelWorkflowTest(ctx Context) ([]message, er ch := GetSignalChannel(ctx, "channelExpectingTypeMessage") var result []message var m message - ch.Close() + ch.(Channel).Close() more := ch.Receive(ctx, &m) result = append(result, message{Value: fmt.Sprintf("%v", more)}) @@ -602,7 +602,7 @@ func receiveWithSelectorCorruptSignalWorkflowTest(ctx Context) ([]message, error // Read on a selector ch := GetSignalChannel(ctx, "channelExpectingTypeMessage") s := NewSelector(ctx) - s.AddReceive(ch, func(c Channel, more bool) { + s.AddReceive(ch, func(c ReceiveChannel, more bool) { var m message ch.Receive(ctx, &m) result = append(result, m) @@ -612,7 +612,7 @@ func receiveWithSelectorCorruptSignalWorkflowTest(ctx Context) ([]message, error } func receiveAsyncCorruptSignalOnClosedChannelWorkflowTest(ctx Context) ([]int, error) { - ch := GetSignalChannel(ctx, "channelExpectingInt") + ch := GetSignalChannel(ctx, "channelExpectingInt").(Channel) var result []int var m int @@ -627,7 +627,7 @@ func receiveAsyncCorruptSignalOnClosedChannelWorkflowTest(ctx Context) ([]int, e } func receiveAsyncCorruptSignalWorkflowTest(ctx Context) ([]message, error) { - ch := GetSignalChannel(ctx, "channelExpectingTypeMessage") + ch := GetSignalChannel(ctx, "channelExpectingTypeMessage").(Channel) var result []message var m message @@ -799,7 +799,7 @@ func closeChannelInSelectTest(ctx Context) error { s.AddSend(sendCh, struct{}{}, func() { panic("callback for sendCh should not be executed") }) - s.AddReceive(receiveCh, func(c Channel, m bool) { + s.AddReceive(receiveCh, func(c ReceiveChannel, m bool) { c.Receive(ctx, &v) }) s.Select(ctx) diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index e30e2719d..2c358a65b 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -2067,8 +2067,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_Channel() { doneCh := NewBufferedChannel(ctx, 100) selector := NewSelector(ctx) - selector.AddReceive(signalCh, func(c Channel, more bool) { - }).AddReceive(doneCh, func(c Channel, more bool) { + selector.AddReceive(signalCh, func(c ReceiveChannel, more bool) { + }).AddReceive(doneCh, func(c ReceiveChannel, more bool) { var doneSignal string c.Receive(ctx, &doneSignal) }) @@ -2480,7 +2480,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalChildWorkflowRetry() { var signal string s.AddFuture(timeout, func(f Future) { signal = "timeout" - }).AddReceive(ch, func(c Channel, more bool) { + }).AddReceive(ch, func(c ReceiveChannel, more bool) { c.Receive(ctx, &signal) }).Select(ctx) diff --git a/internal/session.go b/internal/session.go index 68c4f389d..303882cfd 100644 --- a/internal/session.go +++ b/internal/session.go @@ -332,7 +332,7 @@ func createSession(ctx Context, creationTasklist string, options *SessionOptions var creationErr error var creationResponse sessionCreationResponse s := NewSelector(creationCtx) - s.AddReceive(tasklistChan, func(c Channel, more bool) { + s.AddReceive(tasklistChan, func(c ReceiveChannel, more bool) { c.Receive(creationCtx, &creationResponse) }) s.AddFuture(creationFuture, func(f Future) { diff --git a/internal/workflow.go b/internal/workflow.go index 5b28772bf..d05bf1d8b 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -49,9 +49,20 @@ var ( ) type ( - // Channel must be used instead of native go channel by workflow code. - // Use workflow.NewChannel(ctx) method to create Channel instance. - Channel interface { + // SendChannel is a write only view of the Channel + SendChannel interface { + // Send blocks until the data is sent. + Send(ctx Context, v interface{}) + + // SendAsync try to send without blocking. It returns true if the data was sent, otherwise it returns false. + SendAsync(v interface{}) (ok bool) + + // Close close the Channel, and prohibit subsequent sends. + Close() + } + + // ReceiveChannel is a read only view of the Channel + ReceiveChannel interface { // Receive blocks until it receives a value, and then assigns the received value to the provided pointer. // Returns false when Channel is closed. // Parameter valuePtr is a pointer to the expected data structure to be received. For example: @@ -66,22 +77,20 @@ type ( // ReceiveAsyncWithMoreFlag is same as ReceiveAsync with extra return value more to indicate if there could be // more value from the Channel. The more is false when Channel is closed. ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) + } - // Send blocks until the data is sent. - Send(ctx Context, v interface{}) - - // SendAsync try to send without blocking. It returns true if the data was sent, otherwise it returns false. - SendAsync(v interface{}) (ok bool) - - // Close close the Channel, and prohibit subsequent sends. - Close() + // Channel must be used instead of native go channel by workflow code. + // Use workflow.NewChannel(ctx) method to create Channel instance. + Channel interface { + SendChannel + ReceiveChannel } // Selector must be used instead of native go select by workflow code. - // Use workflow.NewSelector(ctx) method to create a Selector instance. + // Create through workflow.NewSelector(ctx). Selector interface { - AddReceive(c Channel, f func(c Channel, more bool)) Selector - AddSend(c Channel, v interface{}, f func()) Selector + AddReceive(c ReceiveChannel, f func(c ReceiveChannel, more bool)) Selector + AddSend(c SendChannel, v interface{}, f func()) Selector AddFuture(future Future, f func(f Future)) Selector AddDefault(f func()) Select(ctx Context) @@ -1045,12 +1054,12 @@ func withContextPropagators(ctx Context, contextPropagators []ContextPropagator) } // GetSignalChannel returns channel corresponding to the signal name. -func GetSignalChannel(ctx Context, signalName string) Channel { +func GetSignalChannel(ctx Context, signalName string) ReceiveChannel { i := getWorkflowInterceptor(ctx) return i.GetSignalChannel(ctx, signalName) } -func (wc *workflowEnvironmentInterceptor) GetSignalChannel(ctx Context, signalName string) Channel { +func (wc *workflowEnvironmentInterceptor) GetSignalChannel(ctx Context, signalName string) ReceiveChannel { return getWorkflowEnvOptions(ctx).getSignalChannel(ctx, signalName) } diff --git a/workflow/deterministic_wrappers.go b/workflow/deterministic_wrappers.go index 86fa29c01..65d8034f1 100644 --- a/workflow/deterministic_wrappers.go +++ b/workflow/deterministic_wrappers.go @@ -34,8 +34,16 @@ type ( // Channel must be used instead of native go channel by workflow code. // Use workflow.NewChannel(ctx) method to create Channel instance. + // Channel extends both ReadChanel and SendChannel. Prefer to use one of these interfaces + // to share Channel with consumers or producers. Channel = internal.Channel + // ReceiveChannel is a read only view of the Channel + ReceiveChannel = internal.ReceiveChannel + + // SendChannel is a write only view of the Channel + SendChannel = internal.SendChannel + // Selector must be used instead of native go select by workflow code. // Use workflow.NewSelector(ctx) method to create a Selector instance. Selector = internal.Selector diff --git a/workflow/workflow.go b/workflow/workflow.go index 9cccc7fd6..b2f58958e 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -184,7 +184,7 @@ func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a } // GetSignalChannel returns channel corresponding to the signal name. -func GetSignalChannel(ctx Context, signalName string) Channel { +func GetSignalChannel(ctx Context, signalName string) ReceiveChannel { return internal.GetSignalChannel(ctx, signalName) }