Skip to content

Commit

Permalink
entity event handler test fixes (#2026)
Browse files Browse the repository at this point in the history
* Wait for eventer to start  before running an execution

The idea is that the eventer should be ready to issue events before we
try to parse and process an event.

This also adds another log line to know if the test is blocked because of the
last `Wait` call.

Closes #1710

* Add logs for the mock message queue receiver for testing

* Ensure topic is registered before eventer runs
  • Loading branch information
JAORMX authored Dec 22, 2023
1 parent c338f48 commit 582fa2f
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
8 changes: 4 additions & 4 deletions internal/controlplane/handlers_githubwebhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down Expand Up @@ -144,7 +144,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down Expand Up @@ -210,7 +210,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down Expand Up @@ -327,7 +327,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down
16 changes: 8 additions & 8 deletions internal/engine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,16 @@ default allow = true`,
})
require.NoError(t, err, "failed to setup eventer")

pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

go func() {
t.Log("Running eventer")
evt.Register(engine.FlushEntityEventTopic, pq.Pass)
err := evt.Run(context.Background())
require.NoError(t, err, "failed to run eventer")
}()

pq := testqueue.NewPassthroughQueue()
queued := pq.GetQueue()

testTimeout := 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
Expand All @@ -286,8 +287,6 @@ default allow = true`,
}, evt)
require.NoError(t, err, "expected no error")

evt.Register(engine.FlushEntityEventTopic, pq.Pass)

eiw := engine.NewEntityInfoWrapper().
WithProvider(providerName).
WithProjectID(projectID).
Expand All @@ -301,20 +300,21 @@ default allow = true`,
msg, err := eiw.BuildMessage()
require.NoError(t, err, "expected no error")

t.Log("waiting for eventer to start")
<-evt.Running()

// Run in the background
go func() {
t.Log("Running entity event handler")
require.NoError(t, e.HandleEntityEvent(msg), "expected no error")
}()

t.Log("waiting for eventer to start")
<-evt.Running()

// expect flush
t.Log("waiting for flush")
require.NotNil(t, <-queued, "expected message")

require.NoError(t, evt.Close(), "expected no error")

t.Log("waiting for executor to finish")
e.Wait()
}
13 changes: 11 additions & 2 deletions internal/util/testqueue/passthroughqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@
// Package testqueue contains queue utilities for testing
package testqueue

import "github.com/ThreeDotsLabs/watermill/message"
import (
"testing"

"github.com/ThreeDotsLabs/watermill/message"
)

// PassthroughQueue is a queue that passes messages through.
// It's only useful for testing.
type PassthroughQueue struct {
ch chan *message.Message
t *testing.T
}

// NewPassthroughQueue creates a new PassthroughQueue
func NewPassthroughQueue() *PassthroughQueue {
func NewPassthroughQueue(t *testing.T) *PassthroughQueue {
t.Helper()

return &PassthroughQueue{
ch: make(chan *message.Message),
t: t,
}
}

Expand All @@ -38,6 +46,7 @@ func (q *PassthroughQueue) GetQueue() <-chan *message.Message {

// Pass passes a message through the queue
func (q *PassthroughQueue) Pass(msg *message.Message) error {
q.t.Logf("Passing message through queue: %s", msg.UUID)
q.ch <- msg
return nil
}

0 comments on commit 582fa2f

Please sign in to comment.