Skip to content

Commit

Permalink
Delete Namespace: clean up errors (#7029)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Clean up errors in delete namespace workflows:
1. Use `FailedPrecondition` instead of `InvalidArgument` where it makes
more sense.
2. Add helpers to construct `ApplicationError`s of different types.
3. Remove the `unable to execute activity` error, which was wrongly used in
many places.
4. A few minor cleanups.

## Why?
<!-- Tell your future self why have you made these changes -->
Simplify and unify error handling in delete namespace workflows.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Modified tests. Manual runs.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
No risks.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
No.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
alexshtin authored Dec 30, 2024
1 parent a513035 commit 480a0f5
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 119 deletions.
21 changes: 8 additions & 13 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ package frontend

import (
"context"
"errors"
"fmt"
"maps"
"sync/atomic"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand All @@ -38,7 +38,6 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/server/api/adminservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
svc "go.temporal.io/server/client"
Expand Down Expand Up @@ -68,7 +67,6 @@ import (
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
)

var _ OperatorHandler = (*OperatorHandlerImpl)(nil)
Expand Down Expand Up @@ -609,14 +607,16 @@ func (h *OperatorHandlerImpl) DeleteNamespace(
) (_ *operatorservice.DeleteNamespaceResponse, retError error) {
defer log.CapturePanic(h.logger, &retError)

// validate request
if request == nil {
return nil, errRequestNotSet
}

// If NamespaceDeleteDelay is not provided, the default delay configured in the cluster should be used.
var namespaceDeleteDelay time.Duration
if request.NamespaceDeleteDelay == nil {
request.NamespaceDeleteDelay = durationpb.New(h.config.DeleteNamespaceNamespaceDeleteDelay())
namespaceDeleteDelay = h.config.DeleteNamespaceNamespaceDeleteDelay()
} else {
namespaceDeleteDelay = request.NamespaceDeleteDelay.AsDuration()
}

// Execute workflow.
Expand All @@ -629,7 +629,7 @@ func (h *OperatorHandlerImpl) DeleteNamespace(
PagesPerExecution: h.config.DeleteNamespacePagesPerExecution(),
ConcurrentDeleteExecutionsActivities: h.config.DeleteNamespaceConcurrentDeleteExecutionsActivities(),
},
NamespaceDeleteDelay: request.NamespaceDeleteDelay.AsDuration(),
NamespaceDeleteDelay: namespaceDeleteDelay,
}

sdkClient := h.sdkClientFactory.GetSystemClient()
Expand All @@ -646,16 +646,11 @@ func (h *OperatorHandlerImpl) DeleteNamespace(
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToStartWorkflowMessage, deletenamespace.WorkflowName, err))
}

// Wait for workflow to complete.
// Wait for the workflow to complete.
var wfResult deletenamespace.DeleteNamespaceWorkflowResult
err = run.Get(ctx, &wfResult)
if err != nil {
// Special handling for validation errors. Convert them to InvalidArgument.
var appErr *temporal.ApplicationError
if errors.As(err, &appErr) && appErr.Type() == delnserrors.ValidationErrorErrType {
return nil, serviceerror.NewInvalidArgument(appErr.Message())
}
return nil, serviceerror.NewSystemWorkflow(&commonpb.WorkflowExecution{WorkflowId: deletenamespace.WorkflowName, RunId: run.GetRunID()}, err)
return nil, delnserrors.ToServiceError(err, run.GetID(), run.GetRunID())
}

return &operatorservice.DeleteNamespaceResponse{
Expand Down
26 changes: 22 additions & 4 deletions service/frontend/operator_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/server/api/adminservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -1158,6 +1157,7 @@ func (s *operatorHandlerSuite) Test_DeleteNamespace() {
mockRun.EXPECT().Get(gomock.Any(), gomock.Any()).Return(errors.New("workflow failed"))
const RunId = "9a9f668a-58b1-427e-bed6-bf1401049f7d"
mockRun.EXPECT().GetRunID().Return(RunId)
mockRun.EXPECT().GetID().Return("test-workflow-id")
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), "temporal-sys-delete-namespace-workflow", gomock.Any()).Return(mockRun, nil)
resp, err = handler.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
Namespace: "test-namespace",
Expand All @@ -1166,20 +1166,38 @@ func (s *operatorHandlerSuite) Test_DeleteNamespace() {
var sysWfErr *serviceerror.SystemWorkflow
s.ErrorAs(err, &sysWfErr)
s.Equal(RunId, sysWfErr.WorkflowExecution.RunId)
s.Equal(fmt.Sprintf("System Workflow with WorkflowId temporal-sys-delete-namespace-workflow and RunId %s returned an error: workflow failed", RunId), err.Error())
s.Equal(fmt.Sprintf("System Workflow with WorkflowId test-workflow-id and RunId %s returned an error: workflow failed", RunId), err.Error())
s.Nil(resp)

// Workflow failed because of validation error (an attempt to delete system namespace).
mockRun2 := mocksdk.NewMockWorkflowRun(s.controller)
mockRun2.EXPECT().Get(gomock.Any(), gomock.Any()).Return(temporal.NewNonRetryableApplicationError("unable to delete system namespace", delnserrors.ValidationErrorErrType, nil, nil))
mockRun2.EXPECT().Get(gomock.Any(), gomock.Any()).Return(delnserrors.NewFailedPrecondition("unable to delete system namespace", nil))
mockRun2.EXPECT().GetRunID().Return(RunId)
mockRun2.EXPECT().GetID().Return("test-workflow-id")
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), "temporal-sys-delete-namespace-workflow", gomock.Any()).Return(mockRun2, nil)
resp, err = handler.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
Namespace: "temporal-system",
})
s.Error(err)
var failedPreconditionErr *serviceerror.FailedPrecondition
s.ErrorAs(err, &failedPreconditionErr)
s.Equal("unable to delete system namespace", failedPreconditionErr.Error())
s.Nil(resp)

// Workflow failed because of invalid argument (both namespace and namespace ID are set).
mockRun3 := mocksdk.NewMockWorkflowRun(s.controller)
mockRun3.EXPECT().Get(gomock.Any(), gomock.Any()).Return(delnserrors.NewInvalidArgument("only one of namespace or namespace ID must be set", nil))
mockRun3.EXPECT().GetRunID().Return(RunId)
mockRun3.EXPECT().GetID().Return("test-workflow-id")
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), "temporal-sys-delete-namespace-workflow", gomock.Any()).Return(mockRun3, nil)
resp, err = handler.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
Namespace: "temporal-system",
NamespaceId: "c13c01a7-3887-4eda-ba4b-9a07a6359e7e",
})
s.Error(err)
var invalidArgErr *serviceerror.InvalidArgument
s.ErrorAs(err, &invalidArgErr)
s.Equal("unable to delete system namespace", invalidArgErr.Error())
s.Equal("only one of namespace or namespace ID must be set", invalidArgErr.Error())
s.Nil(resp)

// Success case.
Expand Down
62 changes: 36 additions & 26 deletions service/worker/deletenamespace/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ package deletenamespace

import (
"context"
stderrors "errors"
"fmt"
"slices"
"strings"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/temporal"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -88,11 +88,19 @@ func (a *localActivities) GetNamespaceInfoActivity(ctx context.Context, nsID nam

getNamespaceResponse, err := a.metadataManager.GetNamespace(ctx, getNamespaceRequest)
if err != nil {
var nsNotFoundErr *serviceerror.NamespaceNotFound
if stderrors.As(err, &nsNotFoundErr) {
ns := nsName.String()
if ns == "" {
ns = nsID.String()
}
return getNamespaceInfoResult{}, errors.NewInvalidArgument(fmt.Sprintf("namespace %s is not found", ns), err)
}
return getNamespaceInfoResult{}, err
}

if getNamespaceResponse.Namespace == nil || getNamespaceResponse.Namespace.Info == nil || getNamespaceResponse.Namespace.Info.Id == "" {
return getNamespaceInfoResult{}, temporal.NewNonRetryableApplicationError("namespace info is corrupted", "", nil)
return getNamespaceInfoResult{}, stderrors.New("namespace info is corrupted")
}

return getNamespaceInfoResult{
Expand All @@ -104,36 +112,37 @@ func (a *localActivities) GetNamespaceInfoActivity(ctx context.Context, nsID nam

func (a *localActivities) ValidateProtectedNamespacesActivity(_ context.Context, nsName namespace.Name) error {
if slices.Contains(a.protectedNamespaces(), nsName.String()) {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("namespace %s is protected from deletion", nsName), errors.ValidationErrorErrType, nil, nil)
return errors.NewFailedPrecondition(fmt.Sprintf("namespace %s is protected from deletion", nsName), nil)
}
return nil
}

func (a *localActivities) ValidateNexusEndpointsActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error {
if !a.allowDeleteNamespaceIfNexusEndpointTarget() {
// Prevent deletion of a namespace that is targeted by a Nexus endpoint.
var nextPageToken []byte
for {
resp, err := a.nexusEndpointManager.ListNexusEndpoints(ctx, &persistence.ListNexusEndpointsRequest{
LastKnownTableVersion: 0,
NextPageToken: nextPageToken,
PageSize: a.nexusEndpointListDefaultPageSize(),
})
if err != nil {
a.logger.Error("Unable to list Nexus endpoints from persistence.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return fmt.Errorf("unable to list Nexus endpoints for namespace %s: %w", nsName, err)
}
if a.allowDeleteNamespaceIfNexusEndpointTarget() {
return nil
}
// Prevent deletion of a namespace that is targeted by a Nexus endpoint.
var nextPageToken []byte
for {
resp, err := a.nexusEndpointManager.ListNexusEndpoints(ctx, &persistence.ListNexusEndpointsRequest{
LastKnownTableVersion: 0,
NextPageToken: nextPageToken,
PageSize: a.nexusEndpointListDefaultPageSize(),
})
if err != nil {
a.logger.Error("Unable to list Nexus endpoints from persistence.", tag.WorkflowNamespace(nsName.String()), tag.WorkflowNamespaceID(nsID.String()), tag.Error(err))
return fmt.Errorf("unable to list Nexus endpoints for namespace %s: %w", nsName, err)
}

for _, entry := range resp.Entries {
if endpointNsID := entry.GetEndpoint().GetSpec().GetTarget().GetWorker().GetNamespaceId(); endpointNsID == nsID.String() {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("cannot delete a namespace that is a target of a Nexus endpoint %s", entry.GetEndpoint().GetSpec().GetName()), errors.ValidationErrorErrType, nil, nil)
}
}
nextPageToken = resp.NextPageToken
if len(nextPageToken) == 0 {
break
for _, entry := range resp.Entries {
if endpointNsID := entry.GetEndpoint().GetSpec().GetTarget().GetWorker().GetNamespaceId(); endpointNsID == nsID.String() {
return errors.NewFailedPrecondition(fmt.Sprintf("cannot delete a namespace that is a target of a Nexus endpoint %s", entry.GetEndpoint().GetSpec().GetName()), nil)
}
}
nextPageToken = resp.NextPageToken
if len(nextPageToken) == 0 {
break
}
}
return nil
}
Expand Down Expand Up @@ -201,11 +210,12 @@ func (a *localActivities) GenerateDeletedNamespaceNameActivity(ctx context.Conte
return namespace.Name(newName), nil
default:
logger.Error("Unable to get namespace details.", tag.Error(err))
return namespace.EmptyName, err
return namespace.EmptyName, fmt.Errorf("unable to get namespace details: %w", err)
}
}
// Should never get here because namespace ID is guaranteed to be unique.
panic(fmt.Sprintf("Unable to generate new name for deleted namespace %s. ID %q is not unique.", nsName, nsID))
return namespace.EmptyName, fmt.Errorf("unable to generate new name for deleted namespace %s. ID %q is not unique", nsName, nsID)

}

func (a *localActivities) RenameNamespaceActivity(ctx context.Context, previousName namespace.Name, newName namespace.Name) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (a *LocalActivities) GetNextPageTokenActivity(ctx context.Context, params G

resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req)
if err != nil {
a.logger.Error("Unable to list all workflows to get next page token.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err))
a.logger.Error("Unable to list all workflows to get next page token.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowNamespaceID(params.NamespaceID.String()), tag.Error(err))
return nil, err
}

Expand Down
14 changes: 5 additions & 9 deletions service/worker/deletenamespace/deleteexecutions/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package deleteexecutions

import (
"fmt"
"time"

"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -81,15 +80,12 @@ var (

func validateParams(params *DeleteExecutionsParams) error {
if params.NamespaceID.IsEmpty() {
return temporal.NewNonRetryableApplicationError("namespace ID is required", "", nil)
return errors.NewInvalidArgument("namespace ID is required", nil)
}

if params.Namespace.IsEmpty() {
return temporal.NewNonRetryableApplicationError("namespace is required", "", nil)
return errors.NewInvalidArgument("namespace is required", nil)
}

params.Config.ApplyDefaults()

return nil
}

Expand Down Expand Up @@ -145,7 +141,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
NextPageToken: nextPageToken,
}).Get(ctx, &nextPageToken)
if err != nil {
return result, fmt.Errorf("%w: GetNextPageTokenActivity: %v", errors.ErrUnableToExecuteActivity, err)
return result, err
}

runningDeleteExecutionsActivityCount++
Expand All @@ -165,7 +161,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
// Wait for one of running activities to complete.
runningDeleteExecutionsSelector.Select(ctx)
if lastDeleteExecutionsActivityErr != nil {
return result, fmt.Errorf("%w: DeleteExecutionsActivity: %v", errors.ErrUnableToExecuteActivity, lastDeleteExecutionsActivityErr)
return result, lastDeleteExecutionsActivityErr
}
}

Expand All @@ -178,7 +174,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
for runningDeleteExecutionsActivityCount > 0 {
runningDeleteExecutionsSelector.Select(ctx)
if lastDeleteExecutionsActivityErr != nil {
return result, fmt.Errorf("%w: DeleteExecutionsActivity: %v", errors.ErrUnableToExecuteActivity, lastDeleteExecutionsActivityErr)
return result, lastDeleteExecutionsActivityErr
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ActivityError(t *testing.T) {
require.Error(t, err)
var appErr *temporal.ApplicationError
require.True(t, stderrors.As(err, &appErr))
require.Contains(t, appErr.Error(), "unable to execute activity: DeleteExecutionsActivity")
require.Contains(t, appErr.Error(), "specific_error_from_activity")
require.Equal(t, appErr.Error(), "specific_error_from_activity (type: Unavailable, retryable: true)")
}

func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T) {
Expand Down
39 changes: 30 additions & 9 deletions service/worker/deletenamespace/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,51 @@ import (
"errors"
"fmt"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/temporal"
)

const (
ValidationErrorErrType = "ValidationError"
InvalidArgumentErrType = "InvalidArgument"
FailedPreconditionErrType = "FailedPrecondition"
ExecutionsStillExistErrType = "ExecutionsStillExist"
NoProgressErrType = "NoProgress"
NotDeletedExecutionsStillExistErrType = "NotDeletedExecutionsStillExist"
)

var (
ErrUnableToExecuteActivity = errors.New("unable to execute activity")
ErrUnableToExecuteChildWorkflow = errors.New("unable to execute child workflow")
ErrUnableToSetUpdateHandler = errors.New("unable to set Update handler")
)
// NewInvalidArgument builds a non-retryable application error whose type is
// InvalidArgumentErrType. The message becomes the error message and cause is
// attached as the underlying error (callers pass nil when there is none).
func NewInvalidArgument(message string, cause error) error {
	invalidArgErr := temporal.NewNonRetryableApplicationError(message, InvalidArgumentErrType, cause, nil)
	return invalidArgErr
}

// NewFailedPrecondition builds a non-retryable application error whose type is
// FailedPreconditionErrType. The message becomes the error message and cause
// is attached as the underlying error (callers pass nil when there is none).
func NewFailedPrecondition(message string, cause error) error {
	failedPreconditionErr := temporal.NewNonRetryableApplicationError(message, FailedPreconditionErrType, cause, nil)
	return failedPreconditionErr
}

func NewExecutionsStillExistError(count int) error {
func NewExecutionsStillExist(count int) error {
return temporal.NewApplicationError(fmt.Sprintf("%d executions are still exist", count), ExecutionsStillExistErrType, count)
}

func NewNoProgressError(count int) error {
func NewNoProgress(count int) error {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("no progress was made: %d executions are still exist", count), NoProgressErrType, nil, count)
}

func NewNotDeletedExecutionsStillExistError(count int) error {
func NewNotDeletedExecutionsStillExist(count int) error {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("%d not deleted executions are still exist", count), NotDeletedExecutionsStillExistErrType, nil, count)
}

// ToServiceError translates an error returned by a delete namespace workflow
// run into a service error.
//
// If err is (or wraps) a *temporal.ApplicationError whose type is
// InvalidArgumentErrType or FailedPreconditionErrType, the result is the
// matching serviceerror carrying only the application error's message.
// Any other error is wrapped into a SystemWorkflow service error that
// identifies the execution by workflowID and runID.
func ToServiceError(err error, workflowID, runID string) error {
	var applicationErr *temporal.ApplicationError
	if errors.As(err, &applicationErr) {
		if errType := applicationErr.Type(); errType == InvalidArgumentErrType {
			return serviceerror.NewInvalidArgument(applicationErr.Message())
		} else if errType == FailedPreconditionErrType {
			return serviceerror.NewFailedPrecondition(applicationErr.Message())
		}
	}

	// Not a recognized application error type: report it as a system workflow failure.
	execution := &commonpb.WorkflowExecution{WorkflowId: workflowID, RunId: runID}
	return serviceerror.NewSystemWorkflow(execution, err)
}
6 changes: 3 additions & 3 deletions service/worker/deletenamespace/reclaimresources/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,18 @@ func (a *Activities) EnsureNoExecutionsAdvVisibilityActivity(ctx context.Context
// No progress was made. Something bad happened on the task processor side or new executions were created during deletion.
// Return non-retryable error and workflow will try to delete executions again.
logger.Warn("No progress was made.", tag.Attempt(activityInfo.Attempt), tag.Counter(count))
return errors.NewNoProgressError(count)
return errors.NewNoProgress(count)
}
}

logger.Warn("Some workflow executions still exist.", tag.Counter(count))
activity.RecordHeartbeat(ctx, count)
return errors.NewExecutionsStillExistError(count)
return errors.NewExecutionsStillExist(count)
}

if notDeletedCount > 0 {
logger.Warn("Some workflow executions were not deleted and still exist.", tag.Counter(notDeletedCount))
return errors.NewNotDeletedExecutionsStillExistError(notDeletedCount)
return errors.NewNotDeletedExecutionsStillExist(notDeletedCount)
}

logger.Info("All workflow executions are deleted successfully.")
Expand Down
Loading

0 comments on commit 480a0f5

Please sign in to comment.