Skip to content

Commit

Permalink
Add schedule API (#943)
Browse files Browse the repository at this point in the history
Add experimental schedule API
  • Loading branch information
Quinn-With-Two-Ns committed Nov 9, 2022
1 parent 620fa72 commit 8ab62d9
Show file tree
Hide file tree
Showing 16 changed files with 2,650 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ default: check test
# general build-product folder, cleaned as part of `make clean`
BUILD := .build

TEST_TIMEOUT := 3m
TEST_TIMEOUT := 5m
TEST_ARG ?= -race -v -timeout $(TEST_TIMEOUT)

INTEG_TEST_ROOT := ./test
Expand Down
100 changes: 100 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,102 @@ type (
// CheckHealthResponse is a response for Client.CheckHealth.
CheckHealthResponse = internal.CheckHealthResponse

// ScheduleRange represents a set of integer values.
// NOTE: Experimental
ScheduleRange = internal.ScheduleRange

// ScheduleCalendarSpec is an event specification relative to the calendar.
// NOTE: Experimental
ScheduleCalendarSpec = internal.ScheduleCalendarSpec

// ScheduleIntervalSpec describes periods a schedules action should occur.
// NOTE: Experimental
ScheduleIntervalSpec = internal.ScheduleIntervalSpec

// ScheduleSpec describes when a schedules action should occur.
// NOTE: Experimental
ScheduleSpec = internal.ScheduleSpec

// ScheduleBackfill desribes a time periods and policy and takes Actions as if that time passed by right now, all at once.
// NOTE: Experimental
ScheduleBackfill = internal.ScheduleBackfill

// ScheduleAction is the interface for all actions a schedule can take.
// NOTE: Experimental
ScheduleAction = internal.ScheduleAction

// ScheduleWorkflowAction is the implementation of ScheduleAction to start a workflow.
// NOTE: Experimental
ScheduleWorkflowAction = internal.ScheduleWorkflowAction

// ScheduleOptions configuration parameters for creating a schedule.
// NOTE: Experimental
ScheduleOptions = internal.ScheduleOptions

// ScheduleClient is the interface with the server to create and get handles to schedules.
// NOTE: Experimental
ScheduleClient = internal.ScheduleClient

// ScheduleListOptions are configuration parameters for listing schedules.
// NOTE: Experimental
ScheduleListOptions = internal.ScheduleListOptions

// ScheduleListIterator is a iterator which can return created schedules.
// NOTE: Experimental
ScheduleListIterator = internal.ScheduleListIterator

// ScheduleListEntry is a result from ScheduleListEntry.
// NOTE: Experimental
ScheduleListEntry = internal.ScheduleListEntry

// ScheduleUpdateOptions are configuration parameters for updating a schedule.
// NOTE: Experimental
ScheduleUpdateOptions = internal.ScheduleUpdateOptions

// ScheduleHandle represents a created schedule.
// NOTE: Experimental
ScheduleHandle = internal.ScheduleHandle

// ScheduleActionResult describes when a schedule action took place.
// NOTE: Experimental
ScheduleActionResult = internal.ScheduleActionResult

// ScheduleWorkflowExecution contains details on a workflows execution stared by a schedule.
// NOTE: Experimental
ScheduleWorkflowExecution = internal.ScheduleWorkflowExecution

// ScheduleDescription describes the current Schedule details from ScheduleHandle.Describe.
// NOTE: Experimental
ScheduleDescription = internal.ScheduleDescription

// Schedule describes a created schedule.
// NOTE: Experimental
Schedule = internal.Schedule

// ScheduleUpdate describes the desired new schedule from ScheduleHandle.Update.
// NOTE: Experimental
ScheduleUpdate = internal.ScheduleUpdate

// ScheduleUpdateInput describes the current state of the schedule to be updated.
// NOTE: Experimental
ScheduleUpdateInput = internal.ScheduleUpdateInput

// ScheduleTriggerOptions configure the parameters for triggering a schedule.
// NOTE: Experimental
ScheduleTriggerOptions = internal.ScheduleTriggerOptions

// SchedulePauseOptions configure the parameters for pausing a schedule.
// NOTE: Experimental
SchedulePauseOptions = internal.SchedulePauseOptions

// ScheduleUnpauseOptions configure the parameters for unpausing a schedule.
// NOTE: Experimental
ScheduleUnpauseOptions = internal.ScheduleUnpauseOptions

// ScheduleBackfillOptions configure the parameters for backfilling a schedule.
// NOTE: Experimental
ScheduleBackfillOptions = internal.ScheduleBackfillOptions

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -389,6 +485,10 @@ type (
// OperatorService creates a new operator service client with the same gRPC connection as this client.
OperatorService() operatorservice.OperatorServiceClient

// Schedule creates a new shedule client with the same gRPC connection as this client.
// NOTE: Experimental
ScheduleClient() ScheduleClient

// Close client and clean up underlying resources.
//
// If this client was created via NewClientFromExisting or this client has
Expand Down
4 changes: 4 additions & 0 deletions interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ type ClientTerminateWorkflowInput = internal.ClientTerminateWorkflowInput
// ClientOutboundInterceptor.QueryWorkflow.
type ClientQueryWorkflowInput = internal.ClientQueryWorkflowInput

// ScheduleClientCreateInput is input for
// ScheduleClientInterceptor.CreateSchedule.
type ScheduleClientCreateInput = internal.ScheduleClientCreateInput

// Header provides Temporal header information from the context for reading or
// writing during specific interceptor calls.
//
Expand Down
19 changes: 19 additions & 0 deletions interceptor/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,25 @@ type tracingClientOutboundInterceptor struct {
root *tracingInterceptor
}

func (t *tracingClientOutboundInterceptor) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (client.ScheduleHandle, error) {
// Start span and write to header
span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{
Operation: "CreateSchedule",
Name: in.Options.ID,
ToHeader: true,
Time: time.Now(),
})
if err != nil {
return nil, err
}
var finishOpts TracerFinishSpanOptions
defer span.Finish(&finishOpts)

run, err := t.Next.CreateSchedule(ctx, in)
finishOpts.Error = err
return run, err
}

func (t *tracingClientOutboundInterceptor) ExecuteWorkflow(
ctx context.Context,
in *ClientExecuteWorkflowInput,
Expand Down
5 changes: 4 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ type (
// OperatorService creates a new operator service client with the same gRPC connection as this client.
OperatorService() operatorservice.OperatorServiceClient

// Schedule creates a new shedule client with the same gRPC connection as this client.
ScheduleClient() ScheduleClient

// Close client and clean up underlying resources.
Close()
}
Expand Down Expand Up @@ -395,7 +398,7 @@ type (
DataConverter converter.DataConverter

// Optional: Sets FailureConverter to customize serialization/deserialization of errors.
// default: temporal.DefaultFailureConverter, does not encode any fields of the error. Use temporal.NewDefaultFailureConverter
// default: temporal.DefaultFailureConverter, does not encode any fields of the error. Use temporal.NewDefaultFailureConverter
// options to configure or create a custom converter.
FailureConverter converter.FailureConverter

Expand Down
6 changes: 6 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ var (
// which indicate the activity is not done yet. Then, when the waited human action happened, it needs to trigger something
// that could report the activity completed event to temporal server via Client.CompleteActivity() API.
ErrActivityResultPending = errors.New("not error: do not autocomplete, using Client.CompleteActivity() to complete")

// ErrScheduleAlreadyRunning is returned if there's already a running (not deleted) Schedule with the same ID
ErrScheduleAlreadyRunning = errors.New("schedule with this ID is already registered")

// ErrSkipScheduleUpdate is used by a user if they want to skip updating a schedule.
ErrSkipScheduleUpdate = errors.New("skip schedule update")
)

// NewApplicationError create new instance of *ApplicationError with message, type, and optional details.
Expand Down
9 changes: 9 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ type ClientOutboundInterceptor interface {
// interceptor.Header will return a non-nil map for this context.
ExecuteWorkflow(context.Context, *ClientExecuteWorkflowInput) (WorkflowRun, error)

// CreateSchedule - Intercept a service call to CreateSchedule
CreateSchedule(ctx context.Context, options *ScheduleClientCreateInput) (ScheduleHandle, error)

// SignalWorkflow intercepts client.Client.SignalWorkflow.
// interceptor.Header will return a non-nil map for this context.
SignalWorkflow(context.Context, *ClientSignalWorkflowInput) error
Expand All @@ -299,6 +302,12 @@ type ClientOutboundInterceptor interface {
mustEmbedClientOutboundInterceptorBase()
}

// ScheduleClientCreateInput is the input to
// ClientOutboundInterceptor.CreateSchedule.
type ScheduleClientCreateInput struct {
Options *ScheduleOptions
}

// ClientExecuteWorkflowInput is the input to
// ClientOutboundInterceptor.ExecuteWorkflow.
type ClientExecuteWorkflowInput struct {
Expand Down
5 changes: 5 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,9 @@ func (c *ClientOutboundInterceptorBase) QueryWorkflow(
return c.Next.QueryWorkflow(ctx, in)
}

// ExecuteWorkflow implements ClientOutboundInterceptor.CreateSchedule.
func (c *ClientOutboundInterceptorBase) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (ScheduleHandle, error) {
return c.Next.CreateSchedule(ctx, in)
}

func (*ClientOutboundInterceptorBase) mustEmbedClientOutboundInterceptorBase() {}
Loading

0 comments on commit 8ab62d9

Please sign in to comment.