Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/resource/resource_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func (r *Resource) MarkFailure() error {
return nil
case StatusToUpdate:
r.status = StatusUpdateFailure
case StatusToDelete:
r.status = StatusDeleteFailure
return nil
}
msg := fmt.Sprintf("status transition for [%s] from status [%s] to status failure is not allowed", r.FullName(), r.status)
Expand All @@ -91,6 +93,9 @@ func (r *Resource) MarkSuccess() error {
case StatusToCreate, StatusToUpdate:
r.status = StatusSuccess
return nil
case StatusToDelete:
r.status = StatusDeleted
return nil
}
msg := fmt.Sprintf("status transition for [%s] from status [%s] to status success is not allowed", r.FullName(), r.status)
return errors.InvalidStateTransition(EntityResource, msg)
Expand Down
27 changes: 27 additions & 0 deletions core/resource/service/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
type DataStore interface {
Create(context.Context, *resource.Resource) error
Update(context.Context, *resource.Resource) error
Delete(context.Context, *resource.Resource) error
BatchUpdate(context.Context, []*resource.Resource) error
Validate(*resource.Resource) error
GetURN(res *resource.Resource) (resource.URN, error)
Expand Down Expand Up @@ -70,6 +71,32 @@ func (m *ResourceMgr) CreateResource(ctx context.Context, res *resource.Resource
return me.ToErr()
}

// DeleteResource This should only work for MaxCompute Resource deleteion
func (m *ResourceMgr) DeleteResource(ctx context.Context, res *resource.Resource) error {
store := res.Store()

datastore, ok := m.datastoreMap[store]
if !ok {
msg := fmt.Sprintf("datastore [%s] for resource [%s] is not found", store.String(), res.FullName())
m.logger.Error(msg)
return errors.InternalError(resource.EntityResource, msg, nil)
}

me := errors.NewMultiError("error in Delete resource")
err := datastore.Delete(ctx, res)
if err != nil {
m.logger.Error("error Deleting resource [%s] from datastore [%s]: %s", res.FullName(), store.String(), err)

me.Append(err)
me.Append(res.MarkFailure())
} else {
me.Append(res.MarkSuccess())
}

me.Append(m.repo.UpdateStatus(ctx, res))
return me.ToErr()
}

func (m *ResourceMgr) UpdateResource(ctx context.Context, res *resource.Resource) error {
store := res.Store()
datastore, ok := m.datastoreMap[store]
Expand Down
6 changes: 6 additions & 0 deletions core/resource/service/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,12 @@ func (_m *DataStore) Update(_a0 context.Context, _a1 *resource.Resource) error {
return r0
}

// Delete provides a mock function with given fields: _a0, _a1
func (_m *DataStore) Delete(_a0 context.Context, _a1 *resource.Resource) error {
ret := _m.Called(_a0, _a1)
return ret.Error(0)
}

// Validate provides a mock function with given fields: _a0
func (_m *DataStore) Validate(_a0 *resource.Resource) error {
ret := _m.Called(_a0)
Expand Down
11 changes: 10 additions & 1 deletion core/resource/service/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
type ResourceManager interface {
CreateResource(ctx context.Context, res *resource.Resource) error
UpdateResource(ctx context.Context, res *resource.Resource) error
DeleteResource(ctx context.Context, res *resource.Resource) error
SyncResource(ctx context.Context, res *resource.Resource) error
BatchUpdate(ctx context.Context, store resource.Store, resources []*resource.Resource) error
Validate(res *resource.Resource) error
Expand Down Expand Up @@ -360,11 +361,19 @@
}
_ = existing.MarkToDelete()

if existing.Store().Is(resource.MaxCompute) && existing.Kind() == KindExternalTable {
err = rs.mgr.DeleteResource(ctx, existing)
if err != nil {
rs.logger.Error("error deleting resource [%s] from manager: %s", existing.FullName(), err)
return nil, err
}
}

if err = rs.repo.Delete(ctx, existing); err != nil {
rs.logger.Error("error soft-delete resource [%s] to db: %s", existing.FullName(), err)
return nil, err
}

Check failure on line 376 in core/resource/service/resource_service.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gci)
res := &resource.DeleteResponse{Resource: existing}
if strings.TrimSpace(jobNames) != "" {
res.DownstreamJobs = strings.Split(jobNames, ", ")
Expand Down
6 changes: 6 additions & 0 deletions core/resource/service/resource_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,12 @@ func (_m *ResourceManager) UpdateResource(ctx context.Context, res *resource.Res
return r0
}

// DeleteResource provides a mock function with given fields: ctx, res
func (_m *ResourceManager) DeleteResource(ctx context.Context, res *resource.Resource) error {
ret := _m.Called(ctx, res)
return ret.Error(0)
}

// Validate provides a mock function with given fields: res
func (_m *ResourceManager) Validate(res *resource.Resource) error {
ret := _m.Called(res)
Expand Down
1 change: 1 addition & 0 deletions core/resource/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
StatusSkipped Status = "skipped"
StatusCreateFailure Status = "create_failure"
StatusUpdateFailure Status = "update_failure"
StatusDeleteFailure Status = "delete_failure"
StatusExistInStore Status = "exist_in_store"
StatusSuccess Status = "success"

Expand Down
4 changes: 4 additions & 0 deletions core/resource/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ func (s Store) String() string {
return string(s)
}

func (s Store) Is(target Store) bool {
return s.String() == target.String()
}

func FromStringToStore(name string) (Store, error) {
switch name {
case string(Bigquery):
Expand Down
5 changes: 5 additions & 0 deletions ext/store/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
clientProvider ClientProvider
}

func (s Store) Delete(_ context.Context, _ *resource.Resource) error {

Check failure on line 65 in ext/store/bigquery/bigquery.go

View workflow job for this annotation

GitHub Actions / lint

unused-receiver: method receiver 's' is not referenced in method's body, consider removing or renaming it as _ (revive)
// Optimus does not suppoer deleting BQ resources
return nil
}

func (s Store) Create(ctx context.Context, res *resource.Resource) error {
spanCtx, span := startChildSpan(ctx, "bigquery/CreateResource")
defer span.End()
Expand Down
10 changes: 10 additions & 0 deletions ext/store/maxcompute/external_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ func addQuoteSerde(serdeProperties map[string]string) map[string]string {
return serdeProperties
}

func (e ExternalTableHandle) Delete(res *resource.Resource) error {
et, err := ConvertSpecTo[ExternalTable](res)
if err != nil {
return err
}

e.mcSQLExecutor.SetCurrentSchemaName(et.Database)
return e.mcExternalTable.Delete(et.Name, true)
}

func (e ExternalTableHandle) Create(res *resource.Resource) error {
et, err := ConvertSpecTo[ExternalTable](res)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions ext/store/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
type ResourceHandle interface {
Create(res *resource.Resource) error
Update(res *resource.Resource) error
Delete(res *resource.Resource) error
Exists(tableName string) bool
}

Expand Down Expand Up @@ -70,6 +71,35 @@ type MaxCompute struct {
maxSyncDelayTolerance time.Duration
}

func (m MaxCompute) Delete(ctx context.Context, res *resource.Resource) error {
spanCtx, span := startChildSpan(ctx, "maxcompute/CreateResource")
defer span.End()

odpsClient, err := m.initializeClient(spanCtx, res.Tenant(), accountKey)
if err != nil {
return err
}

projectSchema, _, err := getCompleteComponentName(res)
if err != nil {
return err
}

switch res.Kind() {
case KindExternalTable:
maskingPolicyClient, err := m.initializeClient(spanCtx, res.Tenant(), accountMaskPolicyKey)
if err != nil {
maskingPolicyClient = odpsClient
}
maskingPolicyHandle := maskingPolicyClient.TableMaskingPolicyHandleFrom(projectSchema)

handle := odpsClient.ExternalTableHandleFrom(projectSchema, m.tenantGetter, maskingPolicyHandle)
return handle.Delete(res)
default:
return errors.InvalidArgument(store, "invalid kind for maxcompute resource deletion "+res.Kind())
}
}

func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error {
spanCtx, span := startChildSpan(ctx, "maxcompute/CreateResource")
defer span.End()
Expand Down
5 changes: 5 additions & 0 deletions ext/store/maxcompute/maxcompute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,11 @@ func (m *mockTableResourceHandle) Update(res *resource.Resource) error {
return args.Error(0)
}

func (m *mockTableResourceHandle) Delete(res *resource.Resource) error {
args := m.Called(res)
return args.Error(0)
}

func (m *mockTableResourceHandle) Exists(tableName string) bool {
args := m.Called(tableName)
return args.Get(0).(bool)
Expand Down
4 changes: 4 additions & 0 deletions ext/store/maxcompute/schema_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
return nil
}

func (sh SchemaHandle) Delete(res *resource.Resource) error {

Check failure on line 30 in ext/store/maxcompute/schema_handle.go

View workflow job for this annotation

GitHub Actions / lint

unused-receiver: method receiver 'sh' is not referenced in method's body, consider removing or renaming it as _ (revive)
return nil
}

func (sh SchemaHandle) Update(res *resource.Resource) error {
schemaDetails, err := ConvertSpecToSchemaDetails(res)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions ext/store/maxcompute/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
return nil
}

func (t TableHandle) Delete(res *resource.Resource) error {

Check failure on line 79 in ext/store/maxcompute/table.go

View workflow job for this annotation

GitHub Actions / lint

unused-receiver: method receiver 't' is not referenced in method's body, consider removing or renaming it as _ (revive)
return nil
}

func (t TableHandle) Update(res *resource.Resource) error {
table, err := ConvertSpecTo[Table](res)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions ext/store/maxcompute/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
viewTable ViewTable
}

func (v ViewHandle) Delete(res *resource.Resource) error {

Check failure on line 32 in ext/store/maxcompute/view.go

View workflow job for this annotation

GitHub Actions / lint

unused-receiver: method receiver 'v' is not referenced in method's body, consider removing or renaming it as _ (revive)
return nil
}

func (v ViewHandle) Create(res *resource.Resource) error {
view, err := ConvertSpecTo[View](res)
if err != nil {
Expand Down
Loading