From 582fa2fac1d31367b86754bcaf96c1fa7cd1af59 Mon Sep 17 00:00:00 2001 From: Juan Antonio Osorio Date: Fri, 22 Dec 2023 13:11:45 +0200 Subject: [PATCH] entity event handler test fixes (#2026) * Wait for eventer to start before running an execution The idea is that the eventer should be ready to issue events before we try to parse and process an event. This also adds another log line to know if the test is blocked because of the last `Wait` call. Closes https://github.com/stacklok/minder/issues/1710 * Add logs for the mock message queue receiver for testing * Ensure topic is registered before eventer runs --- .../controlplane/handlers_githubwebhooks_test.go | 8 ++++---- internal/engine/executor_test.go | 16 ++++++++-------- internal/util/testqueue/passthroughqueue.go | 13 +++++++++++-- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/internal/controlplane/handlers_githubwebhooks_test.go b/internal/controlplane/handlers_githubwebhooks_test.go index 25b8bc5727..f89783b14b 100644 --- a/internal/controlplane/handlers_githubwebhooks_test.go +++ b/internal/controlplane/handlers_githubwebhooks_test.go @@ -91,7 +91,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) @@ -144,7 +144,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) @@ -210,7 +210,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) @@ -327,7 +327,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() { srv := newDefaultServer(t, mockStore) defer srv.evt.Close() - pq := testqueue.NewPassthroughQueue() + pq := testqueue.NewPassthroughQueue(t) queued := pq.GetQueue() srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass) diff --git a/internal/engine/executor_test.go b/internal/engine/executor_test.go index 76facd2817..a42a29a699 100644 --- a/internal/engine/executor_test.go +++ b/internal/engine/executor_test.go @@ -268,15 +268,16 @@ default allow = true`, }) require.NoError(t, err, "failed to setup eventer") + pq := testqueue.NewPassthroughQueue(t) + queued := pq.GetQueue() + go func() { t.Log("Running eventer") + evt.Register(engine.FlushEntityEventTopic, pq.Pass) err := evt.Run(context.Background()) require.NoError(t, err, "failed to run eventer") }() - pq := testqueue.NewPassthroughQueue() - queued := pq.GetQueue() - testTimeout := 5 * time.Second ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() @@ -286,8 +287,6 @@ default allow = true`, }, evt) require.NoError(t, err, "expected no error") - evt.Register(engine.FlushEntityEventTopic, pq.Pass) - eiw := engine.NewEntityInfoWrapper(). WithProvider(providerName). WithProjectID(projectID). @@ -301,20 +300,21 @@ default allow = true`, msg, err := eiw.BuildMessage() require.NoError(t, err, "expected no error") + t.Log("waiting for eventer to start") + <-evt.Running() + // Run in the background go func() { t.Log("Running entity event handler") require.NoError(t, e.HandleEntityEvent(msg), "expected no error") }() - t.Log("waiting for eventer to start") - <-evt.Running() - // expect flush t.Log("waiting for flush") require.NotNil(t, <-queued, "expected message") require.NoError(t, evt.Close(), "expected no error") + t.Log("waiting for executor to finish") e.Wait() } diff --git a/internal/util/testqueue/passthroughqueue.go b/internal/util/testqueue/passthroughqueue.go index bfbdcdcea5..cf806eff5c 100644 --- a/internal/util/testqueue/passthroughqueue.go +++ b/internal/util/testqueue/passthroughqueue.go @@ -16,18 +16,26 @@ // Package testqueue contains queue utilities for testing package testqueue -import "github.com/ThreeDotsLabs/watermill/message" +import ( + "testing" + + "github.com/ThreeDotsLabs/watermill/message" +) // PassthroughQueue is a queue that passes messages through. // It's only useful for testing. type PassthroughQueue struct { ch chan *message.Message + t *testing.T } // NewPassthroughQueue creates a new PassthroughQueue -func NewPassthroughQueue() *PassthroughQueue { +func NewPassthroughQueue(t *testing.T) *PassthroughQueue { + t.Helper() + return &PassthroughQueue{ ch: make(chan *message.Message), + t: t, } } @@ -38,6 +46,7 @@ func (q *PassthroughQueue) GetQueue() <-chan *message.Message { // Pass passes a message through the queue func (q *PassthroughQueue) Pass(msg *message.Message) error { + q.t.Logf("Passing message through queue: %s", msg.UUID) q.ch <- msg return nil }