From 1d9999aef3118840469ca81d68ee0b883605fd61 Mon Sep 17 00:00:00 2001 From: agourdel Date: Sat, 6 Jul 2024 16:34:15 +0200 Subject: [PATCH] Integration test for Collector --- .../components/commons/webhook_runner.go | 2 + .../components/webhook_collector/collector.go | 4 +- .../controllers/v2/controllers-hooks.go | 3 +- .../components/webhook_worker/worker.go | 6 +- ...-ledger-committed-transaction-collector.go | 100 ++++++++++++++++ ...ks-ledger-committed-transaction-worker.go} | 0 ...-ledger-committed-transaction-collector.go | 110 ++++++++++++++++++ ...v2-ledger-committed-transaction-worker.go} | 0 8 files changed, 219 insertions(+), 6 deletions(-) create mode 100644 tests/integration/suite/webhooks-ledger-committed-transaction-collector.go rename tests/integration/suite/{webhooks-ledger-committed-transaction.go => webhooks-ledger-committed-transaction-worker.go} (100%) create mode 100644 tests/integration/suite/webhooks-v2-ledger-committed-transaction-collector.go rename tests/integration/suite/{webhooks-v2-ledger-committed-transaction.go => webhooks-v2-ledger-committed-transaction-worker.go} (100%) diff --git a/ee/webhooks/internal/components/commons/webhook_runner.go b/ee/webhooks/internal/components/commons/webhook_runner.go index 944d594a8b..891cb70f07 100644 --- a/ee/webhooks/internal/components/commons/webhook_runner.go +++ b/ee/webhooks/internal/components/commons/webhook_runner.go @@ -82,6 +82,7 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) { } func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) { + e, err := commons.Event{}.FromPayload(log.Payload) if err != nil { message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while Event.FromPayload(log.payload): %x", wr.LogChannels, err) @@ -120,6 +121,7 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) { wr.State.UpdateHookRetry(e.ID, e.Value.(bool)) case commons.NewWaitingAttemptType: + attempt, err := wr.Database.GetAttempt(e.ID) if err != nil { message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while NewWaitingAttemptType : wr.Database.GetAttempt(e.ID): %x", wr.LogChannels, err) diff --git a/ee/webhooks/internal/components/webhook_collector/collector.go b/ee/webhooks/internal/components/webhook_collector/collector.go index 0c562e639d..fc97c91a98 100644 --- a/ee/webhooks/internal/components/webhook_collector/collector.go +++ b/ee/webhooks/internal/components/webhook_collector/collector.go @@ -87,9 +87,9 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg } statusCode, err := c.HandleRequest(context.Background(), sAttempt, sHook) - logging.Infof("Webhook_Collector:AsyncHandleSharedAttempt:HandleRequest : Result :: statusCode : %d :: err :%s", statusCode, err) + if err != nil { - logging.Errorf("Collector:AsyncHandleSharedAttempt:HandleRequest : %x", err) + c.State.WaitingAttempts.Add(sAttempt) return } diff --git a/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go b/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go index 175c8471a9..2b50e6ef58 100644 --- a/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go +++ b/ee/webhooks/internal/components/webhook_controller/controllers/v2/controllers-hooks.go @@ -33,7 +33,8 @@ func RegisterV2HookControllers(server serverInterfaces.IHTTPServer, database sto server.Register(string(r.V2CreateHook.Method), r.V2CreateHook.Url, func(w http.ResponseWriter, r *http.Request) { hookParams := commons.HookBodyParams{} - + hookParams.Retry = true + if err := utils.DecodeJSONBody(r, &hookParams); err != nil { sharedapi.BadRequest(w, utils.ErrValidation, err) return diff --git a/ee/webhooks/internal/components/webhook_worker/worker.go b/ee/webhooks/internal/components/webhook_worker/worker.go index e8d62569b3..2b8c609c69 100644 --- a/ee/webhooks/internal/components/webhook_worker/worker.go +++ b/ee/webhooks/internal/components/webhook_worker/worker.go @@ -122,14 +122,14 @@ func (w *Worker) HandleResponse(statusCode int, attempt *commons.Attempt, hook * attempt.LastHttpStatusCode = statusCode attempt.NbTry += 1 var err error - + if commons.IsHTTPRequestSuccess(statusCode) { commons.SetSuccesStatus(attempt) err = w.Database.SaveAttempt(*attempt, false) } - - if !hook.Retry && !attempt.IsSuccess() { + + if hook.Retry && !attempt.IsSuccess() { commons.SetAbortNoRetryModeStatus(attempt) err = w.Database.SaveAttempt(*attempt, true) } diff --git a/tests/integration/suite/webhooks-ledger-committed-transaction-collector.go b/tests/integration/suite/webhooks-ledger-committed-transaction-collector.go new file mode 100644 index 0000000000..6651d856bf --- /dev/null +++ b/tests/integration/suite/webhooks-ledger-committed-transaction-collector.go @@ -0,0 +1,100 @@ +package suite + +import ( + "math/big" + "net/http" + "net/http/httptest" + "time" + "github.com/formancehq/stack/tests/integration/internal/modules" + + "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" + "github.com/formancehq/formance-sdk-go/v2/pkg/models/shared" + . "github.com/formancehq/stack/tests/integration/internal" + webhooks "github.com/formancehq/webhooks/pkg/utils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() { + BeforeEach(func() { + createLedgerResponse, err := Client().Ledger.V2CreateLedger(TestContext(), operations.V2CreateLedgerRequest{ + Ledger: "default", + }) + Expect(err).To(BeNil()) + Expect(createLedgerResponse.StatusCode).To(Equal(http.StatusNoContent)) + }) + var ( + httpServer *httptest.Server + called chan struct{} + secret = webhooks.NewSecret() + count int + ) + + BeforeEach(func() { + called = make(chan struct{}) + count = 0 + httpServer = httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count += 1 + + if count == 1 { + // FOR THE WORKER + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("401 Unauthorized")) + } else if count == 2 { + // FOR THE COLLECTOR + w.WriteHeader(http.StatusOK) + w.Write([]byte("200 OK")) + defer close(called) + } + + })) + DeferCleanup(func() { + httpServer.Close() + }) + + response, err := Client().Webhooks.InsertConfig( + TestContext(), + shared.ConfigUser{ + Endpoint: httpServer.URL, + Secret: &secret, + EventTypes: []string{ + "ledger.committed_transactions", + }, + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + + }) + + When("creating a transaction", func() { + + BeforeEach(func() { + time.Sleep(1*time.Second) + response, err := Client().Ledger.V2CreateTransaction( + TestContext(), + operations.V2CreateTransactionRequest{ + V2PostTransaction: shared.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []shared.V2Posting{ + { + Amount: big.NewInt(100), + Asset: "USD", + Source: "world", + Destination: "alice", + }, + }, + }, + Ledger: "default", + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + }) + + It("should trigger a call to the webhook endpoint", func() { + Eventually(ChanClosed(called)).Should(BeTrue()) + }) + }) +}) diff --git a/tests/integration/suite/webhooks-ledger-committed-transaction.go b/tests/integration/suite/webhooks-ledger-committed-transaction-worker.go similarity index 100% rename from tests/integration/suite/webhooks-ledger-committed-transaction.go rename to tests/integration/suite/webhooks-ledger-committed-transaction-worker.go diff --git a/tests/integration/suite/webhooks-v2-ledger-committed-transaction-collector.go b/tests/integration/suite/webhooks-v2-ledger-committed-transaction-collector.go new file mode 100644 index 0000000000..6be5abd75d --- /dev/null +++ b/tests/integration/suite/webhooks-v2-ledger-committed-transaction-collector.go @@ -0,0 +1,110 @@ +package suite + +import ( + "math/big" + "net/http" + "net/http/httptest" + "time" + + "github.com/formancehq/stack/tests/integration/internal/modules" + + "github.com/formancehq/formance-sdk-go/v2/pkg/models/operations" + "github.com/formancehq/formance-sdk-go/v2/pkg/models/shared" + . "github.com/formancehq/stack/tests/integration/internal" + webhooks "github.com/formancehq/webhooks/pkg/utils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() { + BeforeEach(func() { + createLedgerResponse, err := Client().Ledger.V2CreateLedger(TestContext(), operations.V2CreateLedgerRequest{ + Ledger: "default", + }) + Expect(err).To(BeNil()) + Expect(createLedgerResponse.StatusCode).To(Equal(http.StatusNoContent)) + }) + var ( + httpServer *httptest.Server + called chan struct{} + secret = webhooks.NewSecret() + hook1 shared.V2Hook + count int + ) + + BeforeEach(func() { + called = make(chan struct{}) + count = 0 + httpServer = httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count += 1 + + if count == 1 { + // FOR THE WORKER + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("401 Unauthorized")) + } else if count == 2 { + // FOR THE COLLECTOR + w.WriteHeader(http.StatusOK) + w.Write([]byte("200 OK")) + defer close(called) + } + })) + DeferCleanup(func() { + httpServer.Close() + }) + + + + response, err := Client().Webhooks.InsertHook( + TestContext(), + shared.V2HookBodyParams{ + Endpoint: httpServer.URL, + Secret: &secret, + Events: []string{ + "ledger.committed_transactions", + }, + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + hook1 = response.V2HookResponse.Data + _, err = Client().Webhooks.ActivateHook( + TestContext(), + operations.ActivateHookRequest{ + HookID: hook1.ID, + }, + ) + Expect(err).ToNot(HaveOccurred()) + + }) + + When("creating a transaction", func() { + BeforeEach(func() { + time.Sleep(1*time.Second) + response, err := Client().Ledger.V2CreateTransaction( + TestContext(), + operations.V2CreateTransactionRequest{ + V2PostTransaction: shared.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []shared.V2Posting{ + { + Amount: big.NewInt(100), + Asset: "USD", + Source: "world", + Destination: "alice", + }, + }, + }, + Ledger: "default", + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + }) + + It("should trigger a call to the webhook endpoint", func() { + Eventually(ChanClosed(called)).Should(BeTrue()) + }) + }) +}) diff --git a/tests/integration/suite/webhooks-v2-ledger-committed-transaction.go b/tests/integration/suite/webhooks-v2-ledger-committed-transaction-worker.go similarity index 100% rename from tests/integration/suite/webhooks-v2-ledger-committed-transaction.go rename to tests/integration/suite/webhooks-v2-ledger-committed-transaction-worker.go