From 7b6669c078f05c8cf7e47b8fcf650534773906f9 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Fri, 22 Feb 2019 10:07:00 +0100 Subject: [PATCH] Fixed Kafka races (#54) * fixed some race conditions * speed up of uuid short tests --- internal/norace.go | 5 +++++ internal/race.go | 5 +++++ message/infrastructure/kafka/pubsub_test.go | 4 +++- message/message.go | 4 ++-- uuid_test.go | 5 +++++ 5 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 internal/norace.go create mode 100644 internal/race.go diff --git a/internal/norace.go b/internal/norace.go new file mode 100644 index 000000000..62068860c --- /dev/null +++ b/internal/norace.go @@ -0,0 +1,5 @@ +// +build !race + +package internal + +const RaceEnabled = false diff --git a/internal/race.go b/internal/race.go new file mode 100644 index 000000000..b9a4ba9ac --- /dev/null +++ b/internal/race.go @@ -0,0 +1,5 @@ +// +build race + +package internal + +const RaceEnabled = true diff --git a/message/infrastructure/kafka/pubsub_test.go b/message/infrastructure/kafka/pubsub_test.go index d9028d347..7baaafa8a 100644 --- a/message/infrastructure/kafka/pubsub_test.go +++ b/message/infrastructure/kafka/pubsub_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/ThreeDotsLabs/watermill/internal" + "github.com/Shopify/sarama" "github.com/ThreeDotsLabs/watermill" @@ -102,7 +104,7 @@ func TestPublishSubscribe(t *testing.T) { Persistent: true, } - if testing.Short() { + if testing.Short() && !internal.RaceEnabled { // Kafka tests are a bit slow, so let's run only basic test // todo - speed up t.Log("Running only TestPublishSubscribe for Kafka with -short flag") diff --git a/message/message.go b/message/message.go index 043095be1..5b0cd4fca 100644 --- a/message/message.go +++ b/message/message.go @@ -60,7 +60,7 @@ const ( ) // Equals compare, that two messages are equal. Acks/Nacks are not compared. -func (m Message) Equals(toCompare *Message) bool { +func (m *Message) Equals(toCompare *Message) bool { if m.UUID != toCompare.UUID { return false } @@ -173,7 +173,7 @@ func (m *Message) SetContext(ctx context.Context) { // Copy copies all message without Acks/Nacks. // The context is not propagated to the copy. -func (m Message) Copy() *Message { +func (m *Message) Copy() *Message { msg := NewMessage(m.UUID, m.Payload) msg.Metadata = m.Metadata return msg diff --git a/uuid_test.go b/uuid_test.go index 73825305b..c3d8c4f37 100644 --- a/uuid_test.go +++ b/uuid_test.go @@ -11,6 +11,11 @@ func testuUniqness(t *testing.T, genFunc func() string) { producers := 100 uuidsPerProducer := 10000 + if testing.Short() { + producers = 10 + uuidsPerProducer = 1000 + } + uuidsCount := producers * uuidsPerProducer uuids := make(chan string, uuidsCount)