Skip to content

Commit

Permalink
Add TaskReachability API (temporalio#1247)
Browse files Browse the repository at this point in the history
Add TaskReachability API
  • Loading branch information
Quinn-With-Two-Ns authored Oct 4, 2023
1 parent fecfba8 commit a001e5f
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ frontend.workerVersioningDataAPIs:
frontend.workerVersioningWorkflowAPIs:
- value: true
worker.buildIdScavengerEnabled:
- value: true
- value: true
worker.removableBuildIdDurationSinceDefault:
- value: 1
44 changes: 44 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ import (
"go.temporal.io/sdk/internal/common/metrics"
)

// TaskReachability specifies which category of tasks may reach a worker on a versioned task queue.
// Used both in a reachability query and its response.
// WARNING: Worker versioning is currently experimental
type TaskReachability = internal.TaskReachability

const (
// TaskReachabilityUnspecified indicates the reachability was not specified
TaskReachabilityUnspecified = internal.TaskReachabilityUnspecified
// TaskReachabilityNewWorkflows indicates the Build Id might be used by new workflows
TaskReachabilityNewWorkflows = internal.TaskReachabilityNewWorkflows
// TaskReachabilityExistingWorkflows indicates the Build Id might be used by open workflows
// and/or closed workflows.
TaskReachabilityExistingWorkflows = internal.TaskReachabilityExistingWorkflows
// TaskReachabilityOpenWorkflows indicates the Build Id might be used by open workflows.
TaskReachabilityOpenWorkflows = internal.TaskReachabilityOpenWorkflows
// TaskReachabilityClosedWorkflows indicates the Build Id might be used by closed workflows
TaskReachabilityClosedWorkflows = internal.TaskReachabilityClosedWorkflows
)

const (
// DefaultHostPort is the host:port which is used if not passed with options.
DefaultHostPort = internal.LocalHostPort
Expand All @@ -58,6 +77,10 @@ const (
// QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open
// sessions in the workflow. The result will be a list of SessionInfo encoded in the converter.EncodedValue.
QueryTypeOpenSessions string = internal.QueryTypeOpenSessions

// UnversionedBuildID is a stand-in for a Build Id for unversioned Workers.
// WARNING: Worker versioning is currently experimental
UnversionedBuildID string = internal.UnversionedBuildID
)

type (
Expand Down Expand Up @@ -216,6 +239,22 @@ type (
// WARNING: Worker versioning is currently experimental
BuildIDOpPromoteIDWithinSet = internal.BuildIDOpPromoteIDWithinSet

// GetWorkerTaskReachabilityOptions is the input to Client.GetWorkerTaskReachability.
// WARNING: Worker versioning is currently experimental
GetWorkerTaskReachabilityOptions = internal.GetWorkerTaskReachabilityOptions

// WorkerTaskReachability is the response for Client.GetWorkerTaskReachability.
// WARNING: Worker versioning is currently experimental
WorkerTaskReachability = internal.WorkerTaskReachability

// BuildIDReachability describes the reachability of a buildID
// WARNING: Worker versioning is currently experimental
BuildIDReachability = internal.BuildIDReachability

// TaskQueueReachability Describes how the Build ID may be reachable from the task queue.
// WARNING: Worker versioning is currently experimental
TaskQueueReachability = internal.TaskQueueReachability

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -513,6 +552,11 @@ type (
// WARNING: Worker versioning is currently experimental
GetWorkerBuildIdCompatibility(ctx context.Context, options *GetWorkerBuildIdCompatibilityOptions) (*WorkerBuildIDVersionSets, error)

// GetWorkerTaskReachability
// Returns which versions are is still in use by open or closed workflows
// WARNING: Worker versioning is currently experimental
GetWorkerTaskReachability(ctx context.Context, options *GetWorkerTaskReachabilityOptions) (*WorkerTaskReachability, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down
3 changes: 3 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ type (
// GetWorkerBuildIdCompatibility returns the worker-build-id based version sets for a particular task queue.
GetWorkerBuildIdCompatibility(ctx context.Context, options *GetWorkerBuildIdCompatibilityOptions) (*WorkerBuildIDVersionSets, error)

// GetWorkerTaskReachability returns which versions are is still in use by open or closed workflows.
GetWorkerTaskReachability(ctx context.Context, options *GetWorkerTaskReachabilityOptions) (*WorkerTaskReachability, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down
23 changes: 23 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,29 @@ func (wc *WorkflowClient) GetWorkerBuildIdCompatibility(ctx context.Context, opt
return converted, nil
}

// GetWorkerTaskReachability returns which versions are is still in use by open or closed workflows.
func (wc *WorkflowClient) GetWorkerTaskReachability(ctx context.Context, options *GetWorkerTaskReachabilityOptions) (*WorkerTaskReachability, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
}

grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()

request := &workflowservice.GetWorkerTaskReachabilityRequest{
Namespace: wc.namespace,
BuildIds: options.BuildIDs,
TaskQueues: options.TaskQueues,
Reachability: taskReachabilityToProto(options.Reachability),
}
resp, err := wc.workflowService.GetWorkerTaskReachability(grpcCtx, request)
if err != nil {
return nil, err
}
converted := workerTaskReachabilityFromProtoResponse(resp)
return converted, nil
}

func (wc *WorkflowClient) UpdateWorkflowWithOptions(
ctx context.Context,
req *UpdateWorkflowWithOptionsRequest,
Expand Down
136 changes: 136 additions & 0 deletions internal/worker_version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ package internal
import (
"errors"

enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
)

// A stand-in for a Build Id for unversioned Workers.
const UnversionedBuildID = ""

// VersioningIntent indicates whether the user intends certain commands to be run on
// a compatible worker build ID version or not.
type VersioningIntent int
Expand All @@ -47,6 +51,24 @@ const (
VersioningIntentDefault
)

// TaskReachability specifies which category of tasks may reach a worker on a versioned task queue.
// Used both in a reachability query and its response.
type TaskReachability int

const (
// TaskReachabilityUnspecified indicates the reachability was not specified
TaskReachabilityUnspecified = iota
// TaskReachabilityNewWorkflows indicates the Build Id might be used by new workflows
TaskReachabilityNewWorkflows
// TaskReachabilityExistingWorkflows indicates the Build Id might be used by open workflows
// and/or closed workflows.
TaskReachabilityExistingWorkflows
// TaskReachabilityOpenWorkflows indicates the Build Id might be used by open workflows.
TaskReachabilityOpenWorkflows
// TaskReachabilityClosedWorkflows indicates the Build Id might be used by closed workflows
TaskReachabilityClosedWorkflows
)

type (
// UpdateWorkerBuildIdCompatibilityOptions is the input to
// Client.UpdateWorkerBuildIdCompatibility.
Expand Down Expand Up @@ -130,6 +152,38 @@ type GetWorkerBuildIdCompatibilityOptions struct {
MaxSets int
}

type GetWorkerTaskReachabilityOptions struct {
// BuildIDs - The build IDs to query the reachability of. At least one build ID must be provided.
BuildIDs []string
// TaskQueues - The task queues with Build IDs defined on them that the request is
// concerned with.
// Optional: defaults to all task queues
TaskQueues []string
// Reachability - The reachability this request is concerned with.
// Optional: defaults to all types of reachability
Reachability TaskReachability
}

type WorkerTaskReachability struct {
// BuildIDReachability - map of build IDs and their reachability information
// May contain an entry with UnversionedBuildID for an unversioned worker
BuildIDReachability map[string]*BuildIDReachability
}

type BuildIDReachability struct {
// TaskQueueReachable map of task queues and their reachability information.
TaskQueueReachable map[string]*TaskQueueReachability
// UnretrievedTaskQueues is a list of task queues not retrieved because the server limits
// the number that can be queried at once.
UnretrievedTaskQueues []string
}

type TaskQueueReachability struct {
// TaskQueueReachability for a worker in a single task queue.
// If TaskQueueReachability is empty, this worker is considered unreachable in this task queue.
TaskQueueReachability []TaskReachability
}

// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIdCompatibility and represents the sets
// of worker build id based versions.
type WorkerBuildIDVersionSets struct {
Expand Down Expand Up @@ -176,6 +230,88 @@ func workerVersionSetsFromProto(sets []*taskqueuepb.CompatibleVersionSet) []*Com
return result
}

func workerTaskReachabilityFromProtoResponse(response *workflowservice.GetWorkerTaskReachabilityResponse) *WorkerTaskReachability {
if response == nil {
return nil
}
return &WorkerTaskReachability{
BuildIDReachability: buildIDReachabilityFromProto(response.GetBuildIdReachability()),
}
}

func buildIDReachabilityFromProto(sets []*taskqueuepb.BuildIdReachability) map[string]*BuildIDReachability {
if sets == nil {
return nil
}
result := make(map[string]*BuildIDReachability, len(sets))
for _, s := range sets {
retrievedTaskQueues, unretrievedTaskQueues := taskQueueReachabilityFromProto(s.GetTaskQueueReachability())
result[s.GetBuildId()] = &BuildIDReachability{
TaskQueueReachable: retrievedTaskQueues,
UnretrievedTaskQueues: unretrievedTaskQueues,
}
}
return result
}

func taskQueueReachabilityFromProto(sets []*taskqueuepb.TaskQueueReachability) (map[string]*TaskQueueReachability, []string) {
if sets == nil {
return nil, nil
}
retrievedTaskQueues := make(map[string]*TaskQueueReachability, len(sets))
unretrievedTaskQueues := make([]string, 0, len(sets))
for _, s := range sets {
reachability := make([]TaskReachability, len(s.GetReachability()))
for i, r := range s.GetReachability() {
reachability[i] = taskReachabilityFromProto(r)
}
if len(reachability) == 1 && reachability[0] == TaskReachabilityUnspecified {
unretrievedTaskQueues = append(unretrievedTaskQueues, s.GetTaskQueue())
} else {
retrievedTaskQueues[s.GetTaskQueue()] = &TaskQueueReachability{
TaskQueueReachability: reachability,
}
}

}
return retrievedTaskQueues, unretrievedTaskQueues
}

func taskReachabilityToProto(r TaskReachability) enumspb.TaskReachability {
switch r {
case TaskReachabilityUnspecified:
return enumspb.TASK_REACHABILITY_UNSPECIFIED
case TaskReachabilityNewWorkflows:
return enumspb.TASK_REACHABILITY_NEW_WORKFLOWS
case TaskReachabilityExistingWorkflows:
return enumspb.TASK_REACHABILITY_EXISTING_WORKFLOWS
case TaskReachabilityOpenWorkflows:
return enumspb.TASK_REACHABILITY_OPEN_WORKFLOWS
case TaskReachabilityClosedWorkflows:
return enumspb.TASK_REACHABILITY_CLOSED_WORKFLOWS
default:
panic("unknown task reachability")

}
}

func taskReachabilityFromProto(r enumspb.TaskReachability) TaskReachability {
switch r {
case enumspb.TASK_REACHABILITY_UNSPECIFIED:
return TaskReachabilityUnspecified
case enumspb.TASK_REACHABILITY_NEW_WORKFLOWS:
return TaskReachabilityNewWorkflows
case enumspb.TASK_REACHABILITY_EXISTING_WORKFLOWS:
return TaskReachabilityExistingWorkflows
case enumspb.TASK_REACHABILITY_OPEN_WORKFLOWS:
return TaskReachabilityOpenWorkflows
case enumspb.TASK_REACHABILITY_CLOSED_WORKFLOWS:
return TaskReachabilityClosedWorkflows
default:
panic("unknown task reachability")
}
}

func (v *BuildIDOpAddNewIDInNewDefaultSet) targetedBuildId() string { return v.BuildID }
func (v *BuildIDOpAddNewCompatibleVersion) targetedBuildId() string { return v.BuildID }
func (v *BuildIDOpPromoteSet) targetedBuildId() string { return v.BuildID }
Expand Down
23 changes: 23 additions & 0 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a001e5f

Please sign in to comment.