Skip to content

Commit

Permalink
Integration test for Collector
Browse files Browse the repository at this point in the history
  • Loading branch information
agourdel committed Jul 6, 2024
1 parent a3ce971 commit 1d9999a
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 6 deletions.
2 changes: 2 additions & 0 deletions ee/webhooks/internal/components/commons/webhook_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (wr *WebhookRunner) HandleFreshLogs(stopChan chan struct{}) {
}

func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {

e, err := commons.Event{}.FromPayload(log.Payload)
if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while Event.FromPayload(log.payload): %x", wr.LogChannels, err)
Expand Down Expand Up @@ -120,6 +121,7 @@ func (wr *WebhookRunner) HandleFreshLog(log *commons.Log) {
wr.State.UpdateHookRetry(e.ID, e.Value.(bool))

case commons.NewWaitingAttemptType:

attempt, err := wr.Database.GetAttempt(e.ID)
if err != nil {
message := fmt.Sprintf("WebhookRunner:HandleFreshLogs() - LogChannels : %s : Error while NewWaitingAttemptType : wr.Database.GetAttempt(e.ID): %x", wr.LogChannels, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (c *Collector) AsyncHandleSharedAttempt(sAttempt *commons.SharedAttempt, wg
}

statusCode, err := c.HandleRequest(context.Background(), sAttempt, sHook)
logging.Infof("Webhook_Collector:AsyncHandleSharedAttempt:HandleRequest : Result :: statusCode : %d :: err :%s", statusCode, err)

if err != nil {
logging.Errorf("Collector:AsyncHandleSharedAttempt:HandleRequest : %x", err)

c.State.WaitingAttempts.Add(sAttempt)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func RegisterV2HookControllers(server serverInterfaces.IHTTPServer, database sto
server.Register(string(r.V2CreateHook.Method), r.V2CreateHook.Url, func(w http.ResponseWriter, r *http.Request) {

hookParams := commons.HookBodyParams{}

hookParams.Retry = true

if err := utils.DecodeJSONBody(r, &hookParams); err != nil {
sharedapi.BadRequest(w, utils.ErrValidation, err)
return
Expand Down
6 changes: 3 additions & 3 deletions ee/webhooks/internal/components/webhook_worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ func (w *Worker) HandleResponse(statusCode int, attempt *commons.Attempt, hook *
attempt.LastHttpStatusCode = statusCode
attempt.NbTry += 1
var err error


if commons.IsHTTPRequestSuccess(statusCode) {
commons.SetSuccesStatus(attempt)
err = w.Database.SaveAttempt(*attempt, false)
}

if !hook.Retry && !attempt.IsSuccess() {
if hook.Retry && !attempt.IsSuccess() {
commons.SetAbortNoRetryModeStatus(attempt)
err = w.Database.SaveAttempt(*attempt, true)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package suite

import (
"math/big"
"net/http"
"net/http/httptest"
"time"
"github.com/formancehq/stack/tests/integration/internal/modules"

"github.com/formancehq/formance-sdk-go/v2/pkg/models/operations"
"github.com/formancehq/formance-sdk-go/v2/pkg/models/shared"
. "github.com/formancehq/stack/tests/integration/internal"
webhooks "github.com/formancehq/webhooks/pkg/utils"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() {
BeforeEach(func() {
createLedgerResponse, err := Client().Ledger.V2CreateLedger(TestContext(), operations.V2CreateLedgerRequest{
Ledger: "default",
})
Expect(err).To(BeNil())
Expect(createLedgerResponse.StatusCode).To(Equal(http.StatusNoContent))
})
var (
httpServer *httptest.Server
called chan struct{}
secret = webhooks.NewSecret()
count int
)

BeforeEach(func() {
called = make(chan struct{})
count = 0
httpServer = httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count += 1

if count == 1 {
// FOR THE WORKER
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("401 Unauthorized"))
} else if count == 2 {
// FOR THE COLLECTOR
w.WriteHeader(http.StatusOK)
w.Write([]byte("200 OK"))
defer close(called)
}

}))
DeferCleanup(func() {
httpServer.Close()
})

response, err := Client().Webhooks.InsertConfig(
TestContext(),
shared.ConfigUser{
Endpoint: httpServer.URL,
Secret: &secret,
EventTypes: []string{
"ledger.committed_transactions",
},
},
)
Expect(err).ToNot(HaveOccurred())
Expect(response.StatusCode).To(Equal(http.StatusOK))

})

When("creating a transaction", func() {

BeforeEach(func() {
time.Sleep(1*time.Second)
response, err := Client().Ledger.V2CreateTransaction(
TestContext(),
operations.V2CreateTransactionRequest{
V2PostTransaction: shared.V2PostTransaction{
Metadata: map[string]string{},
Postings: []shared.V2Posting{
{
Amount: big.NewInt(100),
Asset: "USD",
Source: "world",
Destination: "alice",
},
},
},
Ledger: "default",
},
)
Expect(err).ToNot(HaveOccurred())
Expect(response.StatusCode).To(Equal(http.StatusOK))
})

It("should trigger a call to the webhook endpoint", func() {
Eventually(ChanClosed(called)).Should(BeTrue())
})
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package suite

import (
"math/big"
"net/http"
"net/http/httptest"
"time"

"github.com/formancehq/stack/tests/integration/internal/modules"

"github.com/formancehq/formance-sdk-go/v2/pkg/models/operations"
"github.com/formancehq/formance-sdk-go/v2/pkg/models/shared"
. "github.com/formancehq/stack/tests/integration/internal"
webhooks "github.com/formancehq/webhooks/pkg/utils"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = WithModules([]*Module{modules.Ledger, modules.Webhooks}, func() {
BeforeEach(func() {
createLedgerResponse, err := Client().Ledger.V2CreateLedger(TestContext(), operations.V2CreateLedgerRequest{
Ledger: "default",
})
Expect(err).To(BeNil())
Expect(createLedgerResponse.StatusCode).To(Equal(http.StatusNoContent))
})
var (
httpServer *httptest.Server
called chan struct{}
secret = webhooks.NewSecret()
hook1 shared.V2Hook
count int
)

BeforeEach(func() {
called = make(chan struct{})
count = 0
httpServer = httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count += 1

if count == 1 {
// FOR THE WORKER
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("401 Unauthorized"))
} else if count == 2 {
// FOR THE COLLECTOR
w.WriteHeader(http.StatusOK)
w.Write([]byte("200 OK"))
defer close(called)
}
}))
DeferCleanup(func() {
httpServer.Close()
})



response, err := Client().Webhooks.InsertHook(
TestContext(),
shared.V2HookBodyParams{
Endpoint: httpServer.URL,
Secret: &secret,
Events: []string{
"ledger.committed_transactions",
},
},
)
Expect(err).ToNot(HaveOccurred())
Expect(response.StatusCode).To(Equal(http.StatusOK))
hook1 = response.V2HookResponse.Data
_, err = Client().Webhooks.ActivateHook(
TestContext(),
operations.ActivateHookRequest{
HookID: hook1.ID,
},
)
Expect(err).ToNot(HaveOccurred())

})

When("creating a transaction", func() {
BeforeEach(func() {
time.Sleep(1*time.Second)
response, err := Client().Ledger.V2CreateTransaction(
TestContext(),
operations.V2CreateTransactionRequest{
V2PostTransaction: shared.V2PostTransaction{
Metadata: map[string]string{},
Postings: []shared.V2Posting{
{
Amount: big.NewInt(100),
Asset: "USD",
Source: "world",
Destination: "alice",
},
},
},
Ledger: "default",
},
)
Expect(err).ToNot(HaveOccurred())
Expect(response.StatusCode).To(Equal(http.StatusOK))
})

It("should trigger a call to the webhook endpoint", func() {
Eventually(ChanClosed(called)).Should(BeTrue())
})
})
})

0 comments on commit 1d9999a

Please sign in to comment.