diff --git a/internal/controlplane/handlers_githubwebhooks_test.go b/internal/controlplane/handlers_githubwebhooks_test.go index 25b8bc5727..f89783b14b 100644 --- a/internal/controlplane/handlers_githubwebhooks_test.go +++ b/internal/controlplane/handlers_githubwebhooks_test.go @@ -91,7 +91,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) @@ -144,7 +144,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) @@ -210,7 +210,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) @@ -327,7 +327,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) diff --git a/internal/engine/executor_test.go b/internal/engine/executor_test.go index 76facd2817..a42a29a699 100644 --- a/internal/engine/executor_test.go +++ b/internal/engine/executor_test.go @@ -268,15 +268,16 @@ default allow = true`, }) require.NoError(t, err, "failed to setup eventer") + pq := testqueue.NewPassthroughQueue(t) + queued := pq.GetQueue() + go func() { t.Log("Running eventer") + evt.Register(engine.FlushEntityEventTopic, pq.Pass) err := evt.Run(context.Background()) require.NoError(t, err, "failed to run eventer") }() - pq := testqueue.NewPassthroughQueue() - queued := pq.GetQueue() - testTimeout := 5 * time.Second ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() @@ -286,8 +287,6 @@ default allow = true`, }, evt) require.NoError(t, err, "expected no error") - evt.Register(engine.FlushEntityEventTopic, pq.Pass) - eiw := engine.NewEntityInfoWrapper(). WithProvider(providerName). WithProjectID(projectID). @@ -301,20 +300,21 @@ default allow = true`, msg, err := eiw.BuildMessage() require.NoError(t, err, "expected no error") + t.Log("waiting for eventer to start") + <-evt.Running() + // Run in the background go func() { t.Log("Running entity event handler") require.NoError(t, e.HandleEntityEvent(msg), "expected no error") }() - t.Log("waiting for eventer to start") - <-evt.Running() - // expect flush t.Log("waiting for flush") require.NotNil(t, <-queued, "expected message") require.NoError(t, evt.Close(), "expected no error") + t.Log("waiting for executor to finish") e.Wait() } diff --git a/internal/util/testqueue/passthroughqueue.go b/internal/util/testqueue/passthroughqueue.go index bfbdcdcea5..cf806eff5c 100644 --- a/internal/util/testqueue/passthroughqueue.go +++ b/internal/util/testqueue/passthroughqueue.go @@ -16,18 +16,26 @@ // Package testqueue contains queue utilities for testing package testqueue -import "github.com/ThreeDotsLabs/watermill/message" +import ( + "testing" + + "github.com/ThreeDotsLabs/watermill/message" +) // PassthroughQueue is a queue that passes messages through. // It's only useful for testing. type PassthroughQueue struct { ch chan *message.Message + t *testing.T } // NewPassthroughQueue creates a new PassthroughQueue -func NewPassthroughQueue() *PassthroughQueue { +func NewPassthroughQueue(t *testing.T) *PassthroughQueue { + t.Helper() + return &PassthroughQueue{ ch: make(chan *message.Message), + t: t, } } @@ -38,6 +46,7 @@ func (q *PassthroughQueue) GetQueue() <-chan *message.Message { // Pass passes a message through the queue func (q *PassthroughQueue) Pass(msg *message.Message) error { + q.t.Logf("Passing message through queue: %s", msg.UUID) q.ch <- msg return nil }