diff --git a/examples/webhook/main_e2e_test.go b/examples/webhook/main_e2e_test.go index 04acafe2f..3dd6612c6 100644 --- a/examples/webhook/main_e2e_test.go +++ b/examples/webhook/main_e2e_test.go @@ -156,7 +156,7 @@ func TestWebhook(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodPut { w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(fmt.Sprintf(`{"actions": ["default:%s"]}`, "webhook-failure-step-one"))) + _, _ = w.Write([]byte(fmt.Sprintf(`{"actions": ["%s:default:%s"]}`, workflow, "webhook-failure-step-one"))) return } w.WriteHeader(http.StatusInternalServerError) // simulate a failure diff --git a/pkg/client/types/action.go b/pkg/client/types/action.go index d96d9b612..d7f101543 100644 --- a/pkg/client/types/action.go +++ b/pkg/client/types/action.go @@ -14,19 +14,15 @@ type Action struct { // Required. The verb to perform. Verb string - // Optional. A way to unique identify the step. - Subresource string + // The workflow name. Optional for compatibility reasons. + Workflow string } func (o Action) String() string { - if o.Subresource != "" { - return fmt.Sprintf("%s:%s:%s", o.Service, o.Verb, o.Subresource) + if o.Workflow != "" { + return fmt.Sprintf("%s:%s:%s", o.Workflow, o.Service, o.Verb) } - return o.IntegrationVerbString() -} - -func (o Action) IntegrationVerbString() string { return fmt.Sprintf("%s:%s", o.Service, o.Verb) } @@ -39,18 +35,19 @@ func ParseActionID(actionID string) (Action, error) { return Action{}, fmt.Errorf("invalid action id %s, must have at least 2 strings separated : (colon)", actionID) } - Service := firstToLower(parts[0]) - verb := strings.ToLower(parts[1]) - - var subresource string + var workflow string if numParts == 3 { - subresource = firstToLower(parts[2]) + workflow = firstToLower(parts[0]) + parts = parts[1:] } + service := firstToLower(parts[0]) + verb := strings.ToLower(parts[1]) + return Action{ - Service: Service, - Verb: verb, - Subresource: subresource, + Service: service, + Verb: verb, + Workflow: workflow, }, nil } diff --git a/pkg/worker/service.go b/pkg/worker/service.go index 5bf71c5f5..38076993e 100644 --- a/pkg/worker/service.go +++ b/pkg/worker/service.go @@ -56,7 +56,7 @@ func (s *Service) On(t triggerConverter, workflow workflowConverter) error { } } - err = s.worker.registerAction(parsedAction.Service, parsedAction.Verb, fn) + err = s.worker.registerAction(apiWorkflow.Name, parsedAction.Service, parsedAction.Verb, fn) if err != nil { return err @@ -89,7 +89,7 @@ func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error { fnOpts.name = getFnName(fn) } - return s.worker.registerAction(s.Name, fnOpts.name, fn) + return s.worker.registerAction("", s.Name, fnOpts.name, fn) } func (s *Service) Call(verb string) *WorkflowStep { diff --git a/pkg/worker/webhook_handler.go b/pkg/worker/webhook_handler.go index 1a21d4276..5708e7aa8 100644 --- a/pkg/worker/webhook_handler.go +++ b/pkg/worker/webhook_handler.go @@ -152,8 +152,12 @@ func (w *Worker) WebhookHttpHandler(opts WebhookHandlerOptions, workflows ...wor func (w *Worker) webhookProcess(ctx HatchetContext) (interface{}, error) { var do Action for _, action := range w.actions { - split := strings.Split(action.Name(), ":") // service:action - if split[1] == ctx.StepName() { + split := strings.Split(action.Name(), ":") // service:action or workflow:service:action + if len(split) == 3 && split[3-1] == ctx.StepName() { + do = action + break + } + if len(split) == 2 && split[2-1] == ctx.StepName() { // compatibility with old actions do = action break } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6ff2e6365..76c2959b2 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -216,7 +216,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) { for _, integrationAction := range actions { action := fmt.Sprintf("%s:%s", integrationId, integrationAction) - err := w.registerAction(integrationId, action, integration.ActionHandler(integrationAction)) + err := w.registerAction("integration", integrationId, action, integration.ActionHandler(integrationAction)) if err != nil { return nil, fmt.Errorf("could not register integration action %s: %w", action, err) @@ -288,11 +288,18 @@ func (w *Worker) RegisterAction(actionId string, method any) error { return fmt.Errorf("could not parse action id: %w", err) } - return w.registerAction(action.Service, action.Verb, method) + return w.registerAction("", action.Service, action.Verb, method) } -func (w *Worker) registerAction(service, verb string, method any) error { - actionId := fmt.Sprintf("%s:%s", service, verb) +func (w *Worker) registerAction(wf, service, verb string, method any) error { + wf = convertWorkflowNameToAction(wf) + + var actionId string + if wf != "" { + actionId = fmt.Sprintf("%s:%s:%s", wf, service, verb) + } else { + actionId = fmt.Sprintf("%s:%s", service, verb) + } // if the service is "concurrency", then this is a special action if service == "concurrency" { diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index 06d982a16..b9c33f701 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -207,7 +207,7 @@ func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflo if j.Concurrency != nil { w.Concurrency = &types.WorkflowConcurrency{ - ActionID: "concurrency:" + getFnName(j.Concurrency.fn), // TODO this should also be namespaced + ActionID: w.Name + ":concurrency:" + getFnName(j.Concurrency.fn), } if j.Concurrency.maxRuns != nil { @@ -234,7 +234,7 @@ func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.Wo for i := range j.Steps { - newStep, err := j.Steps[i].ToWorkflowStep(svcName, i, namespace) + newStep, err := j.Steps[i].ToWorkflowStep(j.Name, svcName, i, namespace) if err != nil { return nil, err @@ -254,13 +254,13 @@ func (j *WorkflowJob) ToActionMap(svcName string) map[string]any { res := map[string]any{} for i, step := range j.Steps { - actionId := step.GetActionId(svcName, i) + actionId := step.GetActionId(j.Name, svcName, i) res[actionId] = step.Function } if j.Concurrency != nil { - res["concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn + res[j.Name+":concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn } if j.OnFailure != nil { @@ -364,7 +364,7 @@ func (w *WorkflowStep) ToActionMap(svcName string) map[string]any { step := *w return map[string]any{ - step.GetActionId(svcName, 0): w.Function, + step.GetActionId(w.Name, svcName, 0): w.Function, } } @@ -380,7 +380,7 @@ type Step struct { APIStep types.WorkflowStep } -func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace string) (*Step, error) { +func (w *WorkflowStep) ToWorkflowStep(wfName, svcName string, index int, namespace string) (*Step, error) { fnType := reflect.TypeOf(w.Function) res := &Step{} @@ -391,7 +391,7 @@ func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace strin Name: res.Id, ID: w.GetStepId(index), Timeout: w.Timeout, - ActionID: w.GetActionId(svcName, index), + ActionID: w.GetActionId(wfName, svcName, index), Parents: []string{}, Retries: w.Retries, DesiredLabels: w.DesiredLabels, @@ -460,10 +460,12 @@ func (w *WorkflowStep) GetStepId(index int) string { return stepId } -func (w *WorkflowStep) GetActionId(svcName string, index int) string { +func (w *WorkflowStep) GetActionId(wfName, svcName string, index int) string { stepId := w.GetStepId(index) - return fmt.Sprintf("%s:%s", svcName, stepId) + wf := convertWorkflowNameToAction(wfName) + + return fmt.Sprintf("%s:%s:%s", wf, svcName, stepId) } func getFnName(fn any) string { @@ -482,3 +484,7 @@ func getFnName(fn any) string { return strings.ReplaceAll(fnName, ".", "-") } + +func convertWorkflowNameToAction(workflowName string) string { + return strings.ReplaceAll(strings.ToLower(workflowName), " ", "-") +}