Skip to content

Commit

Permalink
feat(webhooks): V2 : Last Integration test about configs and Hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
agourdel committed Jul 6, 2024
1 parent 6a07021 commit a3ce971
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ee/webhooks/internal/commons/state.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package commons

import (
"time"

"time"
"github.com/formancehq/stack/libs/go-libs/sync/shared"
)

Expand Down
7 changes: 1 addition & 6 deletions ee/webhooks/internal/components/commons/webhook_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (wr *WebhookRunner) StartHandleFreshLogs() {
}

func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {

delay := time.Duration(wr.RunnerParams.DelayPull) * time.Second
ticker := time.NewTicker(time.Duration(delay))
var last_time time.Time = time.Now()
Expand All @@ -65,7 +64,6 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {
case <-stopChan:
return
case <-ticker.C:
logging.Infof("Webhook_Runner :: HandleFreshLogs() - Case Ticker C : ")
freezeTime := time.Now()
logs, err := wr.Database.GetFreshLogs(wr.LogChannels, last_time)
last_time = freezeTime
Expand All @@ -76,7 +74,6 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {
}

for _, log := range *logs {
logging.Infof("Webhook_Runner :: HandleFreshLogs() - NewLog ")
wr.HandleFreshLog(log)
}
}
Expand All @@ -85,19 +82,18 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {
}

func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {
logging.Infof("Webhook_Runner :: HandleFreshLog() - Log.Payload : %s", log.Payload)
e, err := commons.Event{}.FromPayload(log.Payload)
if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while Event.FromPayload(log.payload): %x", wr.LogChannels, err)
logging.Error(message)
panic(message)
}
eventType := commons.TypeFromEvent(e)

switch eventType {

case commons.NewHookType:
hook, err := wr.Database.GetHook(e.ID)

if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Case NewHookType Error while attempting to reach the database: %x", wr.LogChannels, err)
logging.Error(message)
Expand Down Expand Up @@ -130,7 +126,6 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {
logging.Error(message)
panic(message)
}

wr.State.AddNewAttempt(&attempt)
case commons.FlushWaitingAttemptType:
wr.State.FlushAttempt(e.ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg
}

statusCode, err := c.HandleRequest(context.Background(), sAttempt, sHook)

logging.Infof("Webhook_Collector:AsyncHandleSharedAttempt:HandleRequest : Result :: statusCode : %d :: err :%s", statusCode, err)
if err != nil {
logging.Errorf("Collector:AsyncHandleSharedAttempt:HandleRequest : %x", err)
c.State.WaitingAttempts.Add(sAttempt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/url"
"strconv"
"strings"

)

type ErrorType string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/formancehq/stack/libs/go-libs/bun/bunpaginate"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/go-chi/chi/v5"

sharedapi "github.com/formancehq/stack/libs/go-libs/api"
Expand Down Expand Up @@ -258,6 +259,14 @@ func V1CreateHookController(database storeInterface.IStoreProvider, hook utils.V
return utils.InternalErrorResp[utils.V1Hook](err)
}

newHook, err = controllersCommons.ActivateHook(database, newHook.ID)

logging.Infof("Webhook_Controller :: V1CreateHookController :: newHook Status : %s", newHook.Status)

if err != nil {
return utils.InternalErrorResp[utils.V1Hook](err)
}

return utils.SuccessResp(utils.ToV1Hook(newHook))

}
Expand Down
12 changes: 6 additions & 6 deletions ee/webhooks/internal/components/webhook_worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (w *Worker) Init() {
}

func (w *Worker) HandleMessage(msg *message.Message) error {
logging.Infof("Handling Message From Webhook : %s", msg.Payload)
var ev *publish.EventMessage
span, ev, err := publish.UnmarshalMessage(msg)
if err != nil {
Expand Down Expand Up @@ -75,7 +74,9 @@ func (w *Worker) HandleMessage(msg *message.Message) error {
if eventApp != "" {
event = strings.Join([]string{eventApp, event}, ".")
}
logging.Infof("Webhook_Worker :: Handling The Event : %s", event)



triggedSHooks := w.State.ActiveHooksByEvent.Get(event)

if triggedSHooks == nil || triggedSHooks.Size() == 0 {
Expand All @@ -91,7 +92,6 @@ func (w *Worker) HandleMessage(msg *message.Message) error {
var globalError error = nil

triggedSHooks.AsyncApply(w.HandlerTriggedHookFactory(traceCtx, event, string(payload), globalError))
logging.Infof("End Message From Webhook : %s", msg.Payload)
return globalError

}
Expand All @@ -106,7 +106,6 @@ func (w *Worker) HandlerTriggedHookFactory(ctx context.Context, event string, pa
hook := sHook.Val
attempt := sAttempt.Val
statusCode, err := w.HandleRequest(ctx, sAttempt, sHook)

if err != nil {
message := fmt.Sprintf("Worker:triggedSHooks.AsyncApply() - HandleTriggedHookFactory() - func(sHook *commons.SharedHook,wg *sync.WaitGroup) - w.HandleRequest - Something Went wrong while trying to make http request: %x", err)
logging.Error(message)
Expand All @@ -124,14 +123,15 @@ func (w *Worker) HandleResponse(statusCode int, attempt *commons.Attempt, hook *
attempt.NbTry += 1
var err error


if commons.IsHTTPRequestSuccess(statusCode) {
commons.SetSuccesStatus(attempt)
err = w.Database.SaveAttempt(*attempt, true)
err = w.Database.SaveAttempt(*attempt, false)
}

if !hook.Retry && !attempt.IsSuccess() {
commons.SetAbortNoRetryModeStatus(attempt)
err = w.Database.SaveAttempt(*attempt, false)
err = w.Database.SaveAttempt(*attempt, true)
}

return err
Expand Down
4 changes: 2 additions & 2 deletions ee/webhooks/internal/services/storage/postgres/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func StrArray(arr []string) string {
var sb strings.Builder

for i, str := range arr {
sb.WriteString("'")
sb.WriteString("")
sb.WriteString(str)
sb.WriteString("'")
sb.WriteString("")
if i < len(arr)-1 {
sb.WriteString(",")
}
Expand Down
6 changes: 4 additions & 2 deletions libs/go-libs/sync/shared/sharedarr.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package shared

import "sync"
import (
"sync"

)

type SharedArr[T any] struct {
Shared[[]*Shared[T]]
Expand All @@ -15,7 +18,6 @@ func (s *SharedArr[T]) Add(new *Shared[T]) {

func (s *SharedArr[T]) UnsafeAdd(new *Shared[T]) {
idx := s.UnsafeFind(new)

if idx < 0 {
*s.Val = append(*s.Val, new)
}
Expand Down
1 change: 1 addition & 0 deletions libs/go-libs/sync/shared/sharedmap.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package shared


type SharedMap[T any] struct {
Shared[map[string]*Shared[T]]
}
Expand Down
5 changes: 3 additions & 2 deletions libs/go-libs/sync/shared/sharedmaparr.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package shared


type SharedMapArr[T any] struct {
Shared[map[string]*SharedArr[T]]
}

func (s *SharedMapArr[T]) Add(index string, el *Shared[T]) {
s.WLock()
defer s.WUnlock()



if sharedArr, ok := (*s.Val)[index]; ok {
sharedArr.Add(el)
} else {
Expand Down Expand Up @@ -44,7 +46,6 @@ func (s *SharedMapArr[T]) Removes(idxs []string, el *Shared[T]) {
func (s *SharedMapArr[T]) Get(index string) *SharedArr[T] {
s.RLock()
defer s.RUnlock()

if sharedArr, ok := (*s.Val)[index]; ok {
return sharedArr
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package suite

import (
"github.com/formancehq/stack/tests/integration/internal/modules"
"math/big"
"net/http"
"net/http/httptest"
"time"

"github.com/formancehq/stack/tests/integration/internal/modules"

"github.com/formancehq/formance-sdk-go/v2/pkg/models/operations"
"github.com/formancehq/formance-sdk-go/v2/pkg/models/shared"
Expand All @@ -26,6 +28,7 @@ var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() {
httpServer *httptest.Server
called chan struct{}
secret = webhooks.NewSecret()

)

BeforeEach(func() {
Expand All @@ -50,10 +53,13 @@ var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() {
)
Expect(err).ToNot(HaveOccurred())
Expect(response.StatusCode).To(Equal(http.StatusOK))

})

When("creating a transaction", func() {

BeforeEach(func() {
time.Sleep(1*time.Second)
response, err := Client().Ledger.V2CreateTransaction(
TestContext(),
operations.V2CreateTransactionRequest{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package suite

import (
"fmt"
"math/big"
"net/http"
"net/http/httptest"
"time"

"github.com/formancehq/stack/tests/integration/internal/modules"

Expand Down Expand Up @@ -35,9 +35,11 @@ var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() {
called = make(chan struct{})
httpServer = httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Println("INSIDE SERVER ;")

defer close(called)
return



}))
DeferCleanup(func() {
httpServer.Close()
Expand Down Expand Up @@ -65,10 +67,12 @@ var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() {
},
)
Expect(err).ToNot(HaveOccurred())

})

When("creating a transaction", func() {
BeforeEach(func() {
time.Sleep(1*time.Second)
response, err := Client().Ledger.V2CreateTransaction(
TestContext(),
operations.V2CreateTransactionRequest{
Expand Down

0 comments on commit a3ce971

Please sign in to comment.