Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(go-sdk): prefix action names with workflow name #655

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/webhook/main_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestWebhook(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPut {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(fmt.Sprintf(`{"actions": ["default:%s"]}`, "webhook-failure-step-one")))
_, _ = w.Write([]byte(fmt.Sprintf(`{"actions": ["%s:default:%s"]}`, workflow, "webhook-failure-step-one")))
return
}
w.WriteHeader(http.StatusInternalServerError) // simulate a failure
Expand Down
29 changes: 13 additions & 16 deletions pkg/client/types/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@ type Action struct {
// Required. The verb to perform.
Verb string

// Optional. A way to unique identify the step.
Subresource string
// The workflow name. Optional for compatibility reasons.
Workflow string
}

func (o Action) String() string {
if o.Subresource != "" {
return fmt.Sprintf("%s:%s:%s", o.Service, o.Verb, o.Subresource)
if o.Workflow != "" {
return fmt.Sprintf("%s:%s:%s", o.Workflow, o.Service, o.Verb)
}

return o.IntegrationVerbString()
}

func (o Action) IntegrationVerbString() string {
return fmt.Sprintf("%s:%s", o.Service, o.Verb)
}

Expand All @@ -39,18 +35,19 @@ func ParseActionID(actionID string) (Action, error) {
return Action{}, fmt.Errorf("invalid action id %s, must have at least 2 strings separated : (colon)", actionID)
}

Service := firstToLower(parts[0])
verb := strings.ToLower(parts[1])

var subresource string
var workflow string
if numParts == 3 {
subresource = firstToLower(parts[2])
workflow = firstToLower(parts[0])
parts = parts[1:]
}

service := firstToLower(parts[0])
verb := strings.ToLower(parts[1])

return Action{
Service: Service,
Verb: verb,
Subresource: subresource,
Service: service,
Verb: verb,
Workflow: workflow,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *Service) On(t triggerConverter, workflow workflowConverter) error {
}
}

err = s.worker.registerAction(parsedAction.Service, parsedAction.Verb, fn)
err = s.worker.registerAction(apiWorkflow.Name, parsedAction.Service, parsedAction.Verb, fn)

if err != nil {
return err
Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error {
fnOpts.name = getFnName(fn)
}

return s.worker.registerAction(s.Name, fnOpts.name, fn)
return s.worker.registerAction("", s.Name, fnOpts.name, fn)
}

func (s *Service) Call(verb string) *WorkflowStep {
Expand Down
8 changes: 6 additions & 2 deletions pkg/worker/webhook_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ func (w *Worker) WebhookHttpHandler(opts WebhookHandlerOptions, workflows ...wor
func (w *Worker) webhookProcess(ctx HatchetContext) (interface{}, error) {
var do Action
for _, action := range w.actions {
split := strings.Split(action.Name(), ":") // service:action
if split[1] == ctx.StepName() {
split := strings.Split(action.Name(), ":") // service:action or workflow:service:action
if len(split) == 3 && split[3-1] == ctx.StepName() {
do = action
break
}
if len(split) == 2 && split[2-1] == ctx.StepName() { // compatibility with old actions
do = action
break
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
for _, integrationAction := range actions {
action := fmt.Sprintf("%s:%s", integrationId, integrationAction)

err := w.registerAction(integrationId, action, integration.ActionHandler(integrationAction))
err := w.registerAction("integration", integrationId, action, integration.ActionHandler(integrationAction))

if err != nil {
return nil, fmt.Errorf("could not register integration action %s: %w", action, err)
Expand Down Expand Up @@ -288,11 +288,18 @@ func (w *Worker) RegisterAction(actionId string, method any) error {
return fmt.Errorf("could not parse action id: %w", err)
}

return w.registerAction(action.Service, action.Verb, method)
return w.registerAction("", action.Service, action.Verb, method)
}

func (w *Worker) registerAction(service, verb string, method any) error {
actionId := fmt.Sprintf("%s:%s", service, verb)
func (w *Worker) registerAction(wf, service, verb string, method any) error {
wf = convertWorkflowNameToAction(wf)

var actionId string
if wf != "" {
actionId = fmt.Sprintf("%s:%s:%s", wf, service, verb)
} else {
actionId = fmt.Sprintf("%s:%s", service, verb)
}

// if the service is "concurrency", then this is a special action
if service == "concurrency" {
Expand Down
24 changes: 15 additions & 9 deletions pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflo

if j.Concurrency != nil {
w.Concurrency = &types.WorkflowConcurrency{
ActionID: "concurrency:" + getFnName(j.Concurrency.fn), // TODO this should also be namespaced
ActionID: w.Name + ":concurrency:" + getFnName(j.Concurrency.fn),
}

if j.Concurrency.maxRuns != nil {
Expand All @@ -234,7 +234,7 @@ func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.Wo

for i := range j.Steps {

newStep, err := j.Steps[i].ToWorkflowStep(svcName, i, namespace)
newStep, err := j.Steps[i].ToWorkflowStep(j.Name, svcName, i, namespace)

if err != nil {
return nil, err
Expand All @@ -254,13 +254,13 @@ func (j *WorkflowJob) ToActionMap(svcName string) map[string]any {
res := map[string]any{}

for i, step := range j.Steps {
actionId := step.GetActionId(svcName, i)
actionId := step.GetActionId(j.Name, svcName, i)

res[actionId] = step.Function
}

if j.Concurrency != nil {
res["concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn
res[j.Name+":concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn
}

if j.OnFailure != nil {
Expand Down Expand Up @@ -364,7 +364,7 @@ func (w *WorkflowStep) ToActionMap(svcName string) map[string]any {
step := *w

return map[string]any{
step.GetActionId(svcName, 0): w.Function,
step.GetActionId(w.Name, svcName, 0): w.Function,
}
}

Expand All @@ -380,7 +380,7 @@ type Step struct {
APIStep types.WorkflowStep
}

func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace string) (*Step, error) {
func (w *WorkflowStep) ToWorkflowStep(wfName, svcName string, index int, namespace string) (*Step, error) {
fnType := reflect.TypeOf(w.Function)

res := &Step{}
Expand All @@ -391,7 +391,7 @@ func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace strin
Name: res.Id,
ID: w.GetStepId(index),
Timeout: w.Timeout,
ActionID: w.GetActionId(svcName, index),
ActionID: w.GetActionId(wfName, svcName, index),
Parents: []string{},
Retries: w.Retries,
DesiredLabels: w.DesiredLabels,
Expand Down Expand Up @@ -460,10 +460,12 @@ func (w *WorkflowStep) GetStepId(index int) string {
return stepId
}

func (w *WorkflowStep) GetActionId(svcName string, index int) string {
func (w *WorkflowStep) GetActionId(wfName, svcName string, index int) string {
stepId := w.GetStepId(index)

return fmt.Sprintf("%s:%s", svcName, stepId)
wf := convertWorkflowNameToAction(wfName)

return fmt.Sprintf("%s:%s:%s", wf, svcName, stepId)
}

func getFnName(fn any) string {
Expand All @@ -482,3 +484,7 @@ func getFnName(fn any) string {

return strings.ReplaceAll(fnName, ".", "-")
}

func convertWorkflowNameToAction(workflowName string) string {
return strings.ReplaceAll(strings.ToLower(workflowName), " ", "-")
}
Loading