Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: soft delete a workflow, integration tests, fctl integration #379

Merged
merged 14 commits into from
Jul 7, 2023
94 changes: 94 additions & 0 deletions components/fctl/cmd/orchestration/workflows/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package workflows

import (
"fmt"

fctl "github.com/formancehq/fctl/pkg"
"github.com/formancehq/formance-sdk-go/pkg/models/operations"
"github.com/pkg/errors"
"github.com/pterm/pterm"
"github.com/spf13/cobra"
)

type WorkflowsDeleteStore struct {
WorkflowId string `json:"workflowId"`
Success bool `json:"success"`
}
type WorkflowsDeleteController struct {
store *WorkflowsDeleteStore
}

var _ fctl.Controller[*WorkflowsDeleteStore] = (*WorkflowsDeleteController)(nil)

func NewDefaultWorkflowsDeleteStore() *WorkflowsDeleteStore {
return &WorkflowsDeleteStore{}
}

func NewWorkflowsDeleteController() *WorkflowsDeleteController {
return &WorkflowsDeleteController{
store: NewDefaultWorkflowsDeleteStore(),
}
}
func NewDeleteCommand() *cobra.Command {
return fctl.NewCommand("delete <workflow-id>",
fctl.WithAliases("del", "d"),
fctl.WithShortDescription("Soft delete a workflow"),
fctl.WithArgs(cobra.ExactArgs(1)),
fctl.WithController[*WorkflowsDeleteStore](NewWorkflowsDeleteController()),
)
}

func (c *WorkflowsDeleteController) GetStore() *WorkflowsDeleteStore {
return c.store
}

func (c *WorkflowsDeleteController) Run(cmd *cobra.Command, args []string) (fctl.Renderable, error) {
cfg, err := fctl.GetConfig(cmd)
if err != nil {
return nil, errors.Wrap(err, "retrieving config")
}

organizationID, err := fctl.ResolveOrganizationID(cmd, cfg)
if err != nil {
return nil, err
}

stack, err := fctl.ResolveStack(cmd, cfg, organizationID)
if err != nil {
return nil, err
}

client, err := fctl.NewStackClient(cmd, cfg, stack)
if err != nil {
return nil, errors.Wrap(err, "creating stack client")
}

response, err := client.Orchestration.DeleteWorkflow(
cmd.Context(),
operations.DeleteWorkflowRequest{
FlowID: args[0],
},
)

if err != nil {
return nil, err
}

if response.Error != nil {
return nil, fmt.Errorf("%s: %s", response.Error.ErrorCode, response.Error.ErrorMessage)
}

if response.StatusCode >= 300 {
return nil, fmt.Errorf("unexpected status code: %d", response.StatusCode)
}

c.store.WorkflowId = args[0]
c.store.Success = true

return c, nil
}

func (c *WorkflowsDeleteController) Render(cmd *cobra.Command, args []string) error {
pterm.Success.WithShowLineNumber().Printfln("Workflow %s Deleted!", c.store.WorkflowId)
return nil
}
1 change: 1 addition & 0 deletions components/fctl/cmd/orchestration/workflows/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func NewCommand() *cobra.Command {
NewCreateCommand(),
NewRunCommand(),
NewShowCommand(),
NewDeleteCommand(),
),
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package api

import (
"errors"
"net/http"

"github.com/formancehq/orchestration/internal/workflow"
"github.com/formancehq/stack/libs/go-libs/api"
"github.com/go-playground/validator/v10"
)

var (
ErrEmptyID = errors.New("ID is empty")
)

func deleteWorkflow(m *workflow.Manager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := workflowID(r)

err := validator.New().Var(id, "required,uuid")
if err != nil {
api.BadRequest(w, "VALIDATION", err)
return
}

err = m.DeleteWorkflow(r.Context(), workflowID(r))

if errors.Is(err, workflow.ErrWorkflowNotFound) {
api.NotFound(w)
return
}

if err != nil {
api.InternalServerError(w, r, err)
return
}

api.NoContent(w)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package api

import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/formancehq/orchestration/internal/workflow"
"github.com/formancehq/stack/libs/go-libs/api/apitesting"
"github.com/go-chi/chi/v5"
"github.com/stretchr/testify/require"

"github.com/uptrace/bun"
)

func TestDeleteWorkflow(t *testing.T) {
test(t, func(router *chi.Mux, m *workflow.Manager, db *bun.DB) {
// Create a workflow
req := httptest.NewRequest(http.MethodPost, "/workflows", bytes.NewBufferString(`{"stages": []}`))
rec := httptest.NewRecorder()

router.ServeHTTP(rec, req)

require.Equal(t, http.StatusCreated, rec.Result().StatusCode)

workflow := workflow.Workflow{}
apitesting.ReadResponse(t, rec, &workflow)

require.NotEmpty(t, workflow.ID)

// Delete the workflow
req = httptest.NewRequest(http.MethodDelete, fmt.Sprintf("/workflows/%s/", workflow.ID), nil)
rec = httptest.NewRecorder()

router.ServeHTTP(rec, req)

require.Equal(t, http.StatusNoContent, rec.Result().StatusCode)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -16,7 +17,7 @@ import (

func TestListInstances(t *testing.T) {
test(t, func(router *chi.Mux, m *workflow.Manager, db *bun.DB) {

// Create a workflow with 10 instances
w := workflow.New(workflow.Config{})
_, err := db.NewInsert().Model(&w).Exec(context.TODO())
require.NoError(t, err)
Expand All @@ -34,8 +35,9 @@ func TestListInstances(t *testing.T) {
rec := httptest.NewRecorder()

router.ServeHTTP(rec, req)

require.Equal(t, http.StatusOK, rec.Result().StatusCode)

// Retrieve only running instances
instances := make([]workflow.Instance, 0)
apitesting.ReadResponse(t, rec, &instances)
require.Len(t, instances, 10)
Expand All @@ -48,5 +50,25 @@ func TestListInstances(t *testing.T) {
require.Equal(t, http.StatusOK, rec.Result().StatusCode)
apitesting.ReadResponse(t, rec, &instances)
require.Len(t, instances, 6)

// Delete the workflow
req = httptest.NewRequest(http.MethodDelete, fmt.Sprintf("/workflows/%s/", w.ID), nil)
rec = httptest.NewRecorder()

router.ServeHTTP(rec, req)

require.Equal(t, http.StatusNoContent, rec.Result().StatusCode)

// Try to retrieve instances for the deleted workflow
req = httptest.NewRequest(http.MethodGet, "/instances", nil)
rec = httptest.NewRecorder()

router.ServeHTTP(rec, req)

require.Equal(t, http.StatusOK, rec.Result().StatusCode)
instances = make([]workflow.Instance, 0)
apitesting.ReadResponse(t, rec, &instances)
require.Len(t, instances, 0)

})
}
1 change: 1 addition & 0 deletions components/orchestration/internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func newRouter(m *workflow.Manager, info ServiceInfo, healthController *health.H
r.Get("/", listWorkflows(m))
r.Post("/", createWorkflow(m))
r.Route("/{workflowId}", func(r chi.Router) {
r.Delete("/", deleteWorkflow(m))
r.Get("/", readWorkflow(m))
r.Route("/instances", func(r chi.Router) {
r.Post("/", runWorkflow(m))
Expand Down
10 changes: 10 additions & 0 deletions components/orchestration/internal/storage/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,15 @@ func registerMigrations(migrator *migrations.Migrator) {
return nil
},
},
migrations.Migration{
Up: func(tx bun.Tx) error {
if _, err := tx.Exec(`
alter table "workflows" add column deleted_at timestamp default null;
`); err != nil {
return err
}
return nil
},
},
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package migrations

import (
"database/sql"

"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upAddColumnDeletedAt, downRemoveColumnDeletedAt)
}

func upAddColumnDeletedAt(tx *sql.Tx) error {
if _, err := tx.Exec(`
alter table "workflows" add column deleted_at timestamp default null;
`); err != nil {
return err
}
return nil
}

func downRemoveColumnDeletedAt(tx *sql.Tx) error {
if _, err := tx.Exec(`
alter table "workflows" drop column deleted_at;
`); err != nil {
return err
}
return nil
}
31 changes: 29 additions & 2 deletions components/orchestration/internal/workflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

var (
ErrInstanceNotFound = errors.New("Instance not found")
ErrWorkflowNotFound = errors.New("Workflow not found")
)

const (
Expand Down Expand Up @@ -51,6 +52,27 @@ func (m *Manager) Create(ctx context.Context, config Config) (*Workflow, error)
return &workflow, nil
}

func (m *Manager) DeleteWorkflow(ctx context.Context, id string) error {

var workflow Workflow

res, err := m.db.NewUpdate().Model(&workflow).Where("id = ?", id).Set("deleted_at = ?", time.Now()).Exec(ctx)

if err != nil {
return err
}

r, err := res.RowsAffected()
if err != nil {
return err
}
if r == 0 {
return ErrWorkflowNotFound
}

return nil
}

func (m *Manager) RunWorkflow(ctx context.Context, id string, variables map[string]string) (Instance, error) {

workflow := Workflow{}
Expand Down Expand Up @@ -101,6 +123,7 @@ func (m *Manager) ListWorkflows(ctx context.Context) ([]Workflow, error) {
workflows := make([]Workflow, 0)
if err := m.db.NewSelect().
Model(&workflows).
Where("deleted_at IS NULL").
Scan(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,12 +175,16 @@ func (m *Manager) AbortRun(ctx context.Context, instanceID string) error {
func (m *Manager) ListInstances(ctx context.Context, workflowID string, running bool) ([]Instance, error) {
instances := make([]Instance, 0)
query := m.db.NewSelect().Model(&instances)

query.Join("JOIN workflows ON workflows.id = u.workflow_id").Where("workflows.deleted_at IS NULL")

if workflowID != "" {
query = query.Where("workflow_id = ?", workflowID)
query = query.Where("workflows.workflow_id = ?", workflowID)
}
if running {
query = query.Where("terminated = false")
query = query.Where("u.terminated = false")
}

if err := query.Scan(ctx); err != nil {
return nil, errors.Wrap(err, "retrieving workflow")
}
Expand Down
9 changes: 5 additions & 4 deletions components/orchestration/internal/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (

type Workflow struct {
bun.BaseModel `bun:"table:workflows"`
ID string `json:"id" bun:",pk"`
Config Config `json:"config"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
ID string `json:"id" bun:",pk"`
Config Config `json:"config"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
DeletedAt *time.Time `json:"deletedAt"`
}

func New(config Config) Workflow {
Expand Down
13 changes: 12 additions & 1 deletion components/orchestration/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,18 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/GetWorkflowResponse'

delete:
summary: Delete a flow by id
tags:
- Orchestration
description: Delete a flow by id
operationId: deleteWorkflow
responses:
default:
$ref: '#/components/responses/ErrorResponse'
204:
description: No content

/workflows/{workflowID}/instances:
parameters:
- in: path
Expand Down
Loading