From 8ab62d9e211fe86c3f5a5723f6c5e79a65f083fd Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 9 Nov 2022 09:15:44 -0800 Subject: [PATCH] Add schedule API (#943) Add experimental schedule API --- Makefile | 2 +- client/client.go | 100 +++ interceptor/interceptor.go | 4 + interceptor/tracing_interceptor.go | 19 + internal/client.go | 5 +- internal/error.go | 6 + internal/interceptor.go | 9 + internal/interceptor_base.go | 5 + internal/internal_schedule_client.go | 789 ++++++++++++++++++++ internal/internal_schedule_client_test.go | 223 ++++++ internal/internal_workflow_client.go | 14 +- internal/schedule_client.go | 604 ++++++++++++++++ mocks/Client.go | 16 + temporal/error.go | 6 + test/integration_test.go | 844 ++++++++++++++++++++++ test/workflow_test.go | 10 + 16 files changed, 2650 insertions(+), 6 deletions(-) create mode 100644 internal/internal_schedule_client.go create mode 100644 internal/internal_schedule_client_test.go create mode 100644 internal/schedule_client.go diff --git a/Makefile b/Makefile index 238f0d30f..982cc15fa 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ default: check test # general build-product folder, cleaned as part of `make clean` BUILD := .build -TEST_TIMEOUT := 3m +TEST_TIMEOUT := 5m TEST_ARG ?= -race -v -timeout $(TEST_TIMEOUT) INTEG_TEST_ROOT := ./test diff --git a/client/client.go b/client/client.go index e65f8aec5..8881659b4 100644 --- a/client/client.go +++ b/client/client.go @@ -91,6 +91,102 @@ type ( // CheckHealthResponse is a response for Client.CheckHealth. CheckHealthResponse = internal.CheckHealthResponse + // ScheduleRange represents a set of integer values. + // NOTE: Experimental + ScheduleRange = internal.ScheduleRange + + // ScheduleCalendarSpec is an event specification relative to the calendar. + // NOTE: Experimental + ScheduleCalendarSpec = internal.ScheduleCalendarSpec + + // ScheduleIntervalSpec describes periods a schedules action should occur. + // NOTE: Experimental + ScheduleIntervalSpec = internal.ScheduleIntervalSpec + + // ScheduleSpec describes when a schedules action should occur. + // NOTE: Experimental + ScheduleSpec = internal.ScheduleSpec + + // ScheduleBackfill desribes a time periods and policy and takes Actions as if that time passed by right now, all at once. + // NOTE: Experimental + ScheduleBackfill = internal.ScheduleBackfill + + // ScheduleAction is the interface for all actions a schedule can take. + // NOTE: Experimental + ScheduleAction = internal.ScheduleAction + + // ScheduleWorkflowAction is the implementation of ScheduleAction to start a workflow. + // NOTE: Experimental + ScheduleWorkflowAction = internal.ScheduleWorkflowAction + + // ScheduleOptions configuration parameters for creating a schedule. + // NOTE: Experimental + ScheduleOptions = internal.ScheduleOptions + + // ScheduleClient is the interface with the server to create and get handles to schedules. + // NOTE: Experimental + ScheduleClient = internal.ScheduleClient + + // ScheduleListOptions are configuration parameters for listing schedules. + // NOTE: Experimental + ScheduleListOptions = internal.ScheduleListOptions + + // ScheduleListIterator is a iterator which can return created schedules. + // NOTE: Experimental + ScheduleListIterator = internal.ScheduleListIterator + + // ScheduleListEntry is a result from ScheduleListEntry. + // NOTE: Experimental + ScheduleListEntry = internal.ScheduleListEntry + + // ScheduleUpdateOptions are configuration parameters for updating a schedule. + // NOTE: Experimental + ScheduleUpdateOptions = internal.ScheduleUpdateOptions + + // ScheduleHandle represents a created schedule. + // NOTE: Experimental + ScheduleHandle = internal.ScheduleHandle + + // ScheduleActionResult describes when a schedule action took place. + // NOTE: Experimental + ScheduleActionResult = internal.ScheduleActionResult + + // ScheduleWorkflowExecution contains details on a workflows execution stared by a schedule. + // NOTE: Experimental + ScheduleWorkflowExecution = internal.ScheduleWorkflowExecution + + // ScheduleDescription describes the current Schedule details from ScheduleHandle.Describe. + // NOTE: Experimental + ScheduleDescription = internal.ScheduleDescription + + // Schedule describes a created schedule. + // NOTE: Experimental + Schedule = internal.Schedule + + // ScheduleUpdate describes the desired new schedule from ScheduleHandle.Update. + // NOTE: Experimental + ScheduleUpdate = internal.ScheduleUpdate + + // ScheduleUpdateInput describes the current state of the schedule to be updated. + // NOTE: Experimental + ScheduleUpdateInput = internal.ScheduleUpdateInput + + // ScheduleTriggerOptions configure the parameters for triggering a schedule. + // NOTE: Experimental + ScheduleTriggerOptions = internal.ScheduleTriggerOptions + + // SchedulePauseOptions configure the parameters for pausing a schedule. + // NOTE: Experimental + SchedulePauseOptions = internal.SchedulePauseOptions + + // ScheduleUnpauseOptions configure the parameters for unpausing a schedule. + // NOTE: Experimental + ScheduleUnpauseOptions = internal.ScheduleUnpauseOptions + + // ScheduleBackfillOptions configure the parameters for backfilling a schedule. + // NOTE: Experimental + ScheduleBackfillOptions = internal.ScheduleBackfillOptions + // Client is the client for starting and getting information about a workflow executions as well as // completing activities asynchronously. Client interface { @@ -389,6 +485,10 @@ type ( // OperatorService creates a new operator service client with the same gRPC connection as this client. OperatorService() operatorservice.OperatorServiceClient + // Schedule creates a new shedule client with the same gRPC connection as this client. + // NOTE: Experimental + ScheduleClient() ScheduleClient + // Close client and clean up underlying resources. // // If this client was created via NewClientFromExisting or this client has diff --git a/interceptor/interceptor.go b/interceptor/interceptor.go index 79270c49e..008ba922e 100644 --- a/interceptor/interceptor.go +++ b/interceptor/interceptor.go @@ -196,6 +196,10 @@ type ClientTerminateWorkflowInput = internal.ClientTerminateWorkflowInput // ClientOutboundInterceptor.QueryWorkflow. type ClientQueryWorkflowInput = internal.ClientQueryWorkflowInput +// ScheduleClientCreateInput is input for +// ScheduleClientInterceptor.CreateSchedule. +type ScheduleClientCreateInput = internal.ScheduleClientCreateInput + // Header provides Temporal header information from the context for reading or // writing during specific interceptor calls. // diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index a7e5a8e68..5eff9a5cb 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -221,6 +221,25 @@ type tracingClientOutboundInterceptor struct { root *tracingInterceptor } +func (t *tracingClientOutboundInterceptor) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (client.ScheduleHandle, error) { + // Start span and write to header + span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{ + Operation: "CreateSchedule", + Name: in.Options.ID, + ToHeader: true, + Time: time.Now(), + }) + if err != nil { + return nil, err + } + var finishOpts TracerFinishSpanOptions + defer span.Finish(&finishOpts) + + run, err := t.Next.CreateSchedule(ctx, in) + finishOpts.Error = err + return run, err +} + func (t *tracingClientOutboundInterceptor) ExecuteWorkflow( ctx context.Context, in *ClientExecuteWorkflowInput, diff --git a/internal/client.go b/internal/client.go index 574fd8150..76c4a0ff8 100644 --- a/internal/client.go +++ b/internal/client.go @@ -348,6 +348,9 @@ type ( // OperatorService creates a new operator service client with the same gRPC connection as this client. OperatorService() operatorservice.OperatorServiceClient + // Schedule creates a new shedule client with the same gRPC connection as this client. + ScheduleClient() ScheduleClient + // Close client and clean up underlying resources. Close() } @@ -395,7 +398,7 @@ type ( DataConverter converter.DataConverter // Optional: Sets FailureConverter to customize serialization/deserialization of errors. - // default: temporal.DefaultFailureConverter, does not encode any fields of the error. Use temporal.NewDefaultFailureConverter + // default: temporal.DefaultFailureConverter, does not encode any fields of the error. Use temporal.NewDefaultFailureConverter // options to configure or create a custom converter. FailureConverter converter.FailureConverter diff --git a/internal/error.go b/internal/error.go index 83799cac1..06f15ac57 100644 --- a/internal/error.go +++ b/internal/error.go @@ -271,6 +271,12 @@ var ( // which indicate the activity is not done yet. Then, when the waited human action happened, it needs to trigger something // that could report the activity completed event to temporal server via Client.CompleteActivity() API. ErrActivityResultPending = errors.New("not error: do not autocomplete, using Client.CompleteActivity() to complete") + + // ErrScheduleAlreadyRunning is returned if there's already a running (not deleted) Schedule with the same ID + ErrScheduleAlreadyRunning = errors.New("schedule with this ID is already registered") + + // ErrSkipScheduleUpdate is used by a user if they want to skip updating a schedule. + ErrSkipScheduleUpdate = errors.New("skip schedule update") ) // NewApplicationError create new instance of *ApplicationError with message, type, and optional details. diff --git a/internal/interceptor.go b/internal/interceptor.go index 788d88aa3..471040b88 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -278,6 +278,9 @@ type ClientOutboundInterceptor interface { // interceptor.Header will return a non-nil map for this context. ExecuteWorkflow(context.Context, *ClientExecuteWorkflowInput) (WorkflowRun, error) + // CreateSchedule - Intercept a service call to CreateSchedule + CreateSchedule(ctx context.Context, options *ScheduleClientCreateInput) (ScheduleHandle, error) + // SignalWorkflow intercepts client.Client.SignalWorkflow. // interceptor.Header will return a non-nil map for this context. SignalWorkflow(context.Context, *ClientSignalWorkflowInput) error @@ -299,6 +302,12 @@ type ClientOutboundInterceptor interface { mustEmbedClientOutboundInterceptorBase() } +// ScheduleClientCreateInput is the input to +// ClientOutboundInterceptor.CreateSchedule. +type ScheduleClientCreateInput struct { + Options *ScheduleOptions +} + // ClientExecuteWorkflowInput is the input to // ClientOutboundInterceptor.ExecuteWorkflow. type ClientExecuteWorkflowInput struct { diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index c6c08e958..14452aec0 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -429,4 +429,9 @@ func (c *ClientOutboundInterceptorBase) QueryWorkflow( return c.Next.QueryWorkflow(ctx, in) } +// ExecuteWorkflow implements ClientOutboundInterceptor.CreateSchedule. +func (c *ClientOutboundInterceptorBase) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (ScheduleHandle, error) { + return c.Next.CreateSchedule(ctx, in) +} + func (*ClientOutboundInterceptorBase) mustEmbedClientOutboundInterceptorBase() {} diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go new file mode 100644 index 000000000..0771cfc23 --- /dev/null +++ b/internal/internal_schedule_client.go @@ -0,0 +1,789 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/pborman/uuid" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + schedulepb "go.temporal.io/api/schedule/v1" + "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/internal/common" +) + +type ( + + // ScheduleClient is the client for starting a workflow execution. + scheduleClient struct { + workflowClient *WorkflowClient + } + + // scheduleHandleImpl is the implementation of ScheduleHandle. + scheduleHandleImpl struct { + ID string + client *WorkflowClient + } + + // scheduleListIteratorImpl is the implementation of ScheduleListIterator + scheduleListIteratorImpl struct { + // nextScheduleIndex - Local cached schedules events and corresponding consuming index + nextScheduleIndex int + + // err - From getting the latest page of schedules + err error + + // response - From getting the latest page of schedules + response *workflowservice.ListSchedulesResponse + + // paginate - Function which use a next token to get next page of schedules events + paginate func(nexttoken []byte) (*workflowservice.ListSchedulesResponse, error) + } +) + +func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (ScheduleHandle, error) { + // This is always set before interceptor is invoked + ID := in.Options.ID + if ID == "" { + return nil, fmt.Errorf("no schedule ID in options") + } + + dataConverter := WithContext(ctx, w.client.dataConverter) + if dataConverter == nil { + dataConverter = converter.GetDefaultDataConverter() + } + + if in.Options.Action == nil { + return nil, fmt.Errorf("no schedule action in options") + } + action, err := convertToPBScheduleAction(ctx, w.client, in.Options.Action) + if err != nil { + return nil, err + } + + memo, err := getWorkflowMemo(in.Options.Memo, dataConverter) + if err != nil { + return nil, err + } + + searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes) + if err != nil { + return nil, err + } + + var triggerImmediately *schedulepb.TriggerImmediatelyRequest + if in.Options.TriggerImmediately { + triggerImmediately = &schedulepb.TriggerImmediatelyRequest{ + OverlapPolicy: in.Options.Overlap, + } + } + + backfillRequests := convertToPBBackfillList(in.Options.ScheduleBackfill) + + // Only send an initial patch if we need to. + var initialPatch *schedulepb.SchedulePatch + if in.Options.TriggerImmediately || len(in.Options.ScheduleBackfill) > 0 { + initialPatch = &schedulepb.SchedulePatch{ + TriggerImmediately: triggerImmediately, + BackfillRequest: backfillRequests, + } + } + + // run propagators to extract information about tracing and other stuff, store in headers field + startRequest := &workflowservice.CreateScheduleRequest{ + Namespace: w.client.namespace, + ScheduleId: ID, + RequestId: uuid.New(), + Schedule: &schedulepb.Schedule{ + Spec: convertToPBScheduleSpec(&in.Options.Spec), + Action: action, + Policies: &schedulepb.SchedulePolicies{ + OverlapPolicy: in.Options.Overlap, + CatchupWindow: &in.Options.CatchupWindow, + PauseOnFailure: in.Options.PauseOnFailure, + }, + State: &schedulepb.ScheduleState{ + Notes: in.Options.Note, + Paused: in.Options.Paused, + LimitedActions: in.Options.RemainingActions != 0, + RemainingActions: in.Options.RemainingActions, + }, + }, + InitialPatch: initialPatch, + Identity: w.client.identity, + Memo: memo, + SearchAttributes: searchAttr, + } + + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + _, err = w.client.workflowService.CreateSchedule(grpcCtx, startRequest) + if _, ok := err.(*serviceerror.WorkflowExecutionAlreadyStarted); ok { + return nil, ErrScheduleAlreadyRunning + } + if err != nil { + return nil, err + } + + return &scheduleHandleImpl{ + ID: ID, + client: w.client, + }, nil +} + +func (sc *scheduleClient) Create(ctx context.Context, options ScheduleOptions) (ScheduleHandle, error) { + if err := sc.workflowClient.ensureInitialized(); err != nil { + return nil, err + } + + // Set header before interceptor run + ctx = contextWithNewHeader(ctx) + + // Run via interceptor + return sc.workflowClient.interceptor.CreateSchedule(ctx, &ScheduleClientCreateInput{ + Options: &options, + }) +} + +func (sc *scheduleClient) GetHandle(ctx context.Context, scheduleID string) ScheduleHandle { + return &scheduleHandleImpl{ + ID: scheduleID, + client: sc.workflowClient, + } +} + +func (sc *scheduleClient) List(ctx context.Context, options ScheduleListOptions) (ScheduleListIterator, error) { + paginate := func(nextToken []byte) (*workflowservice.ListSchedulesResponse, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + request := &workflowservice.ListSchedulesRequest{ + Namespace: sc.workflowClient.namespace, + MaximumPageSize: int32(options.PageSize), + NextPageToken: nextToken, + } + + return sc.workflowClient.workflowService.ListSchedules(grpcCtx, request) + } + + return &scheduleListIteratorImpl{ + paginate: paginate, + }, nil +} + +func (iter *scheduleListIteratorImpl) HasNext() bool { + if iter.err == nil { + if iter.response == nil || + (iter.nextScheduleIndex >= len(iter.response.Schedules) && len(iter.response.NextPageToken) > 0) { + iter.response, iter.err = iter.paginate(iter.response.GetNextPageToken()) + iter.nextScheduleIndex = 0 + } + } + return iter.nextScheduleIndex < len(iter.response.GetSchedules()) || iter.err != nil +} + +func (iter *scheduleListIteratorImpl) Next() (*ScheduleListEntry, error) { + if !iter.HasNext() { + panic("ScheduleListIterator Next() called without checking HasNext()") + } else if iter.err != nil { + return nil, iter.err + } + schedule := iter.response.Schedules[iter.nextScheduleIndex] + iter.nextScheduleIndex++ + return convertFromPBScheduleListEntry(schedule), nil +} + +func (scheduleHandle *scheduleHandleImpl) GetID() string { + return scheduleHandle.ID +} + +func (scheduleHandle *scheduleHandleImpl) Delete(ctx context.Context) error { + request := &workflowservice.DeleteScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + Identity: scheduleHandle.client.identity, + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + _, err := scheduleHandle.client.workflowService.DeleteSchedule(grpcCtx, request) + return err +} + +func (scheduleHandle *scheduleHandleImpl) Backfill(ctx context.Context, options ScheduleBackfillOptions) error { + request := &workflowservice.PatchScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + Patch: &schedulepb.SchedulePatch{ + BackfillRequest: convertToPBBackfillList(options.Backfill), + }, + Identity: scheduleHandle.client.identity, + RequestId: uuid.New(), + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + _, err := scheduleHandle.client.workflowService.PatchSchedule(grpcCtx, request) + return err +} + +func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options ScheduleUpdateOptions) error { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + ctx = contextWithNewHeader(ctx) + + describeRequest := &workflowservice.DescribeScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + } + describeResponse, err := scheduleHandle.client.workflowService.DescribeSchedule(grpcCtx, describeRequest) + if err != nil { + return err + } + scheduleDescription, err := scheduleDescriptionFromPB(describeResponse) + if err != nil { + return err + } + newSchedule, err := options.DoUpdate(ScheduleUpdateInput{ + Description: *scheduleDescription, + }) + if err != nil { + if errors.Is(err, ErrSkipScheduleUpdate) { + return nil + } + return err + } + newSchedulePB, err := convertToPBSchedule(ctx, scheduleHandle.client, newSchedule.Schedule) + if err != nil { + return err + } + _, err = scheduleHandle.client.workflowService.UpdateSchedule(grpcCtx, &workflowservice.UpdateScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + Schedule: newSchedulePB, + ConflictToken: nil, + Identity: scheduleHandle.client.identity, + RequestId: uuid.New(), + }) + return err +} + +func (scheduleHandle *scheduleHandleImpl) Describe(ctx context.Context) (*ScheduleDescription, error) { + request := &workflowservice.DescribeScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + describeResponse, err := scheduleHandle.client.workflowService.DescribeSchedule(grpcCtx, request) + if err != nil { + return nil, err + } + return scheduleDescriptionFromPB(describeResponse) +} + +func (scheduleHandle *scheduleHandleImpl) Trigger(ctx context.Context, options ScheduleTriggerOptions) error { + request := &workflowservice.PatchScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + Patch: &schedulepb.SchedulePatch{ + TriggerImmediately: &schedulepb.TriggerImmediatelyRequest{ + OverlapPolicy: options.Overlap, + }, + }, + Identity: scheduleHandle.client.identity, + RequestId: uuid.New(), + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + _, err := scheduleHandle.client.workflowService.PatchSchedule(grpcCtx, request) + return err +} + +func (scheduleHandle *scheduleHandleImpl) Pause(ctx context.Context, options SchedulePauseOptions) error { + pauseNote := "Paused via Go SDK" + if options.Note != "" { + pauseNote = options.Note + } + request := &workflowservice.PatchScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + Patch: &schedulepb.SchedulePatch{ + Pause: pauseNote, + }, + Identity: scheduleHandle.client.identity, + RequestId: uuid.New(), + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + _, err := scheduleHandle.client.workflowService.PatchSchedule(grpcCtx, request) + return err +} + +func (scheduleHandle *scheduleHandleImpl) Unpause(ctx context.Context, options ScheduleUnpauseOptions) error { + unpauseNote := "Unpaused via Go SDK" + if options.Note != "" { + unpauseNote = options.Note + } + request := &workflowservice.PatchScheduleRequest{ + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + Patch: &schedulepb.SchedulePatch{ + Unpause: unpauseNote, + }, + Identity: scheduleHandle.client.identity, + RequestId: uuid.New(), + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + _, err := scheduleHandle.client.workflowService.PatchSchedule(grpcCtx, request) + return err +} + +func convertToPBScheduleSpec(scheduleSpec *ScheduleSpec) *schedulepb.ScheduleSpec { + if scheduleSpec == nil { + return nil + } + + calendar := convertToPBScheduleCalendarSpecList(scheduleSpec.Calendars) + + intervals := make([]*schedulepb.IntervalSpec, len(scheduleSpec.Intervals)) + for i, interval := range scheduleSpec.Intervals { + intervalSpec := interval + intervals[i] = &schedulepb.IntervalSpec{ + Interval: &intervalSpec.Every, + Phase: &intervalSpec.Offset, + } + } + + skip := convertToPBScheduleCalendarSpecList(scheduleSpec.Skip) + + var startTime *time.Time + if !scheduleSpec.StartAt.IsZero() { + startTime = &scheduleSpec.StartAt + } + + var endTime *time.Time + if !scheduleSpec.EndAt.IsZero() { + endTime = &scheduleSpec.EndAt + } + + return &schedulepb.ScheduleSpec{ + StructuredCalendar: calendar, + Interval: intervals, + CronString: scheduleSpec.CronExpressions, + ExcludeStructuredCalendar: skip, + StartTime: startTime, + EndTime: endTime, + Jitter: &scheduleSpec.Jitter, + // TODO support custom time zone data + TimezoneName: scheduleSpec.TimeZoneName, + } +} + +func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleSpec { + if scheduleSpec == nil { + return nil + } + + calendars := convertFromPBScheduleCalendarSpecList(scheduleSpec.GetStructuredCalendar()) + + intervals := make([]ScheduleIntervalSpec, len(scheduleSpec.GetInterval())) + for i, s := range scheduleSpec.GetInterval() { + intervals[i] = ScheduleIntervalSpec{ + Every: common.DurationValue(s.Interval), + Offset: common.DurationValue(s.Phase), + } + } + + skip := convertFromPBScheduleCalendarSpecList(scheduleSpec.GetExcludeStructuredCalendar()) + + startAt := time.Time{} + if scheduleSpec.GetStartTime() != nil { + startAt = *scheduleSpec.GetStartTime() + } + + endAt := time.Time{} + if scheduleSpec.GetEndTime() != nil { + endAt = *scheduleSpec.GetEndTime() + } + + return &ScheduleSpec{ + Calendars: calendars, + Intervals: intervals, + Skip: skip, + StartAt: startAt, + EndAt: endAt, + Jitter: common.DurationValue(scheduleSpec.GetJitter()), + TimeZoneName: scheduleSpec.GetTimezoneName(), + } +} + +func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeScheduleResponse) (*ScheduleDescription, error) { + if describeResponse == nil { + return nil, nil + } + + runningWorkflows := make([]ScheduleWorkflowExecution, len(describeResponse.Info.GetRunningWorkflows())) + for i, s := range describeResponse.Info.GetRunningWorkflows() { + runningWorkflows[i] = ScheduleWorkflowExecution{ + WorkflowID: s.GetWorkflowId(), + FirstExecutionRunID: s.GetRunId(), + } + } + + recentActions := convertFromPBScheduleActionResultList(describeResponse.Info.GetRecentActions()) + + nextActionTimes := make([]time.Time, len(describeResponse.Info.GetFutureActionTimes())) + for i, t := range describeResponse.Info.GetFutureActionTimes() { + nextActionTimes[i] = common.TimeValue(t) + } + + actionDescription, err := convertFromPBScheduleAction(describeResponse.Schedule.Action) + if err != nil { + return nil, err + } + + return &ScheduleDescription{ + Schedule: Schedule{ + Action: actionDescription, + Spec: convertFromPBScheduleSpec(describeResponse.Schedule.Spec), + Policy: &SchedulePolicies{ + Overlap: describeResponse.Schedule.Policies.GetOverlapPolicy(), + CatchupWindow: common.DurationValue(describeResponse.Schedule.Policies.GetCatchupWindow()), + PauseOnFailure: describeResponse.Schedule.Policies.GetPauseOnFailure(), + }, + State: &ScheduleState{ + Note: describeResponse.Schedule.State.GetNotes(), + Paused: describeResponse.Schedule.State.GetPaused(), + LimitedActions: describeResponse.Schedule.State.GetLimitedActions(), + RemainingActions: int(describeResponse.Schedule.State.GetRemainingActions()), + }, + }, + Info: ScheduleInfo{ + NumActions: int(describeResponse.Info.ActionCount), + NumActionsMissedCatchupWindow: int(describeResponse.Info.MissedCatchupWindow), + NumActionsSkippedOverlap: int(describeResponse.Info.OverlapSkipped), + RunningWorkflows: runningWorkflows, + RecentActions: recentActions, + NextActionTimes: nextActionTimes, + CreatedAt: common.TimeValue(describeResponse.Info.GetCreateTime()), + LastUpdateAt: common.TimeValue(describeResponse.Info.GetUpdateTime()), + }, + Memo: describeResponse.Memo, + SearchAttributes: describeResponse.SearchAttributes, + }, nil +} + +func convertToPBSchedule(ctx context.Context, client *WorkflowClient, schedule *Schedule) (*schedulepb.Schedule, error) { + if schedule == nil { + return nil, nil + } + action, err := convertToPBScheduleAction(ctx, client, schedule.Action) + if err != nil { + return nil, err + } + return &schedulepb.Schedule{ + Spec: convertToPBScheduleSpec(schedule.Spec), + Action: action, + Policies: &schedulepb.SchedulePolicies{ + OverlapPolicy: schedule.Policy.Overlap, + CatchupWindow: &schedule.Policy.CatchupWindow, + PauseOnFailure: schedule.Policy.PauseOnFailure, + }, + State: &schedulepb.ScheduleState{ + Notes: schedule.State.Note, + Paused: schedule.State.Paused, + LimitedActions: schedule.State.LimitedActions, + RemainingActions: int64(schedule.State.RemainingActions), + }, + }, nil +} + +func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *ScheduleListEntry { + scheduleInfo := schedule.GetInfo() + + recentActions := convertFromPBScheduleActionResultList(scheduleInfo.GetRecentActions()) + + nextActionTimes := make([]time.Time, len(schedule.Info.GetFutureActionTimes())) + for i, t := range schedule.Info.GetFutureActionTimes() { + nextActionTimes[i] = common.TimeValue(t) + } + + return &ScheduleListEntry{ + ID: schedule.ScheduleId, + Spec: convertFromPBScheduleSpec(scheduleInfo.GetSpec()), + Note: scheduleInfo.GetNotes(), + Paused: scheduleInfo.GetPaused(), + WorkflowType: WorkflowType{ + Name: scheduleInfo.GetWorkflowType().GetName(), + }, + RecentActions: recentActions, + NextActionTimes: nextActionTimes, + Memo: schedule.Memo, + SearchAttributes: schedule.SearchAttributes, + } +} + +func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, scheduleAction ScheduleAction) (*schedulepb.ScheduleAction, error) { + switch action := scheduleAction.(type) { + case ScheduleWorkflowAction: + // Set header before interceptor run + dataConverter := WithContext(ctx, client.dataConverter) + + // Default workflow ID + if action.ID == "" { + action.ID = uuid.New() + } + + // Validate function and get name + if err := validateFunctionArgs(action.Workflow, action.Args, true); err != nil { + return nil, err + } + workflowType, err := getWorkflowFunctionName(client.registry, action.Workflow) + if err != nil { + return nil, err + } + // Encode workflow inputs that may already be encoded + input, err := encodeScheduleWorklowArgs(dataConverter, action.Args) + if err != nil { + return nil, err + } + // Encode workflow memos that may already be encoded + memo, err := encodeScheduleWorkflowMemo(dataConverter, action.Memo) + if err != nil { + return nil, err + } + + searchAttr, err := serializeSearchAttributes(action.SearchAttributes) + if err != nil { + return nil, err + } + + // get workflow headers from the context + header, err := headerPropagated(ctx, client.contextPropagators) + if err != nil { + return nil, err + } + + return &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: action.ID, + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: action.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: input, + WorkflowExecutionTimeout: &action.WorkflowExecutionTimeout, + WorkflowRunTimeout: &action.WorkflowRunTimeout, + WorkflowTaskTimeout: &action.WorkflowTaskTimeout, + WorkflowIdReusePolicy: action.WorkflowIDReusePolicy, + RetryPolicy: convertToPBRetryPolicy(action.RetryPolicy), + Memo: memo, + SearchAttributes: searchAttr, + Header: header, + }, + }, + }, nil + default: + // TODO maybe just panic instead? + return nil, fmt.Errorf("could not parse ScheduleAction") + } +} + +func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAction, error) { + switch action := action.Action.(type) { + case *schedulepb.ScheduleAction_StartWorkflow: + workflow := action.StartWorkflow + + args := make([]interface{}, len(workflow.GetInput().GetPayloads())) + for i, p := range workflow.GetInput().GetPayloads() { + args[i] = p + } + + memos := make(map[string]interface{}) + for key, element := range workflow.GetMemo().GetFields() { + memos[key] = element + } + + searchAttributes := make(map[string]interface{}) + for key, element := range workflow.GetSearchAttributes().GetIndexedFields() { + searchAttributes[key] = element + } + + return ScheduleWorkflowAction{ + ID: workflow.GetWorkflowId(), + Workflow: workflow.WorkflowType.GetName(), + Args: args, + TaskQueue: workflow.TaskQueue.GetName(), + WorkflowExecutionTimeout: common.DurationValue(workflow.GetWorkflowExecutionTimeout()), + WorkflowRunTimeout: common.DurationValue(workflow.GetWorkflowRunTimeout()), + WorkflowTaskTimeout: common.DurationValue(workflow.GetWorkflowTaskTimeout()), + WorkflowIDReusePolicy: workflow.GetWorkflowIdReusePolicy(), + RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy), + Memo: memos, + SearchAttributes: searchAttributes, + }, nil + default: + // TODO maybe just panic instead? + return nil, fmt.Errorf("could not parse ScheduleAction") + } +} + +func convertToPBBackfillList(backfillRequests []ScheduleBackfill) []*schedulepb.BackfillRequest { + backfillRequestsPB := make([]*schedulepb.BackfillRequest, len(backfillRequests)) + for i, b := range backfillRequests { + backfill := b + backfillRequestsPB[i] = &schedulepb.BackfillRequest{ + StartTime: &backfill.Start, + EndTime: &backfill.End, + OverlapPolicy: backfill.Overlap, + } + } + return backfillRequestsPB +} + +func convertToPBRangeList(scheduleRange []ScheduleRange) []*schedulepb.Range { + rangesPB := make([]*schedulepb.Range, len(scheduleRange)) + for i, r := range scheduleRange { + rangesPB[i] = &schedulepb.Range{ + Start: int32(r.Start), + End: int32(r.End), + Step: int32(r.Step), + } + } + return rangesPB +} + +func convertFromPBRangeList(scheduleRangePB []*schedulepb.Range) []ScheduleRange { + scheduleRange := make([]ScheduleRange, len(scheduleRangePB)) + for i, r := range scheduleRangePB { + if r == nil { + continue + } + scheduleRange[i] = ScheduleRange{ + Start: int(r.Start), + End: int(r.End), + Step: int(r.Step), + } + } + return scheduleRange +} + +func convertFromPBScheduleCalendarSpecList(calendarSpecPB []*schedulepb.StructuredCalendarSpec) []ScheduleCalendarSpec { + calendarSpec := make([]ScheduleCalendarSpec, len(calendarSpecPB)) + for i, e := range calendarSpecPB { + calendarSpec[i] = ScheduleCalendarSpec{ + Second: convertFromPBRangeList(e.Second), + Minute: convertFromPBRangeList(e.Minute), + Hour: convertFromPBRangeList(e.Hour), + DayOfMonth: convertFromPBRangeList(e.DayOfMonth), + Month: convertFromPBRangeList(e.Month), + Year: convertFromPBRangeList(e.Year), + DayOfWeek: convertFromPBRangeList(e.DayOfWeek), + Comment: e.Comment, + } + } + return calendarSpec +} + +func convertToPBScheduleCalendarSpecList(calendarSpec []ScheduleCalendarSpec) []*schedulepb.StructuredCalendarSpec { + calendarSpecPB := make([]*schedulepb.StructuredCalendarSpec, len(calendarSpec)) + for i, e := range calendarSpec { + calendarSpecPB[i] = &schedulepb.StructuredCalendarSpec{ + Second: convertToPBRangeList(e.Second), + Minute: convertToPBRangeList(e.Minute), + Hour: convertToPBRangeList(e.Hour), + DayOfMonth: convertToPBRangeList(e.DayOfMonth), + Month: convertToPBRangeList(e.Month), + Year: convertToPBRangeList(e.Year), + DayOfWeek: convertToPBRangeList(e.DayOfWeek), + Comment: e.Comment, + } + } + return calendarSpecPB +} + +func convertFromPBScheduleActionResultList(aa []*schedulepb.ScheduleActionResult) []ScheduleActionResult { + recentActions := make([]ScheduleActionResult, len(aa)) + for i, a := range aa { + var workflowExecution *ScheduleWorkflowExecution + if a.GetStartWorkflowResult() != nil { + workflowExecution = &ScheduleWorkflowExecution{ + WorkflowID: a.GetStartWorkflowResult().GetWorkflowId(), + FirstExecutionRunID: a.GetStartWorkflowResult().GetRunId(), + } + } + recentActions[i] = ScheduleActionResult{ + ScheduleTime: common.TimeValue(a.GetScheduleTime()), + ActualTime: common.TimeValue(a.GetActualTime()), + StartWorkflowResult: workflowExecution, + } + } + return recentActions +} + +func encodeScheduleWorklowArgs(dc converter.DataConverter, args []interface{}) (*commonpb.Payloads, error) { + payloads := make([]*commonpb.Payload, len(args)) + for i, arg := range args { + // arg is already encoded + if enc, ok := arg.(*commonpb.Payload); ok { + payloads[i] = enc + } else { + payload, err := dc.ToPayload(arg) + if err != nil { + return nil, err + } + payloads[i] = payload + } + + } + return &commonpb.Payloads{ + Payloads: payloads, + }, nil +} + +func encodeScheduleWorkflowMemo(dc converter.DataConverter, input map[string]interface{}) (*commonpb.Memo, error) { + if input == nil { + return nil, nil + } + + memo := make(map[string]*commonpb.Payload) + for k, v := range input { + if enc, ok := v.(*commonpb.Payload); ok { + memo[k] = enc + } else { + memoBytes, err := converter.GetDefaultDataConverter().ToPayload(v) + if err != nil { + return nil, fmt.Errorf("encode workflow memo error: %v", err.Error()) + } + memo[k] = memoBytes + } + } + return &commonpb.Memo{Fields: memo}, nil +} diff --git a/internal/internal_schedule_client_test.go b/internal/internal_schedule_client_test.go new file mode 100644 index 000000000..2a212e73f --- /dev/null +++ b/internal/internal_schedule_client_test.go @@ -0,0 +1,223 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/suite" + schedulepb "go.temporal.io/api/schedule/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/api/workflowservicemock/v1" + "go.temporal.io/sdk/converter" +) + +const ( + scheduleID = "some random schedule ID" +) + +// schedule client test suite +type ( + scheduleClientTestSuite struct { + suite.Suite + mockCtrl *gomock.Controller + service *workflowservicemock.MockWorkflowServiceClient + client Client + dataConverter converter.DataConverter + } +) + +func TestScheduleClientSuite(t *testing.T) { + suite.Run(t, new(scheduleClientTestSuite)) +} + +func (s *scheduleClientTestSuite) SetupTest() { + s.mockCtrl = gomock.NewController(s.T()) + s.service = workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl) + s.service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes() + s.client = NewServiceClient(s.service, nil, ClientOptions{}) + s.dataConverter = converter.GetDefaultDataConverter() +} + +func (s *scheduleClientTestSuite) TearDownTest() { + s.mockCtrl.Finish() // assert mock’s expectations +} + + +func (s *scheduleClientTestSuite) TestCreateScheduleClient() { + wf := func(ctx Context) string { + panic("this is just a stub") + } + options := ScheduleOptions{ + ID: scheduleID, + Spec: ScheduleSpec{ + CronExpressions: []string{"*"}, + }, + Action: ScheduleWorkflowAction{ + Workflow: wf, + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + }, + } + createResp := &workflowservice.CreateScheduleResponse{} + s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).Times(1) + + scheduleHandle, err := s.client.ScheduleClient().Create(context.Background(), options) + s.Nil(err) + s.Equal(scheduleHandle.GetID(), scheduleID) +} + +func (s *scheduleClientTestSuite) TestCreateScheduleWithMemoAndSearchAttr() { + memo := map[string]interface{}{ + "testMemo": "memo value", + } + searchAttributes := map[string]interface{}{ + "testAttr": "attr value", + } + + wf := func(ctx Context) string { + panic("this is just a stub") + } + + options := ScheduleOptions{ + ID: scheduleID, + Spec: ScheduleSpec{ + CronExpressions: []string{"*"}, + }, + Action: ScheduleWorkflowAction{ + Workflow: wf, + ID: "wid", + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + }, + Memo: memo, + SearchAttributes: searchAttributes, + } + createResp := &workflowservice.CreateScheduleResponse{} + + s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil). + Do(func(_ interface{}, req *workflowservice.CreateScheduleRequest, _ ...interface{}) { + var resultMemo, resultAttr string + // verify the schedules memo and search attributes + err := converter.GetDefaultDataConverter().FromPayload(req.Memo.Fields["testMemo"], &resultMemo) + s.NoError(err) + s.Equal("memo value", resultMemo) + + err = converter.GetDefaultDataConverter().FromPayload(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + s.NoError(err) + s.Equal("attr value", resultAttr) + }) + _, _ = s.client.ScheduleClient().Create(context.Background(), options) +} + +func getListSchedulesRequest() *workflowservice.ListSchedulesRequest { + request := &workflowservice.ListSchedulesRequest{ + Namespace: DefaultNamespace, + } + + return request +} + +// ScheduleIterator + +func (s *scheduleClientTestSuite) TestScheduleIterator_NoError() { + request1 := getListSchedulesRequest() + response1 := &workflowservice.ListSchedulesResponse{ + Schedules: []*schedulepb.ScheduleListEntry{ + { + ScheduleId: "", + }, + }, + NextPageToken: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + } + request2 := getListSchedulesRequest() + request2.NextPageToken = response1.NextPageToken + response2 := &workflowservice.ListSchedulesResponse{ + Schedules: []*schedulepb.ScheduleListEntry{ + { + ScheduleId: "", + }, + }, + NextPageToken: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + } + + request3 := getListSchedulesRequest() + request3.NextPageToken = response2.NextPageToken + response3 := &workflowservice.ListSchedulesResponse{ + Schedules: []*schedulepb.ScheduleListEntry{ + { + ScheduleId: "", + }, + }, + NextPageToken: nil, + } + + s.service.EXPECT().ListSchedules(gomock.Any(), request1, gomock.Any()).Return(response1, nil).Times(1) + s.service.EXPECT().ListSchedules(gomock.Any(), request2, gomock.Any()).Return(response2, nil).Times(1) + s.service.EXPECT().ListSchedules(gomock.Any(), request3, gomock.Any()).Return(response3, nil).Times(1) + + var events []*ScheduleListEntry + iter, _ := s.client.ScheduleClient().List(context.Background(), ScheduleListOptions{}) + for iter.HasNext() { + event, err := iter.Next() + s.Nil(err) + events = append(events, event) + } + s.Equal(3, len(events)) +} + +func (s *scheduleClientTestSuite) TestIteratorError() { + request1 := getListSchedulesRequest() + response1 := &workflowservice.ListSchedulesResponse{ + Schedules: []*schedulepb.ScheduleListEntry{ + { + ScheduleId: "", + }, + }, + NextPageToken: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + } + request2 := getListSchedulesRequest() + request2.NextPageToken = response1.NextPageToken + + s.service.EXPECT().ListSchedules(gomock.Any(), request1, gomock.Any()).Return(response1, nil).Times(1) + + iter, _ := s.client.ScheduleClient().List(context.Background(), ScheduleListOptions{}) + + s.True(iter.HasNext()) + event, err := iter.Next() + s.NotNil(event) + s.Nil(err) + + s.service.EXPECT().ListSchedules(gomock.Any(), request2, gomock.Any()).Return(nil, serviceerror.NewNotFound("")).Times(1) + + s.True(iter.HasNext()) + event, err = iter.Next() + s.Nil(event) + s.NotNil(err) +} \ No newline at end of file diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 3f0f22880..fc9bdc477 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -44,15 +44,14 @@ import ( "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" - uberatomic "go.uber.org/atomic" - "google.golang.org/grpc" - healthpb "google.golang.org/grpc/health/grpc_health_v1" - "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common/metrics" "go.temporal.io/sdk/internal/common/serializer" "go.temporal.io/sdk/internal/common/util" "go.temporal.io/sdk/log" + uberatomic "go.uber.org/atomic" + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" ) // Assert that structs do indeed implement the interfaces @@ -954,6 +953,13 @@ func (wc *WorkflowClient) ensureInitialized() error { return err } +// ScheduleClient implements Client.ScheduleClient. +func (wc *WorkflowClient) ScheduleClient() ScheduleClient { + return &scheduleClient { + workflowClient: wc, + } +} + // Close client and clean up underlying resources. func (wc *WorkflowClient) Close() { // If there's a set of unclosed clients, we have to decrement it and then diff --git a/internal/schedule_client.go b/internal/schedule_client.go new file mode 100644 index 000000000..33464f3eb --- /dev/null +++ b/internal/schedule_client.go @@ -0,0 +1,604 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "context" + "time" + + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" +) + +type ( + // ScheduleRange represents a set of integer values, used to match fields of a calendar + // time in StructuredCalendarSpec. If end < start, then end is interpreted as + // equal to start. This means you can use a Range with start set to a value, and + // end and step unset (defaulting to 0) to represent a single value. + ScheduleRange struct { + // Start of the range (inclusive) + Start int + + // End of the range (inclusive) + // Optional: defaulted to Start + End int + + // Step to be take between each value + // Optional: defaulted to 1 + Step int + } + + // ScheduleCalendarSpec is an event specification relative to the calendar, similar to a traditional cron specification. + // A timestamp matches if at least one range of each field matches the + // corresponding fields of the timestamp, except for year: if year is missing, + // that means all years match. For all fields besides year, at least one Range + ScheduleCalendarSpec struct { + // Second range to match (0-59). + Second []ScheduleRange + + // Minute range to match (0-59). + Minute []ScheduleRange + + // Hour range to match (0-23). + Hour []ScheduleRange + + // DayOfMonth range to match (1-31) + DayOfMonth []ScheduleRange + + // Month range to match (1-12) + Month []ScheduleRange + + // Year range to match. + // Optional: Defaulted to "*" + Year []ScheduleRange + + // DayOfWeek range to match (0-6; 0 is Sunday) + DayOfWeek []ScheduleRange + + // Comment - Description of the intention of this schedule. + Comment string + } + + // ScheduleBackfill desribes a time periods and policy and takes Actions as if that time passed by right now, all at once. + ScheduleBackfill struct { + // Start - start of the range to evaluate schedule in. + Start time.Time + + // End - end of the range to evaluate schedule in. + End time.Time + + // Overlap - Override the Overlap Policy for this request. + Overlap enumspb.ScheduleOverlapPolicy + } + + // ScheduleIntervalSpec - matches times that can be expressed as: + // + // Epoch + (n * every) + offset + // + // where n is all integers ≥ 0. + // + // For example, an `every` of 1 hour with `offset` of zero would match every hour, on the hour. The same `every` but an `offset` + // of 19 minutes would match every `xx:19:00`. An `every` of 28 days with `offset` zero would match `2022-02-17T00:00:00Z` + // (among other times). The same `every` with `offset` of 3 days, 5 hours, and 23 minutes would match `2022-02-20T05:23:00Z` + // instead. + ScheduleIntervalSpec struct { + // Every - describes the period to repeat the interval. + Every time.Duration + + // Offset - is a fixed offset added to the intervals period. + // Optional: Defaulted to 0 + Offset time.Duration + } + + // ScheduleSpec is a complete description of a set of absolute times (possibly infinite) that a action should occur at. + // The times are the union of Calendars, Intervals, and CronExpressions, minus the Skip times. These times + // never change, except that the definition of a time zone can change over time (most commonly, when daylight saving + // time policy changes for an area). To create a totally self-contained ScheduleSpec, use UTC. + ScheduleSpec struct { + // Calendars - Calendar-based specifications of times + Calendars []ScheduleCalendarSpec + + // Intervals - Interval-based specifications of times. + Intervals []ScheduleIntervalSpec + + // CronExpressions - CronExpressions-based specifications of times. CronExpressions is provided for easy migration from legacy Cron Workflows. For new + // use cases, we recommend using ScheduleSpec.Calendars or ScheduleSpec.Intervals for readability and maintainability. Once a schedule is created all + // expressions in CronExpressions will be translated to ScheduleSpec.Calendars on the server. + // + // For example, `0 12 * * MON-WED,FRI` is every M/Tu/W/F at noon, and is equivalent to this ScheduleCalendarSpec: + // + // client.ScheduleCalendarSpec{ + // Second: []ScheduleRange{{}}, + // Minute: []ScheduleRanges{{}}, + // Hour: []ScheduleRange{{ + // Start: 12, + // }}, + // DayOfMonth: []ScheduleRange{ + // { + // Start: 1, + // End: 31, + // }, + // }, + // Month: []ScheduleRange{ + // { + // Start: 1, + // End: 12, + // }, + // }, + // DayOfWeek: []ScheduleRange{ + // { + // Start: 1, + // End: 3, + // }, + // { + // Start: 5, + // }, + // }, + // } + // + // + // The string can have 5, 6, or 7 fields, separated by spaces, and they are interpreted in the + // same way as a ScheduleCalendarSpec: + // - 5 fields: Minute, Hour, DayOfMonth, Month, DayOfWeek + // - 6 fields: Minute, Hour, DayOfMonth, Month, DayOfWeek, Year + // - 7 fields: Second, Minute, Hour, DayOfMonth, Month, DayOfWeek, Year + // + // Notes: + // - If Year is not given, it defaults to *. + // - If Second is not given, it defaults to 0. + // - Shorthands @yearly, @monthly, @weekly, @daily, and @hourly are also + // accepted instead of the 5-7 time fields. + // - @every [/] is accepted and gets compiled into an + // IntervalSpec instead. and should be a decimal integer + // with a unit suffix s, m, h, or d. + // - Optionally, the string can be preceded by CRON_TZ=