Skip to content

Commit

Permalink
Fix tests running concurrent publish with the same messages (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 authored Aug 26, 2024
1 parent 84f2ad4 commit f7167cc
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
11 changes: 8 additions & 3 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ type Payload []byte

// Message is the basic transfer unit.
// Messages are emitted by Publishers and received by Subscribers.
//
// A publisher can modify the message during publishing, e.g. can alter the metadata.
// Avoid modifying the message in parallel with publishing, as it can lead to data races.
// In general, a message should be passed to a single Publish and then considered immutable.
// If needed, use the Copy method to create a new message.
type Message struct {
// UUID is a unique identifier of message.
// UUID is a unique identifier of the message.
//
// It is only used by Watermill for debugging.
// UUID can be empty.
Expand All @@ -35,9 +40,9 @@ type Message struct {
// Payload is the message's payload.
Payload Payload

// ack is closed, when acknowledge is received.
// ack is closed when acknowledge is received.
ack chan struct{}
// noACk is closed, when negative acknowledge is received.
// noACk is closed when negative acknowledge is received.
noAck chan struct{}

ackMutex sync.Mutex
Expand Down
23 changes: 13 additions & 10 deletions message/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,35 @@ import (

// Publisher is the emitting part of a Pub/Sub.
type Publisher interface {
// Publish publishes provided messages to given topic.
// Publish publishes provided messages to the given topic.
//
// Publish can be synchronous or asynchronous - it depends on the implementation.
//
// Most publishers implementations don't support atomic publishing of messages.
// Most publisher implementations don't support atomic publishing of messages.
// This means that if publishing one of the messages fails, the next messages will not be published.
//
// Publish does not work with a single Context.
// Use the Context() method of each message instead.
//
// Publish must be thread safe.
Publish(topic string, messages ...*Message) error
// Close should flush unsent messages, if publisher is async.
// Close should flush unsent messages if publisher is async.
Close() error
}

// Subscriber is the consuming part of the Pub/Sub.
type Subscriber interface {
// Subscribe returns output channel with messages from provided topic.
// Channel is closed, when Close() was called on the subscriber.
// Subscribe returns an output channel with messages from the provided topic.
// The channel is closed after Close() is called on the subscriber.
//
// To receive the next message, `Ack()` must be called on the received message.
// If message processing failed and message should be redelivered `Nack()` should be called.
// If message processing fails and the message should be redelivered `Nack()` should be called instead.
//
// When provided ctx is cancelled, subscriber will close subscribe and close output channel.
// Provided ctx is set to all produced messages.
// When Nack or Ack is called on the message, context of the message is canceled.
// When the provided ctx is canceled, the subscriber closes the subscription and the output channel.
// The provided ctx is passed to all produced messages.
// When Nack or Ack is called on the message, the context of the message is canceled.
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
// Close closes all subscriptions with their output channels and flushes offsets etc. when needed.
Close() error
}

Expand Down
2 changes: 1 addition & 1 deletion message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,7 @@ func TestRouter_context_cancel_does_not_log_error(t *testing.T) {

require.Eventually(t, func() bool {
return r.IsClosed()
}, 1*time.Second, 1*time.Millisecond, "Router should be closed after all handlers are stopped")
}, 3*time.Second, 5*time.Millisecond, "Router should be closed after all handlers are stopped")

assert.Empty(t, logger.Captured()[watermill.ErrorLogLevel], "No error should be logged when context is canceled")
}
Expand Down
12 changes: 9 additions & 3 deletions pubsub/tests/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ func TestConcurrentSubscribeMultipleTopics(
for i := 0; i < topicsCount; i++ {
topicName := testTopicName(tCtx.TestID) + fmt.Sprintf("-%d", i)

var messagesToPublishForTopic []*message.Message
for _, msg := range messagesToPublish {
newMsg := msg.Copy()
messagesToPublishForTopic = append(messagesToPublishForTopic, newMsg)
}

go func() {
defer subsWg.Done()

Expand All @@ -344,7 +350,7 @@ func TestConcurrentSubscribeMultipleTopics(
}
}

err := publishWithRetry(pub, topicName, messagesToPublish...)
err := publishWithRetry(pub, topicName, messagesToPublishForTopic...)
if err != nil {
t.Error(err)
}
Expand All @@ -353,7 +359,7 @@ func TestConcurrentSubscribeMultipleTopics(
if err != nil {
t.Error(err)
}
topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout*5)
topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublishForTopic), defaultTimeout*5)

receivedMessagesCh <- topicMessages
}()
Expand Down Expand Up @@ -1270,7 +1276,7 @@ func AddSimpleMessagesParallel(t *testing.T, messagesCount int, publisher messag
for i := 0; i < publishers; i++ {
go func() {
for msg := range publishMsg {
err := publishWithRetry(publisher, topicName, msg)
err := publishWithRetry(publisher, topicName, msg.Copy())
require.NoError(t, err, "cannot publish messages")
wg.Done()
}
Expand Down

0 comments on commit f7167cc

Please sign in to comment.