Skip to content

Commit

Permalink
Make WaitForOrchestrationXXX gRPC APIs resilient (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
famarting authored Oct 23, 2024
1 parent fac9dd9 commit 65c308b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add API to set custom status ([#81](https://github.com/microsoft/durabletask-go/pull/81)) - by [@famarting](https://github.com/famarting)
- Add missing purge orchestration options ([#82](https://github.com/microsoft/durabletask-go/pull/82)) - by [@famarting](https://github.com/famarting)

### Changed

- Make WaitForOrchestrationXXX gRPC APIs resilient ([#80](https://github.com/microsoft/durabletask-go/pull/81)) - by [@famarting](https://github.com/famarting)

## [v0.5.0] - 2024-06-28

### Added
Expand Down
82 changes: 50 additions & 32 deletions client/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -61,52 +62,57 @@ func (c *TaskHubGrpcClient) FetchOrchestrationMetadata(ctx context.Context, id a
}
return nil, fmt.Errorf("failed to fetch orchestration metadata: %w", err)
}
if !resp.Exists {
return nil, api.ErrInstanceNotFound
}

metadata := makeOrchestrationMetadata(resp)
return metadata, nil
return makeOrchestrationMetadata(resp)
}

// WaitForOrchestrationStart waits for an orchestration to start running and returns an [api.OrchestrationMetadata] object that contains
// metadata about the started instance.
//
// api.ErrInstanceNotFound is returned when the specified orchestration doesn't exist.
func (c *TaskHubGrpcClient) WaitForOrchestrationStart(ctx context.Context, id api.InstanceID, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
req := makeGetInstanceRequest(id, opts)
resp, err := c.client.WaitForInstanceStart(ctx, req)
if err != nil {
if ctx.Err() != nil {
return nil, ctx.Err()
var resp *protos.GetInstanceResponse
var err error
err = backoff.Retry(func() error {
req := makeGetInstanceRequest(id, opts)
resp, err = c.client.WaitForInstanceStart(ctx, req)
if err != nil {
// if its context cancelled stop retrying
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
}
return fmt.Errorf("failed to wait for orchestration start: %w", err)
}
return nil, fmt.Errorf("failed to wait for orchestration start: %w", err)
}
if !resp.Exists {
return nil, api.ErrInstanceNotFound
return nil
}, backoff.WithContext(newInfiniteRetries(), ctx))
if err != nil {
return nil, err
}
metadata := makeOrchestrationMetadata(resp)
return metadata, nil
return makeOrchestrationMetadata(resp)
}

// WaitForOrchestrationCompletion waits for an orchestration to complete and returns an [api.OrchestrationMetadata] object that contains
// metadata about the completed instance.
//
// api.ErrInstanceNotFound is returned when the specified orchestration doesn't exist.
func (c *TaskHubGrpcClient) WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
req := makeGetInstanceRequest(id, opts)
resp, err := c.client.WaitForInstanceCompletion(ctx, req)
if err != nil {
if ctx.Err() != nil {
return nil, ctx.Err()
var resp *protos.GetInstanceResponse
var err error
err = backoff.Retry(func() error {
req := makeGetInstanceRequest(id, opts)
resp, err = c.client.WaitForInstanceCompletion(ctx, req)
if err != nil {
// if its context cancelled stop retrying
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
}
return fmt.Errorf("failed to wait for orchestration completion: %w", err)
}
return nil, fmt.Errorf("failed to wait for orchestration completion: %w", err)
}
if !resp.Exists {
return nil, api.ErrInstanceNotFound
return nil
}, backoff.WithContext(newInfiniteRetries(), ctx))
if err != nil {
return nil, err
}
metadata := makeOrchestrationMetadata(resp)
return metadata, nil
return makeOrchestrationMetadata(resp)
}

// TerminateOrchestration terminates a running orchestration by causing it to stop receiving new events and
Expand Down Expand Up @@ -215,16 +221,28 @@ func makeGetInstanceRequest(id api.InstanceID, opts []api.FetchOrchestrationMeta
return req
}

func makeOrchestrationMetadata(resp *protos.GetInstanceResponse) *api.OrchestrationMetadata {
// makeOrchestrationMetadata validates and converts protos.GetInstanceResponse to api.OrchestrationMetadata
// api.ErrInstanceNotFound is returned when the specified orchestration doesn't exist.
func makeOrchestrationMetadata(resp *protos.GetInstanceResponse) (*api.OrchestrationMetadata, error) {
if !resp.Exists {
return nil, api.ErrInstanceNotFound
}
if resp.OrchestrationState == nil {
return nil, fmt.Errorf("orchestration state is nil")
}
metadata := &api.OrchestrationMetadata{
InstanceID: api.InstanceID(resp.OrchestrationState.InstanceId),
Name: resp.OrchestrationState.Name,
RuntimeStatus: resp.OrchestrationState.OrchestrationStatus,
CreatedAt: resp.OrchestrationState.CreatedTimestamp.AsTime(),
LastUpdatedAt: resp.OrchestrationState.LastUpdatedTimestamp.AsTime(),
SerializedInput: resp.OrchestrationState.Input.GetValue(),
SerializedCustomStatus: resp.OrchestrationState.CustomStatus.GetValue(),
SerializedOutput: resp.OrchestrationState.Output.GetValue(),
}
return metadata
if resp.OrchestrationState.CreatedTimestamp != nil {
metadata.CreatedAt = resp.OrchestrationState.CreatedTimestamp.AsTime()
}
if resp.OrchestrationState.LastUpdatedTimestamp != nil {
metadata.LastUpdatedAt = resp.OrchestrationState.LastUpdatedTimestamp.AsTime()
}
return metadata, nil
}

0 comments on commit 65c308b

Please sign in to comment.