diff --git a/core/resource/resource_status.go b/core/resource/resource_status.go index c9a2153601..e9a712587d 100644 --- a/core/resource/resource_status.go +++ b/core/resource/resource_status.go @@ -80,6 +80,8 @@ func (r *Resource) MarkFailure() error { return nil case StatusToUpdate: r.status = StatusUpdateFailure + case StatusToDelete: + r.status = StatusDeleteFailure return nil } msg := fmt.Sprintf("status transition for [%s] from status [%s] to status failure is not allowed", r.FullName(), r.status) @@ -91,6 +93,9 @@ func (r *Resource) MarkSuccess() error { case StatusToCreate, StatusToUpdate: r.status = StatusSuccess return nil + case StatusToDelete: + r.status = StatusDeleted + return nil } msg := fmt.Sprintf("status transition for [%s] from status [%s] to status success is not allowed", r.FullName(), r.status) return errors.InvalidStateTransition(EntityResource, msg) diff --git a/core/resource/service/resource_manager.go b/core/resource/service/resource_manager.go index 205c0720d2..301a4b2cee 100644 --- a/core/resource/service/resource_manager.go +++ b/core/resource/service/resource_manager.go @@ -20,6 +20,7 @@ const ( type DataStore interface { Create(context.Context, *resource.Resource) error Update(context.Context, *resource.Resource) error + Delete(context.Context, *resource.Resource) error BatchUpdate(context.Context, []*resource.Resource) error Validate(*resource.Resource) error GetURN(res *resource.Resource) (resource.URN, error) @@ -70,6 +71,32 @@ func (m *ResourceMgr) CreateResource(ctx context.Context, res *resource.Resource return me.ToErr() } +// DeleteResource This should only work for MaxCompute Resource deleteion +func (m *ResourceMgr) DeleteResource(ctx context.Context, res *resource.Resource) error { + store := res.Store() + + datastore, ok := m.datastoreMap[store] + if !ok { + msg := fmt.Sprintf("datastore [%s] for resource [%s] is not found", store.String(), res.FullName()) + m.logger.Error(msg) + return errors.InternalError(resource.EntityResource, msg, nil) + } + + me := errors.NewMultiError("error in Delete resource") + err := datastore.Delete(ctx, res) + if err != nil { + m.logger.Error("error Deleting resource [%s] from datastore [%s]: %s", res.FullName(), store.String(), err) + + me.Append(err) + me.Append(res.MarkFailure()) + } else { + me.Append(res.MarkSuccess()) + } + + me.Append(m.repo.UpdateStatus(ctx, res)) + return me.ToErr() +} + func (m *ResourceMgr) UpdateResource(ctx context.Context, res *resource.Resource) error { store := res.Store() datastore, ok := m.datastoreMap[store] diff --git a/core/resource/service/resource_manager_test.go b/core/resource/service/resource_manager_test.go index 32d853c638..cf20abf7bf 100644 --- a/core/resource/service/resource_manager_test.go +++ b/core/resource/service/resource_manager_test.go @@ -730,6 +730,12 @@ func (_m *DataStore) Update(_a0 context.Context, _a1 *resource.Resource) error { return r0 } +// Delete provides a mock function with given fields: _a0, _a1 +func (_m *DataStore) Delete(_a0 context.Context, _a1 *resource.Resource) error { + ret := _m.Called(_a0, _a1) + return ret.Error(0) +} + // Validate provides a mock function with given fields: _a0 func (_m *DataStore) Validate(_a0 *resource.Resource) error { ret := _m.Called(_a0) diff --git a/core/resource/service/resource_service.go b/core/resource/service/resource_service.go index d144fb2906..4532c82898 100644 --- a/core/resource/service/resource_service.go +++ b/core/resource/service/resource_service.go @@ -50,6 +50,7 @@ type Syncer interface { type ResourceManager interface { CreateResource(ctx context.Context, res *resource.Resource) error UpdateResource(ctx context.Context, res *resource.Resource) error + DeleteResource(ctx context.Context, res *resource.Resource) error SyncResource(ctx context.Context, res *resource.Resource) error BatchUpdate(ctx context.Context, store resource.Store, resources []*resource.Resource) error Validate(res *resource.Resource) error @@ -360,11 +361,19 @@ func (rs ResourceService) Delete(ctx context.Context, req *resource.DeleteReques } _ = existing.MarkToDelete() + if existing.Store().Is(resource.MaxCompute) && existing.Kind() == KindExternalTable { + err = rs.mgr.DeleteResource(ctx, existing) + if err != nil { + rs.logger.Error("error deleting resource [%s] from manager: %s", existing.FullName(), err) + return nil, err + } + } + if err = rs.repo.Delete(ctx, existing); err != nil { rs.logger.Error("error soft-delete resource [%s] to db: %s", existing.FullName(), err) return nil, err } - + res := &resource.DeleteResponse{Resource: existing} if strings.TrimSpace(jobNames) != "" { res.DownstreamJobs = strings.Split(jobNames, ", ") diff --git a/core/resource/service/resource_service_test.go b/core/resource/service/resource_service_test.go index 1754fcfa6f..13e9553576 100644 --- a/core/resource/service/resource_service_test.go +++ b/core/resource/service/resource_service_test.go @@ -2028,6 +2028,12 @@ func (_m *ResourceManager) UpdateResource(ctx context.Context, res *resource.Res return r0 } +// DeleteResource provides a mock function with given fields: ctx, res +func (_m *ResourceManager) DeleteResource(ctx context.Context, res *resource.Resource) error { + ret := _m.Called(ctx, res) + return ret.Error(0) +} + // Validate provides a mock function with given fields: res func (_m *ResourceManager) Validate(res *resource.Resource) error { ret := _m.Called(res) diff --git a/core/resource/status.go b/core/resource/status.go index 2d91902ba2..087838c414 100644 --- a/core/resource/status.go +++ b/core/resource/status.go @@ -32,6 +32,7 @@ const ( StatusSkipped Status = "skipped" StatusCreateFailure Status = "create_failure" StatusUpdateFailure Status = "update_failure" + StatusDeleteFailure Status = "delete_failure" StatusExistInStore Status = "exist_in_store" StatusSuccess Status = "success" diff --git a/core/resource/store.go b/core/resource/store.go index b74238df1c..eb0919fddd 100644 --- a/core/resource/store.go +++ b/core/resource/store.go @@ -15,6 +15,10 @@ func (s Store) String() string { return string(s) } +func (s Store) Is(target Store) bool { + return s.String() == target.String() +} + func FromStringToStore(name string) (Store, error) { switch name { case string(Bigquery): diff --git a/ext/store/bigquery/bigquery.go b/ext/store/bigquery/bigquery.go index 0fa3883cb6..78071a8152 100644 --- a/ext/store/bigquery/bigquery.go +++ b/ext/store/bigquery/bigquery.go @@ -62,6 +62,11 @@ type Store struct { clientProvider ClientProvider } +func (s Store) Delete(_ context.Context, _ *resource.Resource) error { + // Optimus does not suppoer deleting BQ resources + return nil +} + func (s Store) Create(ctx context.Context, res *resource.Resource) error { spanCtx, span := startChildSpan(ctx, "bigquery/CreateResource") defer span.End() diff --git a/ext/store/maxcompute/external_table.go b/ext/store/maxcompute/external_table.go index 4954c5f192..e300e4fe4b 100644 --- a/ext/store/maxcompute/external_table.go +++ b/ext/store/maxcompute/external_table.go @@ -43,6 +43,16 @@ func addQuoteSerde(serdeProperties map[string]string) map[string]string { return serdeProperties } +func (e ExternalTableHandle) Delete(res *resource.Resource) error { + et, err := ConvertSpecTo[ExternalTable](res) + if err != nil { + return err + } + + e.mcSQLExecutor.SetCurrentSchemaName(et.Database) + return e.mcExternalTable.Delete(et.Name, true) +} + func (e ExternalTableHandle) Create(res *resource.Resource) error { et, err := ConvertSpecTo[ExternalTable](res) if err != nil { diff --git a/ext/store/maxcompute/maxcompute.go b/ext/store/maxcompute/maxcompute.go index 6ded637d6c..2b44073df4 100644 --- a/ext/store/maxcompute/maxcompute.go +++ b/ext/store/maxcompute/maxcompute.go @@ -26,6 +26,7 @@ const ( type ResourceHandle interface { Create(res *resource.Resource) error Update(res *resource.Resource) error + Delete(res *resource.Resource) error Exists(tableName string) bool } @@ -70,6 +71,35 @@ type MaxCompute struct { maxSyncDelayTolerance time.Duration } +func (m MaxCompute) Delete(ctx context.Context, res *resource.Resource) error { + spanCtx, span := startChildSpan(ctx, "maxcompute/CreateResource") + defer span.End() + + odpsClient, err := m.initializeClient(spanCtx, res.Tenant(), accountKey) + if err != nil { + return err + } + + projectSchema, _, err := getCompleteComponentName(res) + if err != nil { + return err + } + + switch res.Kind() { + case KindExternalTable: + maskingPolicyClient, err := m.initializeClient(spanCtx, res.Tenant(), accountMaskPolicyKey) + if err != nil { + maskingPolicyClient = odpsClient + } + maskingPolicyHandle := maskingPolicyClient.TableMaskingPolicyHandleFrom(projectSchema) + + handle := odpsClient.ExternalTableHandleFrom(projectSchema, m.tenantGetter, maskingPolicyHandle) + return handle.Delete(res) + default: + return errors.InvalidArgument(store, "invalid kind for maxcompute resource deletion "+res.Kind()) + } +} + func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error { spanCtx, span := startChildSpan(ctx, "maxcompute/CreateResource") defer span.End() diff --git a/ext/store/maxcompute/maxcompute_test.go b/ext/store/maxcompute/maxcompute_test.go index 820bfdd639..58af8166cd 100644 --- a/ext/store/maxcompute/maxcompute_test.go +++ b/ext/store/maxcompute/maxcompute_test.go @@ -707,6 +707,11 @@ func (m *mockTableResourceHandle) Update(res *resource.Resource) error { return args.Error(0) } +func (m *mockTableResourceHandle) Delete(res *resource.Resource) error { + args := m.Called(res) + return args.Error(0) +} + func (m *mockTableResourceHandle) Exists(tableName string) bool { args := m.Called(tableName) return args.Get(0).(bool) diff --git a/ext/store/maxcompute/schema_handle.go b/ext/store/maxcompute/schema_handle.go index a5037d8169..77fa429c04 100644 --- a/ext/store/maxcompute/schema_handle.go +++ b/ext/store/maxcompute/schema_handle.go @@ -27,6 +27,10 @@ func (sh SchemaHandle) Create(res *resource.Resource) error { return nil } +func (sh SchemaHandle) Delete(res *resource.Resource) error { + return nil +} + func (sh SchemaHandle) Update(res *resource.Resource) error { schemaDetails, err := ConvertSpecToSchemaDetails(res) if err != nil { diff --git a/ext/store/maxcompute/table.go b/ext/store/maxcompute/table.go index e353e25859..9b4b1715a8 100644 --- a/ext/store/maxcompute/table.go +++ b/ext/store/maxcompute/table.go @@ -76,6 +76,10 @@ func (t TableHandle) Create(res *resource.Resource) error { return nil } +func (t TableHandle) Delete(res *resource.Resource) error { + return nil +} + func (t TableHandle) Update(res *resource.Resource) error { table, err := ConvertSpecTo[Table](res) if err != nil { diff --git a/ext/store/maxcompute/view.go b/ext/store/maxcompute/view.go index a56e6e6ed4..7bab7d6058 100644 --- a/ext/store/maxcompute/view.go +++ b/ext/store/maxcompute/view.go @@ -29,6 +29,10 @@ type ViewHandle struct { viewTable ViewTable } +func (v ViewHandle) Delete(res *resource.Resource) error { + return nil +} + func (v ViewHandle) Create(res *resource.Resource) error { view, err := ConvertSpecTo[View](res) if err != nil {