diff --git a/ee/webhooks/internal/commons/state.go b/ee/webhooks/internal/commons/state.go index 2cc483a2a6..2791756f00 100644 --- a/ee/webhooks/internal/commons/state.go +++ b/ee/webhooks/internal/commons/state.go @@ -1,9 +1,8 @@ package commons import ( - - "time" "github.com/formancehq/stack/libs/go-libs/sync/shared" + "time" ) type State struct { diff --git a/ee/webhooks/internal/components/commons/webhook_runner.go b/ee/webhooks/internal/components/commons/webhook_runner.go index 891cb70f07..5c3ad0ecc2 100644 --- a/ee/webhooks/internal/components/commons/webhook_runner.go +++ b/ee/webhooks/internal/components/commons/webhook_runner.go @@ -82,7 +82,6 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) { } func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) { - e, err := commons.Event{}.FromPayload(log.Payload) if err != nil { message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while Event.FromPayload(log.payload): %x", wr.LogChannels, err) @@ -94,7 +93,7 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) { case commons.NewHookType: hook, err := wr.Database.GetHook(e.ID) - + if err != nil { message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Case NewHookType Error while attempting to reach the database: %x", wr.LogChannels, err) logging.Error(message) @@ -121,7 +120,6 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) { wr.State.UpdateHookRetry(e.ID, e.Value.(bool)) case commons.NewWaitingAttemptType: - attempt, err := wr.Database.GetAttempt(e.ID) if err != nil { message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while NewWaitingAttemptType : wr.Database.GetAttempt(e.ID): %x", wr.LogChannels, err) diff --git a/ee/webhooks/internal/components/webhook_collector/collector.go b/ee/webhooks/internal/components/webhook_collector/collector.go index fc97c91a98..f0f5fc3181 100644 --- a/ee/webhooks/internal/components/webhook_collector/collector.go +++ b/ee/webhooks/internal/components/webhook_collector/collector.go @@ -47,7 +47,7 @@ func (c *Collector) Init() { } func (c *Collector) HandleWaitingAttempts() { - + if c.State.WaitingAttempts.Size() == 0 { return } @@ -87,16 +87,16 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg } statusCode, err := c.HandleRequest(context.Background(), sAttempt, sHook) - + if err != nil { c.State.WaitingAttempts.Add(sAttempt) return } - if(sAttempt.Val.HookEndpoint != sHook.Val.Endpoint){ + if sAttempt.Val.HookEndpoint != sHook.Val.Endpoint { sAttempt.Val.HookEndpoint = sHook.Val.Endpoint - go func(){ + go func() { _, err := c.Database.UpdateAttemptEndpoint(sAttempt.Val.ID, sAttempt.Val.HookEndpoint) if err != nil { message := fmt.Sprintf("Collector:AsyncHandleSharedAttempt:Database.UpdateAttemptEndpoint() : %x", err) @@ -105,12 +105,14 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg } }() } - + sAttempt.Val.LastHttpStatusCode = statusCode if commons.IsHTTPRequestSuccess(statusCode) { + c.handleSuccess(sAttempt) } else { + c.handleNextRetry(sAttempt) } @@ -145,7 +147,7 @@ func (c *Collector) handleNextRetry(sAttempt *commons.SharedAttempt) { go func() { _, err := c.Database.UpdateAttemptNextTry(sAttempt.Val.ID, sAttempt.Val.NextTry, sAttempt.Val.LastHttpStatusCode) if err != nil { - message := fmt.Sprintf("Collector:handleNextRetry:Database.UpdateAttemptNextTry: %x", err) + message := fmt.Sprintf("Collector:handleNextRetry:Database.UpdateAttemptNextTry: %s", err) logging.Error(message) panic(message) } diff --git a/ee/webhooks/internal/components/webhook_controller/controllers/utils/utils.go b/ee/webhooks/internal/components/webhook_controller/controllers/utils/utils.go index f4c0b75f74..2b0221c535 100644 --- a/ee/webhooks/internal/components/webhook_controller/controllers/utils/utils.go +++ b/ee/webhooks/internal/components/webhook_controller/controllers/utils/utils.go @@ -11,7 +11,6 @@ import ( "net/url" "strconv" "strings" - ) type ErrorType string diff --git a/ee/webhooks/internal/components/webhook_controller/controllers/v1/controllers-hook.go b/ee/webhooks/internal/components/webhook_controller/controllers/v1/controllers-hook.go index d12d7ba1f7..7a7c2e0f7a 100644 --- a/ee/webhooks/internal/components/webhook_controller/controllers/v1/controllers-hook.go +++ b/ee/webhooks/internal/components/webhook_controller/controllers/v1/controllers-hook.go @@ -262,7 +262,7 @@ func V1CreateHookController(database storeInterface.IStoreProvider, hook utils.V newHook, err = controllersCommons.ActivateHook(database, newHook.ID) logging.Infof("Webhook_Controller :: V1CreateHookController :: newHook Status : %s", newHook.Status) - + if err != nil { return utils.InternalErrorResp[utils.V1Hook](err) } diff --git a/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-attempts.go b/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-attempts.go index d964f6d67c..010074c4f5 100644 --- a/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-attempts.go +++ b/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-attempts.go @@ -143,8 +143,8 @@ func RegisterV2AttemptControllers(serverHttp serverInterfaces.IHTTPServer, datab func V2GetWaitingAttemptsController(database storeInterface.IStoreProvider, filterCursor string) utils.Response[bunpaginate.Cursor[commons.Attempt]] { hasMore := false - strPrevious := "" - strNext := "" + strPrevious := " " + strNext := " " cursor, err := utils.ReadCursor(filterCursor) @@ -164,6 +164,7 @@ func V2GetWaitingAttemptsController(database storeInterface.IStoreProvider, filt } Cursor := bunpaginate.Cursor[commons.Attempt]{ + PageSize: pageSize, HasMore: hasMore, Previous: strPrevious, Next: strNext, @@ -197,6 +198,7 @@ func V2GetAbortedAttemptsController(database storeInterface.IStoreProvider, filt } Cursor := bunpaginate.Cursor[commons.Attempt]{ + PageSize: pageSize, HasMore: hasMore, Previous: strPrevious, Next: strNext, @@ -265,6 +267,7 @@ func V2RetryWaitingAttemptController(database storeInterface.IStoreProvider, id } func V2AbortWaitingAttemptController(database storeInterface.IStoreProvider, id string) utils.Response[commons.Attempt] { + attempt, err := database.AbortAttempt(id, string(commons.AbortUser), true) if err != nil { diff --git a/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go b/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go index 2b50e6ef58..358238d20c 100644 --- a/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go +++ b/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go @@ -33,8 +33,8 @@ func RegisterV2HookControllers(server serverInterfaces.IHTTPServer, database sto server.Register(string(r.V2CreateHook.Method), r.V2CreateHook.Url, func(w http.ResponseWriter, r *http.Request) { hookParams := commons.HookBodyParams{} - hookParams.Retry = true - + hookParams.Retry = true + if err := utils.DecodeJSONBody(r, &hookParams); err != nil { sharedapi.BadRequest(w, utils.ErrValidation, err) return diff --git a/ee/webhooks/internal/components/webhook_worker/worker.go b/ee/webhooks/internal/components/webhook_worker/worker.go index 2b8c609c69..d275e7b7ed 100644 --- a/ee/webhooks/internal/components/webhook_worker/worker.go +++ b/ee/webhooks/internal/components/webhook_worker/worker.go @@ -75,10 +75,8 @@ func (w *Worker) HandleMessage(msg *message.Message) error { event = strings.Join([]string{eventApp, event}, ".") } - - triggedSHooks := w.State.ActiveHooksByEvent.Get(event) - + if triggedSHooks == nil || triggedSHooks.Size() == 0 { return nil } @@ -103,6 +101,7 @@ func (w *Worker) HandlerTriggedHookFactory(ctx context.Context, event string, pa defer wg.Done() sAttempt := commons.NewSharedAttempt(sHook.Val.ID, sHook.Val.Name, sHook.Val.Endpoint, event, string(payload)) + logging.Infof("WebhookWorker :: Waiting Attempt ID : %s", sAttempt.Val.ID) hook := sHook.Val attempt := sAttempt.Val statusCode, err := w.HandleRequest(ctx, sAttempt, sHook) @@ -122,14 +121,17 @@ func (w *Worker) HandleResponse(statusCode int, attempt *commons.Attempt, hook * attempt.LastHttpStatusCode = statusCode attempt.NbTry += 1 var err error - - + if commons.IsHTTPRequestSuccess(statusCode) { commons.SetSuccesStatus(attempt) err = w.Database.SaveAttempt(*attempt, false) } - + if hook.Retry && !attempt.IsSuccess() { + err = w.Database.SaveAttempt(*attempt, true) + } + + if !hook.Retry && !attempt.IsSuccess(){ commons.SetAbortNoRetryModeStatus(attempt) err = w.Database.SaveAttempt(*attempt, true) } diff --git a/ee/webhooks/internal/services/storage/postgres/attempt_queries.go b/ee/webhooks/internal/services/storage/postgres/attempt_queries.go index 9bb4c31747..d807e80201 100644 --- a/ee/webhooks/internal/services/storage/postgres/attempt_queries.go +++ b/ee/webhooks/internal/services/storage/postgres/attempt_queries.go @@ -15,7 +15,7 @@ const ( insertAttemptQuery = "INSERT INTO attempts (id, webhook_id, hook_name, hook_endpoint, event, payload, status_code, date_occured, status, date_status, comment, next_retry_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *" updateAttemptStatus = "UPDATE attempts SET status = ?, date_status = NOW(), comment = ? WHERE id = ? RETURNING *" updateAttemptNextTry = "UPDATE attempts SET next_retry_after = ?, status_code = ? WHERE id = ? RETURNING *" - updateAttemptEndpoint = "UPDATE attempts SET hook_endpoint = ? WHERE id = ? RETURNING *" + updateAttemptEndpoint = "UPDATE attempts SET hook_endpoint = ? WHERE id = ? RETURNING *" ) func (store PostgresStore) GetAttempt(index string) (commons.Attempt, error) { @@ -105,7 +105,7 @@ func (store PostgresStore) ChangeAttemptStatus(index string, status commons.Atte log, err := commons.LogFromEvent(event) wrapQuery := wrapWithLogQuery(updateAttemptStatus) - _, err = store.db.NewRaw(wrapQuery, + query := store.db.NewRaw(wrapQuery, string(status), comment, index, @@ -113,14 +113,18 @@ func (store PostgresStore) ChangeAttemptStatus(index string, status commons.Atte log.Channel, log.Payload, log.CreatedAt, - ).Exec(context.Background(), &attempt) + ) + + err = query.Scan(context.Background(), &attempt) } else { - _, err = store.db.NewRaw(updateAttemptStatus, + query := store.db.NewRaw(updateAttemptStatus, string(status), comment, index, - ).Exec(context.Background(), &attempt) + ) + + err = query.Scan(context.Background(), &attempt) } if err == sql.ErrNoRows { @@ -131,7 +135,7 @@ func (store PostgresStore) ChangeAttemptStatus(index string, status commons.Atte return attempt, err } -func (store PostgresStore) UpdateAttemptEndpoint(index string, endpoint string) (commons.Attempt, error){ +func (store PostgresStore) UpdateAttemptEndpoint(index string, endpoint string) (commons.Attempt, error) { var attempt commons.Attempt _, err := store.db.NewRaw(updateAttemptEndpoint, endpoint, index).Exec(context.Background(), &attempt) diff --git a/ee/webhooks/openapi.yaml b/ee/webhooks/openapi.yaml index ad02c7dd4e..ace561d685 100644 --- a/ee/webhooks/openapi.yaml +++ b/ee/webhooks/openapi.yaml @@ -633,7 +633,7 @@ paths: /v2/attempts/waiting/{attemptId}/abort: put: summary: Abort one waiting attempt - operationId: retryWaitingAttempt1 + operationId: abortWaitingAttempt description: Abort one waiting attempt tags: - Webhooks diff --git a/ee/webhooks/openapi/v2.yaml b/ee/webhooks/openapi/v2.yaml index 7cae80c30a..2c0da5c586 100644 --- a/ee/webhooks/openapi/v2.yaml +++ b/ee/webhooks/openapi/v2.yaml @@ -420,7 +420,7 @@ paths: /v2/attempts/waiting/{attemptId}/abort: put: summary: Abort one waiting attempt - operationId: retryWaitingAttempt + operationId: abortWaitingAttempt description: Abort one waiting attempt tags: - Webhooks diff --git a/libs/go-libs/sync/shared/sharedarr.go b/libs/go-libs/sync/shared/sharedarr.go index 3f649780eb..6a6a31413e 100644 --- a/libs/go-libs/sync/shared/sharedarr.go +++ b/libs/go-libs/sync/shared/sharedarr.go @@ -2,7 +2,6 @@ package shared import ( "sync" - ) type SharedArr[T any] struct { diff --git a/libs/go-libs/sync/shared/sharedmap.go b/libs/go-libs/sync/shared/sharedmap.go index de72adc1ae..532cc8b738 100644 --- a/libs/go-libs/sync/shared/sharedmap.go +++ b/libs/go-libs/sync/shared/sharedmap.go @@ -1,6 +1,5 @@ package shared - type SharedMap[T any] struct { Shared[map[string]*Shared[T]] } diff --git a/libs/go-libs/sync/shared/sharedmaparr.go b/libs/go-libs/sync/shared/sharedmaparr.go index 3f06c70c4a..114bd68345 100644 --- a/libs/go-libs/sync/shared/sharedmaparr.go +++ b/libs/go-libs/sync/shared/sharedmaparr.go @@ -1,6 +1,5 @@ package shared - type SharedMapArr[T any] struct { Shared[map[string]*SharedArr[T]] } @@ -8,8 +7,7 @@ type SharedMapArr[T any] struct { func (s *SharedMapArr[T]) Add(index string, el *Shared[T]) { s.WLock() defer s.WUnlock() - - + if sharedArr, ok := (*s.Val)[index]; ok { sharedArr.Add(el) } else { diff --git a/releases/sdks/go/.speakeasy/gen.lock b/releases/sdks/go/.speakeasy/gen.lock index a773720b59..d5108b06ef 100755 --- a/releases/sdks/go/.speakeasy/gen.lock +++ b/releases/sdks/go/.speakeasy/gen.lock @@ -1,12 +1,12 @@ lockVersion: 2.0.0 id: 7eac0a45-60a2-40bb-9e85-26bd77ec2a6d management: - docChecksum: b03e791e9f3e760510270d7444f4bce3 + docChecksum: c7b24185fbcd90d84a1020d6305d1524 docVersion: v0.0.0 speakeasyVersion: 1.292.0 generationVersion: 2.332.4 - releaseVersion: 0.0.1 - configChecksum: a4b5c74b8763c6fdc98e6adfde02c782 + releaseVersion: v0.0.0 + configChecksum: 28c1da66ee9378d9623ec2601b659dd5 features: go: additionalDependencies: 0.1.0 @@ -211,6 +211,7 @@ generatedFiles: - /pkg/models/operations/updatewallet.go - /pkg/models/operations/voidhold.go - /pkg/models/operations/walletsgetserverinfo.go + - /pkg/models/operations/abortwaitingattempt.go - /pkg/models/operations/activateconfig.go - /pkg/models/operations/activatehook.go - /pkg/models/operations/changeconfigsecret.go @@ -226,7 +227,6 @@ generatedFiles: - /pkg/models/operations/insertconfig.go - /pkg/models/operations/inserthook.go - /pkg/models/operations/retrywaitingattempt.go - - /pkg/models/operations/retrywaitingattempt1.go - /pkg/models/operations/retrywaitingattempts.go - /pkg/models/operations/testconfig.go - /pkg/models/operations/testhook.go @@ -568,19 +568,19 @@ generatedFiles: - /pkg/models/shared/getwalletsummaryresponse.go - /pkg/models/shared/listbalancesresponse.go - /pkg/models/shared/listwalletsresponse.go + - /pkg/models/shared/v2attemptresponse.go + - /pkg/models/shared/v2attempt.go + - /pkg/models/shared/webhookserrorsenum.go - /pkg/models/shared/configresponse.go - /pkg/models/shared/webhooksconfig.go - - /pkg/models/shared/webhookserrorsenum.go - /pkg/models/shared/v2hookresponse.go - /pkg/models/shared/v2hook.go - /pkg/models/shared/configchangesecret.go - /pkg/models/shared/v2attemptcursorresponse.go - - /pkg/models/shared/v2attempt.go - /pkg/models/shared/configsresponse.go - /pkg/models/shared/v2hookcursorresponse.go - /pkg/models/shared/configuser.go - /pkg/models/shared/v2hookbodyparams.go - - /pkg/models/shared/v2attemptresponse.go - /pkg/models/shared/attemptresponse.go - /pkg/models/shared/attempt.go - /pkg/models/shared/security.go @@ -886,6 +886,8 @@ generatedFiles: - docs/pkg/models/operations/voidholdrequest.md - docs/pkg/models/operations/voidholdresponse.md - docs/pkg/models/operations/walletsgetserverinforesponse.md + - docs/pkg/models/operations/abortwaitingattemptrequest.md + - docs/pkg/models/operations/abortwaitingattemptresponse.md - docs/pkg/models/operations/activateconfigrequest.md - docs/pkg/models/operations/activateconfigresponse.md - docs/pkg/models/operations/activatehookrequest.md @@ -914,8 +916,6 @@ generatedFiles: - docs/pkg/models/operations/inserthookresponse.md - docs/pkg/models/operations/retrywaitingattemptrequest.md - docs/pkg/models/operations/retrywaitingattemptresponse.md - - docs/pkg/models/operations/retrywaitingattempt1request.md - - docs/pkg/models/operations/retrywaitingattempt1response.md - docs/pkg/models/operations/retrywaitingattemptsresponse.md - docs/pkg/models/operations/testconfigrequest.md - docs/pkg/models/operations/testconfigresponse.md @@ -1359,24 +1359,24 @@ generatedFiles: - docs/pkg/models/shared/listbalancesresponse.md - docs/pkg/models/shared/listwalletsresponsecursor.md - docs/pkg/models/shared/listwalletsresponse.md + - docs/pkg/models/shared/v2attemptresponse.md + - docs/pkg/models/shared/v2attemptstatus.md + - docs/pkg/models/shared/v2attempt.md + - docs/pkg/models/shared/webhookserrorsenum.md - docs/pkg/models/shared/configresponse.md - docs/pkg/models/shared/webhooksconfig.md - - docs/pkg/models/shared/webhookserrorsenum.md - docs/pkg/models/shared/v2hookresponse.md - docs/pkg/models/shared/v2hookstatus.md - docs/pkg/models/shared/v2hook.md - docs/pkg/models/shared/configchangesecret.md - docs/pkg/models/shared/v2attemptcursorresponsecursor.md - docs/pkg/models/shared/v2attemptcursorresponse.md - - docs/pkg/models/shared/v2attemptstatus.md - - docs/pkg/models/shared/v2attempt.md - docs/pkg/models/shared/configsresponsecursor.md - docs/pkg/models/shared/configsresponse.md - docs/pkg/models/shared/v2hookcursorresponsecursor.md - docs/pkg/models/shared/v2hookcursorresponse.md - docs/pkg/models/shared/configuser.md - docs/pkg/models/shared/v2hookbodyparams.md - - docs/pkg/models/shared/v2attemptresponse.md - docs/pkg/models/shared/attemptresponse.md - docs/pkg/models/shared/attempt.md - docs/pkg/models/shared/security.md diff --git a/releases/sdks/go/README.md b/releases/sdks/go/README.md index bc67609ac9..d5ee95615c 100644 --- a/releases/sdks/go/README.md +++ b/releases/sdks/go/README.md @@ -247,6 +247,7 @@ func main() { ### [Webhooks](docs/sdks/webhooks/README.md) +* [AbortWaitingAttempt](docs/sdks/webhooks/README.md#abortwaitingattempt) - Abort one waiting attempt * [ActivateConfig](docs/sdks/webhooks/README.md#activateconfig) - Activate one config * [ActivateHook](docs/sdks/webhooks/README.md#activatehook) - Activate one Hook * [ChangeConfigSecret](docs/sdks/webhooks/README.md#changeconfigsecret) - Change the signing secret of a config @@ -262,7 +263,6 @@ func main() { * [InsertConfig](docs/sdks/webhooks/README.md#insertconfig) - Insert a new config * [InsertHook](docs/sdks/webhooks/README.md#inserthook) - Insert new Hook * [RetryWaitingAttempt](docs/sdks/webhooks/README.md#retrywaitingattempt) - Retry one waiting Attempt -* [RetryWaitingAttempt1](docs/sdks/webhooks/README.md#retrywaitingattempt1) - Abort one waiting attempt * [RetryWaitingAttempts](docs/sdks/webhooks/README.md#retrywaitingattempts) - Retry all the waiting attempts * [TestConfig](docs/sdks/webhooks/README.md#testconfig) - Test one config * [TestHook](docs/sdks/webhooks/README.md#testhook) - Test one Hook diff --git a/releases/sdks/go/docs/pkg/models/operations/retrywaitingattempt1request.md b/releases/sdks/go/docs/pkg/models/operations/abortwaitingattemptrequest.md similarity index 95% rename from releases/sdks/go/docs/pkg/models/operations/retrywaitingattempt1request.md rename to releases/sdks/go/docs/pkg/models/operations/abortwaitingattemptrequest.md index b80a7129e3..118769b49b 100644 --- a/releases/sdks/go/docs/pkg/models/operations/retrywaitingattempt1request.md +++ b/releases/sdks/go/docs/pkg/models/operations/abortwaitingattemptrequest.md @@ -1,4 +1,4 @@ -# RetryWaitingAttempt1Request +# AbortWaitingAttemptRequest ## Fields diff --git a/releases/sdks/go/docs/pkg/models/operations/retrywaitingattempt1response.md b/releases/sdks/go/docs/pkg/models/operations/abortwaitingattemptresponse.md similarity index 98% rename from releases/sdks/go/docs/pkg/models/operations/retrywaitingattempt1response.md rename to releases/sdks/go/docs/pkg/models/operations/abortwaitingattemptresponse.md index dd7d78d6ae..de0e125347 100644 --- a/releases/sdks/go/docs/pkg/models/operations/retrywaitingattempt1response.md +++ b/releases/sdks/go/docs/pkg/models/operations/abortwaitingattemptresponse.md @@ -1,4 +1,4 @@ -# RetryWaitingAttempt1Response +# AbortWaitingAttemptResponse ## Fields diff --git a/releases/sdks/go/docs/sdks/webhooks/README.md b/releases/sdks/go/docs/sdks/webhooks/README.md index ecd92b68cb..0d42af4920 100644 --- a/releases/sdks/go/docs/sdks/webhooks/README.md +++ b/releases/sdks/go/docs/sdks/webhooks/README.md @@ -3,6 +3,7 @@ ### Available Operations +* [AbortWaitingAttempt](#abortwaitingattempt) - Abort one waiting attempt * [ActivateConfig](#activateconfig) - Activate one config * [ActivateHook](#activatehook) - Activate one Hook * [ChangeConfigSecret](#changeconfigsecret) - Change the signing secret of a config @@ -18,7 +19,6 @@ * [InsertConfig](#insertconfig) - Insert a new config * [InsertHook](#inserthook) - Insert new Hook * [RetryWaitingAttempt](#retrywaitingattempt) - Retry one waiting Attempt -* [RetryWaitingAttempt1](#retrywaitingattempt1) - Abort one waiting attempt * [RetryWaitingAttempts](#retrywaitingattempts) - Retry all the waiting attempts * [TestConfig](#testconfig) - Test one config * [TestHook](#testhook) - Test one Hook @@ -26,6 +26,61 @@ * [UpdateRetryHook](#updateretryhook) - Change the retry attribute of one Hook * [UpdateSecretHook](#updatesecrethook) - Change the secret of one Hook +## AbortWaitingAttempt + +Abort one waiting attempt + +### Example Usage + +```go +package main + +import( + "github.com/formancehq/formance-sdk-go/v2/pkg/models/shared" + "github.com/formancehq/formance-sdk-go/v2" + "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" + "context" + "log" +) + +func main() { + s := v2.New( + v2.WithSecurity(shared.Security{ + Authorization: "", + }), + ) + + request := operations.AbortWaitingAttemptRequest{ + AttemptID: "4997257d-dfb6-445b-929c-cbe2ab182818", + } + + ctx := context.Background() + res, err := s.Webhooks.AbortWaitingAttempt(ctx, request) + if err != nil { + log.Fatal(err) + } + if res.V2AttemptResponse != nil { + // handle response + } +} +``` + +### Parameters + +| Parameter | Type | Required | Description | +| -------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------- | +| `ctx` | [context.Context](https://pkg.go.dev/context#Context) | :heavy_check_mark: | The context to use for the request. | +| `request` | [operations.AbortWaitingAttemptRequest](../../pkg/models/operations/abortwaitingattemptrequest.md) | :heavy_check_mark: | The request object to use for the request. | + + +### Response + +**[*operations.AbortWaitingAttemptResponse](../../pkg/models/operations/abortwaitingattemptresponse.md), error** +| Error Object | Status Code | Content Type | +| ------------------------------- | ------------------------------- | ------------------------------- | +| sdkerrors.WebhooksErrorResponse | default | application/json | +| sdkerrors.SDKError | 4xx-5xx | */* | + ## ActivateConfig Activate a webhooks config by ID, to start receiving webhooks to its endpoint. @@ -871,61 +926,6 @@ func main() { | sdkerrors.WebhooksErrorResponse | default | application/json | | sdkerrors.SDKError | 4xx-5xx | */* | -## RetryWaitingAttempt1 - -Abort one waiting attempt - -### Example Usage - -```go -package main - -import( - "github.com/formancehq/formance-sdk-go/v2/pkg/models/shared" - "github.com/formancehq/formance-sdk-go/v2" - "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" - "context" - "log" -) - -func main() { - s := v2.New( - v2.WithSecurity(shared.Security{ - Authorization: "", - }), - ) - - request := operations.RetryWaitingAttempt1Request{ - AttemptID: "4997257d-dfb6-445b-929c-cbe2ab182818", - } - - ctx := context.Background() - res, err := s.Webhooks.RetryWaitingAttempt1(ctx, request) - if err != nil { - log.Fatal(err) - } - if res.V2AttemptResponse != nil { - // handle response - } -} -``` - -### Parameters - -| Parameter | Type | Required | Description | -| ---------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------- | -| `ctx` | [context.Context](https://pkg.go.dev/context#Context) | :heavy_check_mark: | The context to use for the request. | -| `request` | [operations.RetryWaitingAttempt1Request](../../pkg/models/operations/retrywaitingattempt1request.md) | :heavy_check_mark: | The request object to use for the request. | - - -### Response - -**[*operations.RetryWaitingAttempt1Response](../../pkg/models/operations/retrywaitingattempt1response.md), error** -| Error Object | Status Code | Content Type | -| ------------------------------- | ------------------------------- | ------------------------------- | -| sdkerrors.WebhooksErrorResponse | default | application/json | -| sdkerrors.SDKError | 4xx-5xx | */* | - ## RetryWaitingAttempts Flush all waiting attempts diff --git a/releases/sdks/go/formance.go b/releases/sdks/go/formance.go index a2c3553434..8f31e0851d 100644 --- a/releases/sdks/go/formance.go +++ b/releases/sdks/go/formance.go @@ -165,9 +165,9 @@ func New(opts ...SDKOption) *Formance { sdkConfiguration: sdkConfiguration{ Language: "go", OpenAPIDocVersion: "v0.0.0", - SDKVersion: "0.0.1", + SDKVersion: "v0.0.0", GenVersion: "2.332.4", - UserAgent: "speakeasy-sdk/go 0.0.1 2.332.4 v0.0.0 github.com/formancehq/formance-sdk-go/v2", + UserAgent: "speakeasy-sdk/go v0.0.0 2.332.4 v0.0.0 github.com/formancehq/formance-sdk-go/v2", Hooks: hooks.New(), }, } diff --git a/releases/sdks/go/gen.yaml b/releases/sdks/go/gen.yaml index 2fac8cb187..1d599a9502 100755 --- a/releases/sdks/go/gen.yaml +++ b/releases/sdks/go/gen.yaml @@ -11,7 +11,7 @@ generation: oAuth2ClientCredentialsEnabled: false telemetryEnabled: false go: - version: 0.0.1 + version: v0.0.0 additionalDependencies: {} author: Formance clientServerStatusCodesAsErrors: true diff --git a/releases/sdks/go/pkg/models/operations/retrywaitingattempt1.go b/releases/sdks/go/pkg/models/operations/abortwaitingattempt.go similarity index 65% rename from releases/sdks/go/pkg/models/operations/retrywaitingattempt1.go rename to releases/sdks/go/pkg/models/operations/abortwaitingattempt.go index e047eac610..9327b8cd55 100644 --- a/releases/sdks/go/pkg/models/operations/retrywaitingattempt1.go +++ b/releases/sdks/go/pkg/models/operations/abortwaitingattempt.go @@ -7,19 +7,19 @@ import ( "net/http" ) -type RetryWaitingAttempt1Request struct { +type AbortWaitingAttemptRequest struct { // Attempt ID AttemptID string `pathParam:"style=simple,explode=false,name=attemptId"` } -func (o *RetryWaitingAttempt1Request) GetAttemptID() string { +func (o *AbortWaitingAttemptRequest) GetAttemptID() string { if o == nil { return "" } return o.AttemptID } -type RetryWaitingAttempt1Response struct { +type AbortWaitingAttemptResponse struct { // HTTP response content type for this operation ContentType string // HTTP response status code for this operation @@ -30,28 +30,28 @@ type RetryWaitingAttempt1Response struct { V2AttemptResponse *shared.V2AttemptResponse } -func (o *RetryWaitingAttempt1Response) GetContentType() string { +func (o *AbortWaitingAttemptResponse) GetContentType() string { if o == nil { return "" } return o.ContentType } -func (o *RetryWaitingAttempt1Response) GetStatusCode() int { +func (o *AbortWaitingAttemptResponse) GetStatusCode() int { if o == nil { return 0 } return o.StatusCode } -func (o *RetryWaitingAttempt1Response) GetRawResponse() *http.Response { +func (o *AbortWaitingAttemptResponse) GetRawResponse() *http.Response { if o == nil { return nil } return o.RawResponse } -func (o *RetryWaitingAttempt1Response) GetV2AttemptResponse() *shared.V2AttemptResponse { +func (o *AbortWaitingAttemptResponse) GetV2AttemptResponse() *shared.V2AttemptResponse { if o == nil { return nil } diff --git a/releases/sdks/go/webhooks.go b/releases/sdks/go/webhooks.go index 80351b644a..b89d92ece3 100644 --- a/releases/sdks/go/webhooks.go +++ b/releases/sdks/go/webhooks.go @@ -26,6 +26,105 @@ func newWebhooks(sdkConfig sdkConfiguration) *Webhooks { } } +// AbortWaitingAttempt - Abort one waiting attempt +// Abort one waiting attempt +func (s *Webhooks) AbortWaitingAttempt(ctx context.Context, request operations.AbortWaitingAttemptRequest) (*operations.AbortWaitingAttemptResponse, error) { + hookCtx := hooks.HookContext{ + Context: ctx, + OperationID: "abortWaitingAttempt", + OAuth2Scopes: []string{}, + SecuritySource: s.sdkConfiguration.Security, + } + + baseURL := utils.ReplaceParameters(s.sdkConfiguration.GetServerDetails()) + opURL, err := utils.GenerateURL(ctx, baseURL, "/api/webhooks/v2/attempts/waiting/{attemptId}/abort", request, nil) + if err != nil { + return nil, fmt.Errorf("error generating URL: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "PUT", opURL, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %w", err) + } + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", s.sdkConfiguration.UserAgent) + + if err := utils.PopulateSecurity(ctx, req, s.sdkConfiguration.Security); err != nil { + return nil, err + } + + req, err = s.sdkConfiguration.Hooks.BeforeRequest(hooks.BeforeRequestContext{HookContext: hookCtx}, req) + if err != nil { + return nil, err + } + + httpRes, err := s.sdkConfiguration.Client.Do(req) + if err != nil || httpRes == nil { + if err != nil { + err = fmt.Errorf("error sending request: %w", err) + } else { + err = fmt.Errorf("error sending request: no response") + } + + _, err = s.sdkConfiguration.Hooks.AfterError(hooks.AfterErrorContext{HookContext: hookCtx}, nil, err) + return nil, err + } else if utils.MatchStatusCodes([]string{"default"}, httpRes.StatusCode) { + _httpRes, err := s.sdkConfiguration.Hooks.AfterError(hooks.AfterErrorContext{HookContext: hookCtx}, httpRes, nil) + if err != nil { + return nil, err + } else if _httpRes != nil { + httpRes = _httpRes + } + } else { + httpRes, err = s.sdkConfiguration.Hooks.AfterSuccess(hooks.AfterSuccessContext{HookContext: hookCtx}, httpRes) + if err != nil { + return nil, err + } + } + + res := &operations.AbortWaitingAttemptResponse{ + StatusCode: httpRes.StatusCode, + ContentType: httpRes.Header.Get("Content-Type"), + RawResponse: httpRes, + } + + rawBody, err := io.ReadAll(httpRes.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %w", err) + } + httpRes.Body.Close() + httpRes.Body = io.NopCloser(bytes.NewBuffer(rawBody)) + + switch { + case httpRes.StatusCode == 200: + switch { + case utils.MatchContentType(httpRes.Header.Get("Content-Type"), `application/json`): + var out shared.V2AttemptResponse + if err := utils.UnmarshalJsonFromResponseBody(bytes.NewBuffer(rawBody), &out, ""); err != nil { + return nil, err + } + + res.V2AttemptResponse = &out + default: + return nil, sdkerrors.NewSDKError(fmt.Sprintf("unknown content-type received: %s", httpRes.Header.Get("Content-Type")), httpRes.StatusCode, string(rawBody), httpRes) + } + default: + switch { + case utils.MatchContentType(httpRes.Header.Get("Content-Type"), `application/json`): + var out sdkerrors.WebhooksErrorResponse + if err := utils.UnmarshalJsonFromResponseBody(bytes.NewBuffer(rawBody), &out, ""); err != nil { + return nil, err + } + + return nil, &out + default: + return nil, sdkerrors.NewSDKError(fmt.Sprintf("unknown content-type received: %s", httpRes.Header.Get("Content-Type")), httpRes.StatusCode, string(rawBody), httpRes) + } + } + + return res, nil +} + // ActivateConfig - Activate one config // Activate a webhooks config by ID, to start receiving webhooks to its endpoint. func (s *Webhooks) ActivateConfig(ctx context.Context, request operations.ActivateConfigRequest) (*operations.ActivateConfigResponse, error) { @@ -1534,105 +1633,6 @@ func (s *Webhooks) RetryWaitingAttempt(ctx context.Context, request operations.R return res, nil } -// RetryWaitingAttempt1 - Abort one waiting attempt -// Abort one waiting attempt -func (s *Webhooks) RetryWaitingAttempt1(ctx context.Context, request operations.RetryWaitingAttempt1Request) (*operations.RetryWaitingAttempt1Response, error) { - hookCtx := hooks.HookContext{ - Context: ctx, - OperationID: "retryWaitingAttempt1", - OAuth2Scopes: []string{}, - SecuritySource: s.sdkConfiguration.Security, - } - - baseURL := utils.ReplaceParameters(s.sdkConfiguration.GetServerDetails()) - opURL, err := utils.GenerateURL(ctx, baseURL, "/api/webhooks/v2/attempts/waiting/{attemptId}/abort", request, nil) - if err != nil { - return nil, fmt.Errorf("error generating URL: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, "PUT", opURL, nil) - if err != nil { - return nil, fmt.Errorf("error creating request: %w", err) - } - req.Header.Set("Accept", "application/json") - req.Header.Set("User-Agent", s.sdkConfiguration.UserAgent) - - if err := utils.PopulateSecurity(ctx, req, s.sdkConfiguration.Security); err != nil { - return nil, err - } - - req, err = s.sdkConfiguration.Hooks.BeforeRequest(hooks.BeforeRequestContext{HookContext: hookCtx}, req) - if err != nil { - return nil, err - } - - httpRes, err := s.sdkConfiguration.Client.Do(req) - if err != nil || httpRes == nil { - if err != nil { - err = fmt.Errorf("error sending request: %w", err) - } else { - err = fmt.Errorf("error sending request: no response") - } - - _, err = s.sdkConfiguration.Hooks.AfterError(hooks.AfterErrorContext{HookContext: hookCtx}, nil, err) - return nil, err - } else if utils.MatchStatusCodes([]string{"default"}, httpRes.StatusCode) { - _httpRes, err := s.sdkConfiguration.Hooks.AfterError(hooks.AfterErrorContext{HookContext: hookCtx}, httpRes, nil) - if err != nil { - return nil, err - } else if _httpRes != nil { - httpRes = _httpRes - } - } else { - httpRes, err = s.sdkConfiguration.Hooks.AfterSuccess(hooks.AfterSuccessContext{HookContext: hookCtx}, httpRes) - if err != nil { - return nil, err - } - } - - res := &operations.RetryWaitingAttempt1Response{ - StatusCode: httpRes.StatusCode, - ContentType: httpRes.Header.Get("Content-Type"), - RawResponse: httpRes, - } - - rawBody, err := io.ReadAll(httpRes.Body) - if err != nil { - return nil, fmt.Errorf("error reading response body: %w", err) - } - httpRes.Body.Close() - httpRes.Body = io.NopCloser(bytes.NewBuffer(rawBody)) - - switch { - case httpRes.StatusCode == 200: - switch { - case utils.MatchContentType(httpRes.Header.Get("Content-Type"), `application/json`): - var out shared.V2AttemptResponse - if err := utils.UnmarshalJsonFromResponseBody(bytes.NewBuffer(rawBody), &out, ""); err != nil { - return nil, err - } - - res.V2AttemptResponse = &out - default: - return nil, sdkerrors.NewSDKError(fmt.Sprintf("unknown content-type received: %s", httpRes.Header.Get("Content-Type")), httpRes.StatusCode, string(rawBody), httpRes) - } - default: - switch { - case utils.MatchContentType(httpRes.Header.Get("Content-Type"), `application/json`): - var out sdkerrors.WebhooksErrorResponse - if err := utils.UnmarshalJsonFromResponseBody(bytes.NewBuffer(rawBody), &out, ""); err != nil { - return nil, err - } - - return nil, &out - default: - return nil, sdkerrors.NewSDKError(fmt.Sprintf("unknown content-type received: %s", httpRes.Header.Get("Content-Type")), httpRes.StatusCode, string(rawBody), httpRes) - } - } - - return res, nil -} - // RetryWaitingAttempts - Retry all the waiting attempts // Flush all waiting attempts func (s *Webhooks) RetryWaitingAttempts(ctx context.Context) (*operations.RetryWaitingAttemptsResponse, error) { diff --git a/tests/integration/suite/webhooks-v2-waiting-attempt-get-abort-flush.go b/tests/integration/suite/webhooks-v2-waiting-attempt-get-abort-flush.go new file mode 100644 index 0000000000..0cc8f92c67 --- /dev/null +++ b/tests/integration/suite/webhooks-v2-waiting-attempt-get-abort-flush.go @@ -0,0 +1,191 @@ +package suite + +import ( + "math/big" + "net/http" + "net/http/httptest" + "time" + + "github.com/formancehq/stack/tests/integration/internal/modules" + + "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" + "github.com/formancehq/formance-sdk-go/v2/pkg/models/shared" + . "github.com/formancehq/stack/tests/integration/internal" + webhooks "github.com/formancehq/webhooks/pkg/utils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() { + + + Describe("Try to manage the waiting attempts ", Ordered, func(){ + + var ( + httpBadServer *httptest.Server + httpGoodServer * httptest.Server + called chan struct{} + secret = webhooks.NewSecret() + hook1 shared.V2Hook + waitinAttempt shared.V2Attempt + + ) + + BeforeAll(func(){ + // CREATE LEDGER + createLedgerResponse, err := Client().Ledger.V2CreateLedger(TestContext(), operations.V2CreateLedgerRequest{ + Ledger: "default", + }) + Expect(err).To(BeNil()) + Expect(createLedgerResponse.StatusCode).To(Equal(http.StatusNoContent)) + + // CREATE FAKE WEB SERVER FOR ENDPOINT + httpBadServer = httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // ALWAYS SEND BAD RESPONSE + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("401 Unauthorized")) + + })) + DeferCleanup(func() { + httpBadServer.Close() + }) + + called = make(chan struct{}) + httpGoodServer = httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // ALWAYS SEND BAD RESPONSE + w.WriteHeader(http.StatusOK) + w.Write([]byte("Success")) + defer close(called) + })) + DeferCleanup(func() { + httpGoodServer.Close() + }) + + // CREATE HOOK + response, err := Client().Webhooks.InsertHook( + TestContext(), + shared.V2HookBodyParams{ + Endpoint: httpBadServer.URL, + Secret: &secret, + Events: []string{ + "ledger.committed_transactions", + }, + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + hook1 = response.V2HookResponse.Data + + // ACTIVATE HOOK + _, err = Client().Webhooks.ActivateHook( + TestContext(), + operations.ActivateHookRequest{ + HookID: hook1.ID, + }, + ) + Expect(err).ToNot(HaveOccurred()) + + // NEED THIS TO LET CACHES REFRESH INSIDE WEBHOOKS_WORKER & WEBHOOKS_COLLECTOR + time.Sleep(1*time.Second) + + + // CREATE A TRANSACTION INSIDE THE LEDGER + resp2, err := Client().Ledger.V2CreateTransaction( + TestContext(), + operations.V2CreateTransactionRequest{ + V2PostTransaction: shared.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []shared.V2Posting{ + { + Amount: big.NewInt(100), + Asset: "USD", + Source: "world", + Destination: "alice", + }, + }, + }, + Ledger: "default", + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(resp2.StatusCode).To(Equal(http.StatusOK)) + + // RIGHT KNOW, A waiting attempt should be handle by the Collector because + // Worker didn't successfully reach the endpoint (HttpBadServer) + }) + + It("should have a waiting attempt", func() { + + time.Sleep(1*time.Second) + + response, err := Client().Webhooks.GetWaitingAttempts( + TestContext(), + operations.GetWaitingAttemptsRequest{}, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.V2AttemptCursorResponse.Cursor.HasMore).To(BeFalse()) + Expect(response.V2AttemptCursorResponse.Cursor.Data).To(HaveLen(1)) + waitinAttempt = response.V2AttemptCursorResponse.Cursor.Data[0] + time.Sleep(1*time.Second) + + // Abort the waiting attempt + resp, err := Client().Webhooks.AbortWaitingAttempt( + TestContext(), + operations.AbortWaitingAttemptRequest{ + AttemptID: waitinAttempt.ID, + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(resp.V2AttemptResponse.Data.Status).To(Equal(shared.V2AttemptStatusAbort)) + // Check if no Waiting Attempts anymore. + resp2, err := Client().Webhooks.GetWaitingAttempts( + TestContext(), + operations.GetWaitingAttemptsRequest{}, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(resp2.V2AttemptCursorResponse.Cursor.HasMore).To(BeFalse()) + Expect(resp2.V2AttemptCursorResponse.Cursor.Data).To(HaveLen(0)) + + + // But We should have one in AbortedAttempts + + resp4, err := Client().Webhooks.GetAbortedAttempts( + TestContext(), + operations.GetAbortedAttemptsRequest{}, + ) + + Expect(err).ToNot(HaveOccurred()) + Expect(resp4.V2AttemptCursorResponse.Cursor.HasMore).To(BeFalse()) + Expect(resp4.V2AttemptCursorResponse.Cursor.Data).To(HaveLen(1)) + + + + // Change the endpoint of the Hook + + _ , err = Client().Webhooks.UpdateEndpointHook( + TestContext(), + operations.UpdateEndpointHookRequest{ + HookID: hook1.ID, + RequestBody: operations.UpdateEndpointHookRequestBody{ + Endpoint : &httpGoodServer.URL, + }, + }, + ) + + Expect(err).ToNot(HaveOccurred()) + + //Wait for Cache refresh... + time.Sleep(2*time.Second) + + // Chan should be still open because no Waiting Attempt in Cache + Eventually(ChanClosed(called)).Should(BeFalse()) + }) + + + }) + + + }) +