Skip to content

Commit

Permalink
fixed some race conditions (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak authored and maclav3 committed Feb 21, 2019
1 parent 218edd8 commit 927c7ae
Show file tree
Hide file tree
Showing 15 changed files with 78 additions and 29 deletions.
2 changes: 0 additions & 2 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,3 @@ test_race:
extends: .test
variables:
TEST_FLAGS: "-short -race"
only:
- master
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ test_v:
test_short:
go test ./... -short

test_race:
go test ./... -short -race

test_stress:
go test -tags=stress ./...

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ services:

googlecloud:
image: google/cloud-sdk:228.0.0
entrypoint: gcloud --quiet beta emulators pubsub start --host-port=localhost:8085 --verbosity=debug --log-http
entrypoint: gcloud --quiet beta emulators pubsub start --host-port=0.0.0.0:8085 --verbosity=debug --log-http
ports:
- 8085:8085
restart: on-failure
Expand Down
22 changes: 18 additions & 4 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
)

type LogFields map[string]interface{}
Expand Down Expand Up @@ -160,6 +161,7 @@ type CapturedMessage struct {
type CaptureLoggerAdapter struct {
captured map[LogLevel][]CapturedMessage
fields LogFields
lock sync.Mutex
}

func NewCaptureLogger() *CaptureLoggerAdapter {
Expand All @@ -169,18 +171,27 @@ func NewCaptureLogger() *CaptureLoggerAdapter {
}

func (c *CaptureLoggerAdapter) With(fields LogFields) LoggerAdapter {
return &CaptureLoggerAdapter{c.captured, c.fields.Add(fields)}
return &CaptureLoggerAdapter{captured: c.captured, fields: c.fields.Add(fields)}
}

func (c *CaptureLoggerAdapter) capture(msg CapturedMessage) {
c.lock.Lock()
defer c.lock.Unlock()

c.captured[msg.Level] = append(c.captured[msg.Level], msg)
}

func (c CaptureLoggerAdapter) Captured() map[LogLevel][]CapturedMessage {
func (c *CaptureLoggerAdapter) Captured() map[LogLevel][]CapturedMessage {
c.lock.Lock()
defer c.lock.Unlock()

return c.captured
}

func (c CaptureLoggerAdapter) Has(msg CapturedMessage) bool {
func (c *CaptureLoggerAdapter) Has(msg CapturedMessage) bool {
c.lock.Lock()
defer c.lock.Unlock()

for _, capturedMsg := range c.captured[msg.Level] {
if reflect.DeepEqual(msg, capturedMsg) {
return true
Expand All @@ -189,7 +200,10 @@ func (c CaptureLoggerAdapter) Has(msg CapturedMessage) bool {
return false
}

func (c CaptureLoggerAdapter) HasError(err error) bool {
func (c *CaptureLoggerAdapter) HasError(err error) bool {
c.lock.Lock()
defer c.lock.Unlock()

for _, capturedMsg := range c.captured[ErrorLogLevel] {
if capturedMsg.Err == err {
return true
Expand Down
3 changes: 2 additions & 1 deletion message/decorator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ func TestMessageTransformSubscriberDecorator_Subscribe(t *testing.T) {
go func() {
for i := 0; i < numMessages; i++ {
msg := message.NewMessage(strconv.Itoa(i), []byte{})
sent = append(sent, msg)

err = pubsub.Publish("topic", msg)
require.NoError(t, err)
sent = append(sent, msg)
}
}()

Expand Down
19 changes: 15 additions & 4 deletions message/infrastructure/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ type GoChannel struct {
subscribersLock sync.RWMutex
subscribersByTopicLock sync.Map // map of *sync.Mutex

closed bool
closing chan struct{}
closed bool
closedLock sync.Mutex
closing chan struct{}

persistedMessages map[string][]*message.Message
persistedMessages map[string][]*message.Message
persistedMessagesLock sync.RWMutex
}

func (g *GoChannel) Publisher() message.Publisher {
Expand Down Expand Up @@ -99,10 +101,12 @@ func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
defer subLock.(*sync.Mutex).Unlock()

if g.config.Persistent {
g.persistedMessagesLock.Lock()
if _, ok := g.persistedMessages[topic]; !ok {
g.persistedMessages[topic] = make([]*message.Message, 0)
}
g.persistedMessages[topic] = append(g.persistedMessages[topic], messages...)
g.persistedMessagesLock.Unlock()
}

for i := range messages {
Expand Down Expand Up @@ -210,7 +214,11 @@ func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *messag
defer g.subscribersLock.Unlock()
defer subLock.(*sync.Mutex).Unlock()

if messages, ok := g.persistedMessages[topic]; ok {
g.persistedMessagesLock.RLock()
messages, ok := g.persistedMessages[topic]
g.persistedMessagesLock.RUnlock()

if ok {
for i := range messages {
msg := g.persistedMessages[topic][i]

Expand Down Expand Up @@ -255,6 +263,9 @@ func (g *GoChannel) topicSubscribers(topic string) []*subscriber {
}

func (g *GoChannel) Close() error {
g.closedLock.Lock()
defer g.closedLock.Unlock()

if g.closed {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion message/infrastructure/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e

p.topicsLock.Lock()
defer func() {
p.topicsLock.Unlock()
if err == nil {
p.topics[topic] = t
}
p.topicsLock.Unlock()
}()

t = p.client.Topic(topic)
Expand Down
2 changes: 1 addition & 1 deletion message/infrastructure/googlecloud/pubsub_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message/infrastructure/googlecloud"
)

// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=googlecloud:8085 for this to work
// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work

func BenchmarkSubscriber(b *testing.B) {
infrastructure.BenchSubscriber(b, func(n int) message.PubSub {
Expand Down
2 changes: 1 addition & 1 deletion message/infrastructure/googlecloud/pubsub_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message/infrastructure"
)

// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=googlecloud:8085 for this to work
// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work

func TestPublishSubscribe_stress(t *testing.T) {
infrastructure.TestPubSubStressTest(
Expand Down
2 changes: 1 addition & 1 deletion message/infrastructure/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message/infrastructure/googlecloud"
)

// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=googlecloud:8085 for this to work
// Run `docker-compose up` and set PUBSUB_EMULATOR_HOST=localhost:8085 for this to work

func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscriptionName googlecloud.SubscriptionNameFn) message.PubSub {
ctx := context.Background()
Expand Down
17 changes: 13 additions & 4 deletions message/infrastructure/http/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ func (s *SubscriberConfig) setDefaults() {
type Subscriber struct {
config SubscriberConfig

server *http.Server
address net.Addr
logger watermill.LoggerAdapter
server *http.Server

address net.Addr
addrLock sync.RWMutex

logger watermill.LoggerAdapter

outputChannels []chan *message.Message
outputChannelsLock sync.Locker
Expand Down Expand Up @@ -154,12 +157,18 @@ func (s *Subscriber) StartHTTPServer() error {
if err != nil {
return err
}
s.addrLock.Lock()
s.address = listener.Addr()
s.addrLock.Unlock()

return s.server.Serve(listener)
}

// Addr returns the server address or nil if the server isn't running.
func (s Subscriber) Addr() net.Addr {
func (s *Subscriber) Addr() net.Addr {
s.addrLock.RLock()
defer s.addrLock.RUnlock()

return s.address
}

Expand Down
25 changes: 18 additions & 7 deletions message/infrastructure/nats/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-ch

s.logger.Debug("Starting subscriber", subscriberLogFields)

sub, err := s.subscribe(ctx, output, topic, subscriberLogFields)
processMessagesWg := &sync.WaitGroup{}

sub, err := s.subscribe(ctx, output, topic, subscriberLogFields, processMessagesWg)
if err != nil {
return nil, errors.Wrap(err, "cannot subscribe")
}
Expand All @@ -187,6 +189,7 @@ func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-ch
s.logger.Error("Cannot close subscriber", err, subscriberLogFields)
}

processMessagesWg.Wait()
close(output)
s.outputsWg.Done()
}(sub, subscriberLogFields)
Expand All @@ -200,7 +203,13 @@ func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-ch
}

func (s *StreamingSubscriber) SubscribeInitialize(topic string) (err error) {
sub, err := s.subscribe(context.Background(), make(chan *message.Message), topic, nil)
sub, err := s.subscribe(
context.Background(),
make(chan *message.Message),
topic,
nil,
&sync.WaitGroup{},
)
if err != nil {
return errors.Wrap(err, "cannot initialize subscribe")
}
Expand All @@ -213,12 +222,16 @@ func (s *StreamingSubscriber) subscribe(
output chan *message.Message,
topic string,
subscriberLogFields watermill.LogFields,
processMessagesWg *sync.WaitGroup,
) (stan.Subscription, error) {
if s.config.QueueGroup != "" {
return s.conn.QueueSubscribe(
topic,
s.config.QueueGroup,
func(m *stan.Msg) {
processMessagesWg.Add(1)
defer processMessagesWg.Done()

s.processMessage(ctx, m, output, subscriberLogFields)
},
s.config.StanSubscriptionOptions...,
Expand All @@ -228,6 +241,9 @@ func (s *StreamingSubscriber) subscribe(
return s.conn.Subscribe(
topic,
func(m *stan.Msg) {
processMessagesWg.Add(1)
defer processMessagesWg.Done()

s.processMessage(ctx, m, output, subscriberLogFields)
},
s.config.StanSubscriptionOptions...,
Expand Down Expand Up @@ -262,11 +278,6 @@ func (s *StreamingSubscriber) processMessage(
messageLogFields := logFields.Add(watermill.LogFields{"message_uuid": msg.UUID})
s.logger.Trace("Unmarshaled message", messageLogFields)

if s.closed {
s.logger.Trace("Closed, message discarded", messageLogFields)
return
}

select {
case output <- msg:
s.logger.Trace("Message sent to consumer", messageLogFields)
Expand Down
2 changes: 1 addition & 1 deletion message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (m *Message) Nacked() <-chan struct{} {
//
// The returned context is always non-nil; it defaults to the
// background context.
func (m Message) Context() context.Context {
func (m *Message) Context() context.Context {
if m.ctx != nil {
return m.ctx
}
Expand Down
2 changes: 1 addition & 1 deletion message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (r *Router) decorateHandlerSubscriber(h *handler) error {
}

// addHandlerContext enriches the contex with values that are relevant within this handler's context.
func (h handler) addHandlerContext(messages ...*Message) {
func (h *handler) addHandlerContext(messages ...*Message) {
for i, msg := range messages {
ctx := msg.Context()

Expand Down
2 changes: 2 additions & 0 deletions message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func TestRouter_functional(t *testing.T) {
)

go r.Run()
<-r.Running()

defer func() {
assert.NoError(t, r.Close())
}()
Expand Down

0 comments on commit 927c7ae

Please sign in to comment.