Skip to content

Commit

Permalink
Add Attempts get abort Test integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
agourdel committed Jul 6, 2024
1 parent 1d9999a commit c0b36b8
Show file tree
Hide file tree
Showing 24 changed files with 411 additions and 217 deletions.
3 changes: 1 addition & 2 deletions ee/webhooks/internal/commons/state.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package commons

import (

"time"
"github.com/formancehq/stack/libs/go-libs/sync/shared"
"time"
)

type State struct {
Expand Down
4 changes: 1 addition & 3 deletions ee/webhooks/internal/components/commons/webhook_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {
}

func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {

e, err := commons.Event{}.FromPayload(log.Payload)
if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while Event.FromPayload(log.payload): %x", wr.LogChannels, err)
Expand All @@ -94,7 +93,7 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {

case commons.NewHookType:
hook, err := wr.Database.GetHook(e.ID)

if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Case NewHookType Error while attempting to reach the database: %x", wr.LogChannels, err)
logging.Error(message)
Expand All @@ -121,7 +120,6 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {
wr.State.UpdateHookRetry(e.ID, e.Value.(bool))

case commons.NewWaitingAttemptType:

attempt, err := wr.Database.GetAttempt(e.ID)
if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while NewWaitingAttemptType : wr.Database.GetAttempt(e.ID): %x", wr.LogChannels, err)
Expand Down
14 changes: 8 additions & 6 deletions ee/webhooks/internal/components/webhook_collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *Collector) Init() {
}

func (c *Collector) HandleWaitingAttempts() {

if c.State.WaitingAttempts.Size() == 0 {
return
}
Expand Down Expand Up @@ -87,16 +87,16 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg
}

statusCode, err := c.HandleRequest(context.Background(), sAttempt, sHook)

if err != nil {

c.State.WaitingAttempts.Add(sAttempt)
return
}

if(sAttempt.Val.HookEndpoint != sHook.Val.Endpoint){
if sAttempt.Val.HookEndpoint != sHook.Val.Endpoint {
sAttempt.Val.HookEndpoint = sHook.Val.Endpoint
go func(){
go func() {
_, err := c.Database.UpdateAttemptEndpoint(sAttempt.Val.ID, sAttempt.Val.HookEndpoint)
if err != nil {
message := fmt.Sprintf("Collector:AsyncHandleSharedAttempt:Database.UpdateAttemptEndpoint() : %x", err)
Expand All @@ -105,12 +105,14 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg
}
}()
}

sAttempt.Val.LastHttpStatusCode = statusCode

if commons.IsHTTPRequestSuccess(statusCode) {

c.handleSuccess(sAttempt)
} else {

c.handleNextRetry(sAttempt)
}

Expand Down Expand Up @@ -145,7 +147,7 @@ func (c *Collector) handleNextRetry(sAttempt *commons.SharedAttempt) {
go func() {
_, err := c.Database.UpdateAttemptNextTry(sAttempt.Val.ID, sAttempt.Val.NextTry, sAttempt.Val.LastHttpStatusCode)
if err != nil {
message := fmt.Sprintf("Collector:handleNextRetry:Database.UpdateAttemptNextTry: %x", err)
message := fmt.Sprintf("Collector:handleNextRetry:Database.UpdateAttemptNextTry: %s", err)
logging.Error(message)
panic(message)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/url"
"strconv"
"strings"

)

type ErrorType string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func V1CreateHookController(database storeInterface.IStoreProvider, hook utils.V
newHook, err = controllersCommons.ActivateHook(database, newHook.ID)

logging.Infof("Webhook_Controller :: V1CreateHookController :: newHook Status : %s", newHook.Status)

if err != nil {
return utils.InternalErrorResp[utils.V1Hook](err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func RegisterV2AttemptControllers(serverHttp serverInterfaces.IHTTPServer, datab
func V2GetWaitingAttemptsController(database storeInterface.IStoreProvider, filterCursor string) utils.Response[bunpaginate.Cursor[commons.Attempt]] {
hasMore := false

strPrevious := ""
strNext := ""
strPrevious := " "
strNext := " "

cursor, err := utils.ReadCursor(filterCursor)

Expand All @@ -164,6 +164,7 @@ func V2GetWaitingAttemptsController(database storeInterface.IStoreProvider, filt
}

Cursor := bunpaginate.Cursor[commons.Attempt]{
PageSize: pageSize,
HasMore: hasMore,
Previous: strPrevious,
Next: strNext,
Expand Down Expand Up @@ -197,6 +198,7 @@ func V2GetAbortedAttemptsController(database storeInterface.IStoreProvider, filt
}

Cursor := bunpaginate.Cursor[commons.Attempt]{
PageSize: pageSize,
HasMore: hasMore,
Previous: strPrevious,
Next: strNext,
Expand Down Expand Up @@ -265,6 +267,7 @@ func V2RetryWaitingAttemptController(database storeInterface.IStoreProvider, id
}

func V2AbortWaitingAttemptController(database storeInterface.IStoreProvider, id string) utils.Response[commons.Attempt] {

attempt, err := database.AbortAttempt(id, string(commons.AbortUser), true)

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func RegisterV2HookControllers(server serverInterfaces.IHTTPServer, database sto
server.Register(string(r.V2CreateHook.Method), r.V2CreateHook.Url, func(w http.ResponseWriter, r *http.Request) {

hookParams := commons.HookBodyParams{}
hookParams.Retry = true
hookParams.Retry = true

if err := utils.DecodeJSONBody(r, &hookParams); err != nil {
sharedapi.BadRequest(w, utils.ErrValidation, err)
return
Expand Down
14 changes: 8 additions & 6 deletions ee/webhooks/internal/components/webhook_worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ func (w *Worker) HandleMessage(msg *message.Message) error {
event = strings.Join([]string{eventApp, event}, ".")
}



triggedSHooks := w.State.ActiveHooksByEvent.Get(event)

if triggedSHooks == nil || triggedSHooks.Size() == 0 {
return nil
}
Expand All @@ -103,6 +101,7 @@ func (w *Worker) HandlerTriggedHookFactory(ctx context.Context, event string, pa
defer wg.Done()

sAttempt := commons.NewSharedAttempt(sHook.Val.ID, sHook.Val.Name, sHook.Val.Endpoint, event, string(payload))
logging.Infof("WebhookWorker :: Waiting Attempt ID : %s", sAttempt.Val.ID)
hook := sHook.Val
attempt := sAttempt.Val
statusCode, err := w.HandleRequest(ctx, sAttempt, sHook)
Expand All @@ -122,14 +121,17 @@ func (w *Worker) HandleResponse(statusCode int, attempt *commons.Attempt, hook *
attempt.LastHttpStatusCode = statusCode
attempt.NbTry += 1
var err error



if commons.IsHTTPRequestSuccess(statusCode) {
commons.SetSuccesStatus(attempt)
err = w.Database.SaveAttempt(*attempt, false)
}

if hook.Retry && !attempt.IsSuccess() {
err = w.Database.SaveAttempt(*attempt, true)
}

if !hook.Retry && !attempt.IsSuccess(){
commons.SetAbortNoRetryModeStatus(attempt)
err = w.Database.SaveAttempt(*attempt, true)
}
Expand Down
16 changes: 10 additions & 6 deletions ee/webhooks/internal/services/storage/postgres/attempt_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
insertAttemptQuery = "INSERT INTO attempts (id, webhook_id, hook_name, hook_endpoint, event, payload, status_code, date_occured, status, date_status, comment, next_retry_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *"
updateAttemptStatus = "UPDATE attempts SET status = ?, date_status = NOW(), comment = ? WHERE id = ? RETURNING *"
updateAttemptNextTry = "UPDATE attempts SET next_retry_after = ?, status_code = ? WHERE id = ? RETURNING *"
updateAttemptEndpoint = "UPDATE attempts SET hook_endpoint = ? WHERE id = ? RETURNING *"
updateAttemptEndpoint = "UPDATE attempts SET hook_endpoint = ? WHERE id = ? RETURNING *"
)

func (store PostgresStore) GetAttempt(index string) (commons.Attempt, error) {
Expand Down Expand Up @@ -105,22 +105,26 @@ func (store PostgresStore) ChangeAttemptStatus(index string, status commons.Atte
log, err := commons.LogFromEvent(event)

wrapQuery := wrapWithLogQuery(updateAttemptStatus)
_, err = store.db.NewRaw(wrapQuery,
query := store.db.NewRaw(wrapQuery,
string(status),
comment,
index,
log.ID,
log.Channel,
log.Payload,
log.CreatedAt,
).Exec(context.Background(), &attempt)
)

err = query.Scan(context.Background(), &attempt)

} else {
_, err = store.db.NewRaw(updateAttemptStatus,
query := store.db.NewRaw(updateAttemptStatus,
string(status),
comment,
index,
).Exec(context.Background(), &attempt)
)

err = query.Scan(context.Background(), &attempt)
}

if err == sql.ErrNoRows {
Expand All @@ -131,7 +135,7 @@ func (store PostgresStore) ChangeAttemptStatus(index string, status commons.Atte
return attempt, err
}

func (store PostgresStore) UpdateAttemptEndpoint(index string, endpoint string) (commons.Attempt, error){
func (store PostgresStore) UpdateAttemptEndpoint(index string, endpoint string) (commons.Attempt, error) {
var attempt commons.Attempt

_, err := store.db.NewRaw(updateAttemptEndpoint, endpoint, index).Exec(context.Background(), &attempt)
Expand Down
2 changes: 1 addition & 1 deletion ee/webhooks/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ paths:
/v2/attempts/waiting/{attemptId}/abort:
put:
summary: Abort one waiting attempt
operationId: retryWaitingAttempt1
operationId: abortWaitingAttempt
description: Abort one waiting attempt
tags:
- Webhooks
Expand Down
2 changes: 1 addition & 1 deletion ee/webhooks/openapi/v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ paths:
/v2/attempts/waiting/{attemptId}/abort:
put:
summary: Abort one waiting attempt
operationId: retryWaitingAttempt
operationId: abortWaitingAttempt
description: Abort one waiting attempt
tags:
- Webhooks
Expand Down
1 change: 0 additions & 1 deletion libs/go-libs/sync/shared/sharedarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package shared

import (
"sync"

)

type SharedArr[T any] struct {
Expand Down
1 change: 0 additions & 1 deletion libs/go-libs/sync/shared/sharedmap.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package shared


type SharedMap[T any] struct {
Shared[map[string]*Shared[T]]
}
Expand Down
4 changes: 1 addition & 3 deletions libs/go-libs/sync/shared/sharedmaparr.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package shared


type SharedMapArr[T any] struct {
Shared[map[string]*SharedArr[T]]
}

func (s *SharedMapArr[T]) Add(index string, el *Shared[T]) {
s.WLock()
defer s.WUnlock()



if sharedArr, ok := (*s.Val)[index]; ok {
sharedArr.Add(el)
} else {
Expand Down
26 changes: 13 additions & 13 deletions releases/sdks/go/.speakeasy/gen.lock
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
lockVersion: 2.0.0
id: 7eac0a45-60a2-40bb-9e85-26bd77ec2a6d
management:
docChecksum: b03e791e9f3e760510270d7444f4bce3
docChecksum: c7b24185fbcd90d84a1020d6305d1524
docVersion: v0.0.0
speakeasyVersion: 1.292.0
generationVersion: 2.332.4
releaseVersion: 0.0.1
configChecksum: a4b5c74b8763c6fdc98e6adfde02c782
releaseVersion: v0.0.0
configChecksum: 28c1da66ee9378d9623ec2601b659dd5
features:
go:
additionalDependencies: 0.1.0
Expand Down Expand Up @@ -211,6 +211,7 @@ generatedFiles:
- /pkg/models/operations/updatewallet.go
- /pkg/models/operations/voidhold.go
- /pkg/models/operations/walletsgetserverinfo.go
- /pkg/models/operations/abortwaitingattempt.go
- /pkg/models/operations/activateconfig.go
- /pkg/models/operations/activatehook.go
- /pkg/models/operations/changeconfigsecret.go
Expand All @@ -226,7 +227,6 @@ generatedFiles:
- /pkg/models/operations/insertconfig.go
- /pkg/models/operations/inserthook.go
- /pkg/models/operations/retrywaitingattempt.go
- /pkg/models/operations/retrywaitingattempt1.go
- /pkg/models/operations/retrywaitingattempts.go
- /pkg/models/operations/testconfig.go
- /pkg/models/operations/testhook.go
Expand Down Expand Up @@ -568,19 +568,19 @@ generatedFiles:
- /pkg/models/shared/getwalletsummaryresponse.go
- /pkg/models/shared/listbalancesresponse.go
- /pkg/models/shared/listwalletsresponse.go
- /pkg/models/shared/v2attemptresponse.go
- /pkg/models/shared/v2attempt.go
- /pkg/models/shared/webhookserrorsenum.go
- /pkg/models/shared/configresponse.go
- /pkg/models/shared/webhooksconfig.go
- /pkg/models/shared/webhookserrorsenum.go
- /pkg/models/shared/v2hookresponse.go
- /pkg/models/shared/v2hook.go
- /pkg/models/shared/configchangesecret.go
- /pkg/models/shared/v2attemptcursorresponse.go
- /pkg/models/shared/v2attempt.go
- /pkg/models/shared/configsresponse.go
- /pkg/models/shared/v2hookcursorresponse.go
- /pkg/models/shared/configuser.go
- /pkg/models/shared/v2hookbodyparams.go
- /pkg/models/shared/v2attemptresponse.go
- /pkg/models/shared/attemptresponse.go
- /pkg/models/shared/attempt.go
- /pkg/models/shared/security.go
Expand Down Expand Up @@ -886,6 +886,8 @@ generatedFiles:
- docs/pkg/models/operations/voidholdrequest.md
- docs/pkg/models/operations/voidholdresponse.md
- docs/pkg/models/operations/walletsgetserverinforesponse.md
- docs/pkg/models/operations/abortwaitingattemptrequest.md
- docs/pkg/models/operations/abortwaitingattemptresponse.md
- docs/pkg/models/operations/activateconfigrequest.md
- docs/pkg/models/operations/activateconfigresponse.md
- docs/pkg/models/operations/activatehookrequest.md
Expand Down Expand Up @@ -914,8 +916,6 @@ generatedFiles:
- docs/pkg/models/operations/inserthookresponse.md
- docs/pkg/models/operations/retrywaitingattemptrequest.md
- docs/pkg/models/operations/retrywaitingattemptresponse.md
- docs/pkg/models/operations/retrywaitingattempt1request.md
- docs/pkg/models/operations/retrywaitingattempt1response.md
- docs/pkg/models/operations/retrywaitingattemptsresponse.md
- docs/pkg/models/operations/testconfigrequest.md
- docs/pkg/models/operations/testconfigresponse.md
Expand Down Expand Up @@ -1359,24 +1359,24 @@ generatedFiles:
- docs/pkg/models/shared/listbalancesresponse.md
- docs/pkg/models/shared/listwalletsresponsecursor.md
- docs/pkg/models/shared/listwalletsresponse.md
- docs/pkg/models/shared/v2attemptresponse.md
- docs/pkg/models/shared/v2attemptstatus.md
- docs/pkg/models/shared/v2attempt.md
- docs/pkg/models/shared/webhookserrorsenum.md
- docs/pkg/models/shared/configresponse.md
- docs/pkg/models/shared/webhooksconfig.md
- docs/pkg/models/shared/webhookserrorsenum.md
- docs/pkg/models/shared/v2hookresponse.md
- docs/pkg/models/shared/v2hookstatus.md
- docs/pkg/models/shared/v2hook.md
- docs/pkg/models/shared/configchangesecret.md
- docs/pkg/models/shared/v2attemptcursorresponsecursor.md
- docs/pkg/models/shared/v2attemptcursorresponse.md
- docs/pkg/models/shared/v2attemptstatus.md
- docs/pkg/models/shared/v2attempt.md
- docs/pkg/models/shared/configsresponsecursor.md
- docs/pkg/models/shared/configsresponse.md
- docs/pkg/models/shared/v2hookcursorresponsecursor.md
- docs/pkg/models/shared/v2hookcursorresponse.md
- docs/pkg/models/shared/configuser.md
- docs/pkg/models/shared/v2hookbodyparams.md
- docs/pkg/models/shared/v2attemptresponse.md
- docs/pkg/models/shared/attemptresponse.md
- docs/pkg/models/shared/attempt.md
- docs/pkg/models/shared/security.md
Expand Down
Loading

0 comments on commit c0b36b8

Please sign in to comment.