From 5ed438885de2af00c3808c51de1fdadc847a0161 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 17 Jan 2024 11:42:45 +0800 Subject: [PATCH] enhance: [Cherry-pick] Add merr package to utilize milvus error (#645) (#648) This PR: - Add merr package, which is simplified version from milvus main repo - Add mock API due to proto update --------- --------- Signed-off-by: Congqi Xia --- .github/workflows/main.yml | 3 +- merr/errors.go | 243 +++++++++++++++++++++++++++++++++++++ merr/errors_test.go | 77 ++++++++++++ merr/utils.go | 230 +++++++++++++++++++++++++++++++++++ 4 files changed, 552 insertions(+), 1 deletion(-) create mode 100644 merr/errors.go create mode 100644 merr/errors_test.go create mode 100644 merr/utils.go diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 16ec70aff..13a71f387 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -7,6 +7,7 @@ on: - 'client/**' - 'entity/**' - 'internal/**' + - 'merr/**' # Triggers the workflow on push or pull request events but only for the master branch pull_request: paths: @@ -14,7 +15,7 @@ on: - 'client/**' - 'entity/**' - 'internal/**' - + - 'merr/**' jobs: # This workflow contains a single job called "build" diff --git a/merr/errors.go b/merr/errors.go new file mode 100644 index 000000000..e7c86bf1c --- /dev/null +++ b/merr/errors.go @@ -0,0 +1,243 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merr + +import ( + "github.com/cockroachdb/errors" +) + +const ( + CanceledCode int32 = 10000 + TimeoutCode int32 = 10001 +) + +// Define leaf errors here, +// WARN: take care to add new error, +// check whehter you can use the erorrs below before adding a new one. +// Name: Err + related prefix + error name +var ( + // Service related + ErrServiceNotReady = newMilvusError("service not ready", 1, true) // This indicates the service is still in init + ErrServiceUnavailable = newMilvusError("service unavailable", 2, true) + ErrServiceMemoryLimitExceeded = newMilvusError("memory limit exceeded", 3, false) + ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true) + ErrServiceInternal = newMilvusError("service internal error", 5, false) // Never return this error out of Milvus + ErrServiceCrossClusterRouting = newMilvusError("cross cluster routing", 6, false) + ErrServiceDiskLimitExceeded = newMilvusError("disk limit exceeded", 7, false) + ErrServiceRateLimit = newMilvusError("rate limit exceeded", 8, true) + ErrServiceForceDeny = newMilvusError("force deny", 9, false) + ErrServiceUnimplemented = newMilvusError("service unimplemented", 10, false) + + // Collection related + ErrCollectionNotFound = newMilvusError("collection not found", 100, false) + ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) + ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) + ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) + + // Partition related + ErrPartitionNotFound = newMilvusError("partition not found", 200, false) + ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false) + ErrPartitionNotFullyLoaded = newMilvusError("partition not fully loaded", 202, true) + + // ResourceGroup related + ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false) + + // Replica related + ErrReplicaNotFound = newMilvusError("replica not found", 400, false) + ErrReplicaNotAvailable = newMilvusError("replica not available", 401, false) + + // Channel & Delegator related + ErrChannelNotFound = newMilvusError("channel not found", 500, false) + ErrChannelLack = newMilvusError("channel lacks", 501, false) + ErrChannelReduplicate = newMilvusError("channel reduplicates", 502, false) + ErrChannelNotAvailable = newMilvusError("channel not available", 503, false) + + // Segment related + ErrSegmentNotFound = newMilvusError("segment not found", 600, false) + ErrSegmentNotLoaded = newMilvusError("segment not loaded", 601, false) + ErrSegmentLack = newMilvusError("segment lacks", 602, false) + ErrSegmentReduplicate = newMilvusError("segment reduplicates", 603, false) + + // Index related + ErrIndexNotFound = newMilvusError("index not found", 700, false) + ErrIndexNotSupported = newMilvusError("index type not supported", 701, false) + ErrIndexDuplicate = newMilvusError("index duplicates", 702, false) + + // Database related + ErrDatabaseNotFound = newMilvusError("database not found", 800, false) + ErrDatabaseNumLimitExceeded = newMilvusError("exceeded the limit number of database", 801, false) + ErrDatabaseInvalidName = newMilvusError("invalid database name", 802, false) + + // Node related + ErrNodeNotFound = newMilvusError("node not found", 901, false) + ErrNodeOffline = newMilvusError("node offline", 902, false) + ErrNodeLack = newMilvusError("node lacks", 903, false) + ErrNodeNotMatch = newMilvusError("node not match", 904, false) + ErrNodeNotAvailable = newMilvusError("node not available", 905, false) + + // IO related + ErrIoKeyNotFound = newMilvusError("key not found", 1000, false) + ErrIoFailed = newMilvusError("IO failed", 1001, false) + + // Parameter related + ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false) + + // Metrics related + ErrMetricNotFound = newMilvusError("metric not found", 1200, false) + + // Message queue related + ErrMqTopicNotFound = newMilvusError("topic not found", 1300, false) + ErrMqTopicNotEmpty = newMilvusError("topic not empty", 1301, false) + ErrMqInternal = newMilvusError("message queue internal error", 1302, false) + ErrDenyProduceMsg = newMilvusError("deny to write the message to mq", 1303, false) + + // Privilege related + // this operation is denied because the user not authorized, user need to login in first + ErrPrivilegeNotAuthenticated = newMilvusError("not authenticated", 1400, false) + // this operation is denied because the user has no permission to do this, user need higher privilege + ErrPrivilegeNotPermitted = newMilvusError("privilege not permitted", 1401, false) + + // Alias related + ErrAliasNotFound = newMilvusError("alias not found", 1600, false) + ErrAliasCollectionNameConfilct = newMilvusError("alias and collection name conflict", 1601, false) + ErrAliasAlreadyExist = newMilvusError("alias already exist", 1602, false) + + // field related + ErrFieldNotFound = newMilvusError("field not found", 1700, false) + ErrFieldInvalidName = newMilvusError("field name invalid", 1701, false) + + // high-level restful api related + ErrNeedAuthenticate = newMilvusError("user hasn't authenticated", 1800, false) + ErrIncorrectParameterFormat = newMilvusError("can only accept json format request", 1801, false) + ErrMissingRequiredParameters = newMilvusError("missing required parameters", 1802, false) + ErrMarshalCollectionSchema = newMilvusError("fail to marshal collection schema", 1803, false) + ErrInvalidInsertData = newMilvusError("fail to deal the insert data", 1804, false) + ErrInvalidSearchResult = newMilvusError("fail to parse search result", 1805, false) + ErrCheckPrimaryKey = newMilvusError("please check the primary key and its' type can only in [int, string]", 1806, false) + + // replicate related + ErrDenyReplicateMessage = newMilvusError("deny to use the replicate message in the normal instance", 1900, false) + ErrInvalidMsgBytes = newMilvusError("invalid replicate msg bytes", 1901, false) + ErrNoAssignSegmentID = newMilvusError("no assign segment id", 1902, false) + ErrInvalidStreamObj = newMilvusError("invalid stream object", 1903, false) + + // Segcore related + ErrSegcore = newMilvusError("segcore error", 2000, false) + + // Do NOT export this, + // never allow programmer using this, keep only for converting unknown error to milvusError + errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false) + + // import + ErrImportFailed = newMilvusError("importing data failed", 2100, false) +) + +type milvusError struct { + msg string + detail string + retriable bool + errCode int32 +} + +func newMilvusError(msg string, code int32, retriable bool) milvusError { + return milvusError{ + msg: msg, + detail: msg, + retriable: retriable, + errCode: code, + } +} + +func newMilvusErrorWithDetail(msg string, detail string, code int32, retriable bool) milvusError { + return milvusError{ + msg: msg, + detail: detail, + retriable: retriable, + errCode: code, + } +} + +func (e milvusError) code() int32 { + return e.errCode +} + +func (e milvusError) Error() string { + return e.msg +} + +func (e milvusError) Detail() string { + return e.detail +} + +func (e milvusError) Is(err error) bool { + cause := errors.Cause(err) + if cause, ok := cause.(milvusError); ok { + return e.errCode == cause.errCode + } + return false +} + +type multiErrors struct { + errs []error +} + +func (e multiErrors) Unwrap() error { + if len(e.errs) <= 1 { + return nil + } + // To make merr work for multi errors, + // we need cause of multi errors, which defined as the last error + if len(e.errs) == 2 { + return e.errs[1] + } + + return multiErrors{ + errs: e.errs[1:], + } +} + +func (e multiErrors) Error() string { + final := e.errs[0] + for i := 1; i < len(e.errs); i++ { + final = errors.Wrap(e.errs[i], final.Error()) + } + return final.Error() +} + +func (e multiErrors) Is(err error) bool { + for _, item := range e.errs { + if errors.Is(item, err) { + return true + } + } + return false +} + +func Combine(errs ...error) error { + var filtered []error + for _, e := range errs { + if e != nil { + filtered = append(filtered, e) + } + } + if len(filtered) == 0 { + return nil + } + return multiErrors{ + errs, + } +} diff --git a/merr/errors_test.go b/merr/errors_test.go new file mode 100644 index 000000000..7cc82e992 --- /dev/null +++ b/merr/errors_test.go @@ -0,0 +1,77 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merr + +import ( + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" +) + +type ErrSuite struct { + suite.Suite +} + +func (s *ErrSuite) SetupSuite() { +} + +func (s *ErrSuite) TestOldCode() { + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_NotReadyServe), ErrServiceNotReady) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_CollectionNotExists), ErrCollectionNotFound) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_IllegalArgument), ErrParameterInvalid) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_NodeIDNotMatch), ErrNodeNotMatch) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_InsufficientMemoryToLoad), ErrServiceMemoryLimitExceeded) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_MemoryQuotaExhausted), ErrServiceMemoryLimitExceeded) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_DiskQuotaExhausted), ErrServiceDiskLimitExceeded) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_RateLimit), ErrServiceRateLimit) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_ForceDeny), ErrServiceForceDeny) + s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_UnexpectedError), errUnexpected) +} + +func (s *ErrSuite) TestCombine() { + var ( + errFirst = errors.New("first") + errSecond = errors.New("second") + errThird = errors.New("third") + ) + + err := Combine(errFirst, errSecond) + s.True(errors.Is(err, errFirst)) + s.True(errors.Is(err, errSecond)) + s.False(errors.Is(err, errThird)) + + s.Equal("first: second", err.Error()) +} + +func (s *ErrSuite) TestCombineWithNil() { + err := errors.New("non-nil") + + err = Combine(nil, err) + s.NotNil(err) +} + +func (s *ErrSuite) TestCombineOnlyNil() { + err := Combine(nil, nil) + s.Nil(err) +} + +func TestErrors(t *testing.T) { + suite.Run(t, new(ErrSuite)) +} diff --git a/merr/utils.go b/merr/utils.go new file mode 100644 index 000000000..c724730ca --- /dev/null +++ b/merr/utils.go @@ -0,0 +1,230 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merr + +import ( + "context" + "strings" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" +) + +// Code returns the error code of the given error, +// WARN: DO NOT use this for now +func Code(err error) int32 { + if err == nil { + return 0 + } + + cause := errors.Cause(err) + switch cause := cause.(type) { + case milvusError: + return cause.code() + + default: + if errors.Is(cause, context.Canceled) { + return CanceledCode + } else if errors.Is(cause, context.DeadlineExceeded) { + return TimeoutCode + } else { + return errUnexpected.code() + } + } +} + +func IsRetryableErr(err error) bool { + if err, ok := err.(milvusError); ok { + return err.retriable + } + + return false +} + +func IsCanceledOrTimeout(err error) bool { + return errors.IsAny(err, context.Canceled, context.DeadlineExceeded) +} + +// Status returns a status according to the given err, +// returns Success status if err is nil +func Status(err error) *commonpb.Status { + if err == nil { + return &commonpb.Status{} + } + + code := Code(err) + return &commonpb.Status{ + Code: code, + Reason: previousLastError(err).Error(), + // Deprecated, for compatibility + ErrorCode: oldCode(code), + Retriable: IsRetryableErr(err), + Detail: err.Error(), + } +} + +func previousLastError(err error) error { + lastErr := err + for { + nextErr := errors.Unwrap(err) + if nextErr == nil { + break + } + lastErr = err + err = nextErr + } + return lastErr +} + +func CheckRPCCall(resp interface{}, err error) error { + if err != nil { + return err + } + if resp == nil { + return errUnexpected + } + switch resp := resp.(type) { + case interface{ GetStatus() *commonpb.Status }: + return Error(resp.GetStatus()) + case *commonpb.Status: + return Error(resp) + } + return nil +} + +func Success(reason ...string) *commonpb.Status { + status := Status(nil) + // NOLINT + status.Reason = strings.Join(reason, " ") + return status +} + +// Deprecated +func StatusWithErrorCode(err error, code commonpb.ErrorCode) *commonpb.Status { + if err == nil { + return &commonpb.Status{} + } + + return &commonpb.Status{ + Code: Code(err), + Reason: err.Error(), + ErrorCode: code, + } +} + +func oldCode(code int32) commonpb.ErrorCode { + switch code { + case ErrServiceNotReady.code(): + return commonpb.ErrorCode_NotReadyServe + + case ErrCollectionNotFound.code(): + return commonpb.ErrorCode_CollectionNotExists + + case ErrParameterInvalid.code(): + return commonpb.ErrorCode_IllegalArgument + + case ErrNodeNotMatch.code(): + return commonpb.ErrorCode_NodeIDNotMatch + + case ErrCollectionNotFound.code(), ErrPartitionNotFound.code(), ErrReplicaNotFound.code(): + return commonpb.ErrorCode_MetaFailed + + case ErrReplicaNotAvailable.code(), ErrChannelNotAvailable.code(), ErrNodeNotAvailable.code(): + return commonpb.ErrorCode_NoReplicaAvailable + + case ErrServiceMemoryLimitExceeded.code(): + return commonpb.ErrorCode_InsufficientMemoryToLoad + + case ErrServiceRateLimit.code(): + return commonpb.ErrorCode_RateLimit + + case ErrServiceForceDeny.code(): + return commonpb.ErrorCode_ForceDeny + + case ErrIndexNotFound.code(): + return commonpb.ErrorCode_IndexNotExist + + case ErrSegmentNotFound.code(): + return commonpb.ErrorCode_SegmentNotFound + + case ErrChannelLack.code(): + return commonpb.ErrorCode_MetaFailed + + default: + return commonpb.ErrorCode_UnexpectedError + } +} + +func OldCodeToMerr(code commonpb.ErrorCode) error { + switch code { + case commonpb.ErrorCode_NotReadyServe: + return ErrServiceNotReady + + case commonpb.ErrorCode_CollectionNotExists: + return ErrCollectionNotFound + + case commonpb.ErrorCode_IllegalArgument: + return ErrParameterInvalid + + case commonpb.ErrorCode_NodeIDNotMatch: + return ErrNodeNotMatch + + case commonpb.ErrorCode_InsufficientMemoryToLoad, commonpb.ErrorCode_MemoryQuotaExhausted: + return ErrServiceMemoryLimitExceeded + + case commonpb.ErrorCode_DiskQuotaExhausted: + return ErrServiceDiskLimitExceeded + + case commonpb.ErrorCode_RateLimit: + return ErrServiceRateLimit + + case commonpb.ErrorCode_ForceDeny: + return ErrServiceForceDeny + + case commonpb.ErrorCode_IndexNotExist: + return ErrIndexNotFound + + case commonpb.ErrorCode_SegmentNotFound: + return ErrSegmentNotFound + + case commonpb.ErrorCode_MetaFailed: + return ErrChannelNotFound + + default: + return errUnexpected + } +} + +func Ok(status *commonpb.Status) bool { + return status.GetErrorCode() == commonpb.ErrorCode_Success && status.GetCode() == 0 +} + +// Error returns a error according to the given status, +// returns nil if the status is a success status +func Error(status *commonpb.Status) error { + if Ok(status) { + return nil + } + + // use code first + code := status.GetCode() + if code == 0 { + return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), Code(OldCodeToMerr(status.GetErrorCode())), false) + } + return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), code, status.GetRetriable()) +}