diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 3a19c48ff..ca488c3f3 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -5,4 +5,4 @@ on: - master jobs: ci: - uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@master + uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@lint diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a2cf123e6..d672afff2 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,6 +16,15 @@ jobs: with: go-version: '^1.19.1' - run: make build + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: '^1.19.1' + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 tests: needs: [build] runs-on: ubuntu-latest diff --git a/components/cqrs/command_bus_test.go b/components/cqrs/command_bus_test.go index cce5fc6b3..cc3f6b8a6 100644 --- a/components/cqrs/command_bus_test.go +++ b/components/cqrs/command_bus_test.go @@ -34,6 +34,8 @@ func TestNewCommandBus(t *testing.T) { assert.Error(t, err) } +type contextKey string + func TestCommandBus_Send_ContextPropagation(t *testing.T) { publisher := newPublisherStub() @@ -46,7 +48,7 @@ func TestCommandBus_Send_ContextPropagation(t *testing.T) { ) require.NoError(t, err) - ctx := context.WithValue(context.Background(), "key", "value") + ctx := context.WithValue(context.Background(), contextKey("key"), "value") err = commandBus.Send(ctx, "message") require.NoError(t, err) diff --git a/components/cqrs/event_bus_test.go b/components/cqrs/event_bus_test.go index fd2522820..ea5056b27 100644 --- a/components/cqrs/event_bus_test.go +++ b/components/cqrs/event_bus_test.go @@ -45,7 +45,7 @@ func TestEventBus_Send_ContextPropagation(t *testing.T) { ) require.NoError(t, err) - ctx := context.WithValue(context.Background(), "key", "value") + ctx := context.WithValue(context.Background(), contextKey("key"), "value") err = eventBus.Publish(ctx, "message") require.NoError(t, err) diff --git a/components/cqrs/marshaler_protobuf_test.go b/components/cqrs/marshaler_protobuf_test.go index a503475c7..276ea83dc 100644 --- a/components/cqrs/marshaler_protobuf_test.go +++ b/components/cqrs/marshaler_protobuf_test.go @@ -4,20 +4,18 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/components/cqrs" - - "github.com/golang/protobuf/ptypes" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/components/cqrs" ) func TestProtobufMarshaler(t *testing.T) { marshaler := cqrs.ProtobufMarshaler{} - when, err := ptypes.TimestampProto(time.Now()) - require.NoError(t, err) + when := timestamppb.New(time.Now()) eventToMarshal := &TestProtobufEvent{ Id: watermill.NewULID(), When: when, @@ -41,8 +39,7 @@ func TestProtobufMarshaler_Marshal_generated_name(t *testing.T) { }, } - when, err := ptypes.TimestampProto(time.Now()) - require.NoError(t, err) + when := timestamppb.New(time.Now()) eventToMarshal := &TestProtobufEvent{ Id: watermill.NewULID(), When: when, diff --git a/components/forwarder/envelope_test.go b/components/forwarder/envelope_test.go index ff50e4f9a..1a0fcd3cc 100644 --- a/components/forwarder/envelope_test.go +++ b/components/forwarder/envelope_test.go @@ -10,13 +10,15 @@ import ( "github.com/stretchr/testify/require" ) +type contextKey string + func TestEnvelope(t *testing.T) { expectedUUID := watermill.NewUUID() expectedPayload := message.Payload("msg content") expectedMetadata := message.Metadata{"key": "value"} expectedDestinationTopic := "dest_topic" - ctx := context.WithValue(context.Background(), "key", "value") + ctx := context.WithValue(context.Background(), contextKey("key"), "value") msg := message.NewMessage(expectedUUID, expectedPayload) msg.Metadata = expectedMetadata @@ -25,7 +27,7 @@ func TestEnvelope(t *testing.T) { wrappedMsg, err := wrapMessageInEnvelope(expectedDestinationTopic, msg) require.NoError(t, err) require.NotNil(t, wrappedMsg) - v, ok := wrappedMsg.Context().Value("key").(string) + v, ok := wrappedMsg.Context().Value(contextKey("key")).(string) require.True(t, ok) require.Equal(t, "value", v) diff --git a/components/metrics/http_test.go b/components/metrics/http_test.go index e5d56cec0..3098be608 100644 --- a/components/metrics/http_test.go +++ b/components/metrics/http_test.go @@ -5,16 +5,17 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill/components/metrics" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" "github.com/stretchr/testify/assert" + + "github.com/ThreeDotsLabs/watermill/components/metrics" ) func TestCreateRegistryAndServeHTTP_metrics_endpoint(t *testing.T) { reg, cancel := metrics.CreateRegistryAndServeHTTP(":8090") defer cancel() - err := reg.Register(prometheus.NewBuildInfoCollector()) + err := reg.Register(collectors.NewBuildInfoCollector()) if err != nil { t.Fatal(errors.Wrap(err, "registration of prometheus build info collector failed")) } @@ -34,7 +35,7 @@ func TestCreateRegistryAndServeHTTP_metrics_endpoint(t *testing.T) { func TestCreateRegistryAndServeHTTP_unknown_endpoint(t *testing.T) { reg, cancel := metrics.CreateRegistryAndServeHTTP(":8091") defer cancel() - err := reg.Register(prometheus.NewBuildInfoCollector()) + err := reg.Register(collectors.NewBuildInfoCollector()) if err != nil { t.Error(errors.Wrap(err, "registration of prometheus build info collector failed")) } diff --git a/log.go b/log.go index 671a17255..55f332867 100644 --- a/log.go +++ b/log.go @@ -148,7 +148,7 @@ func (l *StdLoggerAdapter) log(logger *log.Logger, level string, msg string, fie fieldsStr += key + "=" + valueStr + " " } - logger.Output(3, fmt.Sprintf("\t"+`level=%s msg="%s" %s`, level, msg, fieldsStr)) + _ = logger.Output(3, fmt.Sprintf("\t"+`level=%s msg="%s" %s`, level, msg, fieldsStr)) } type LogLevel uint diff --git a/message/router/middleware/throttle.go b/message/router/middleware/throttle.go index fd825e438..4c4e2cb14 100644 --- a/message/router/middleware/throttle.go +++ b/message/router/middleware/throttle.go @@ -9,22 +9,22 @@ import ( // Throttle provides a middleware that limits the amount of messages processed per unit of time. // This may be done e.g. to prevent excessive load caused by running a handler on a long queue of unprocessed messages. type Throttle struct { - throttle <-chan time.Time + ticker *time.Ticker } // NewThrottle creates a new Throttle middleware. // Example duration and count: NewThrottle(10, time.Second) for 10 messages per second func NewThrottle(count int64, duration time.Duration) *Throttle { - return &Throttle{time.Tick(duration / time.Duration(count))} + return &Throttle{ + ticker: time.NewTicker(duration / time.Duration(count)), + } } // Middleware returns the Throttle middleware. func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc { return func(message *message.Message) ([]*message.Message, error) { - select { - case <-t.throttle: - // throttle is shared by multiple handlers, which will wait for their "tick" - } + // throttle is shared by multiple handlers, which will wait for their "tick" + <-t.ticker.C return h(message) } diff --git a/message/router/middleware/throttle_test.go b/message/router/middleware/throttle_test.go index 750cc52d8..25911694b 100644 --- a/message/router/middleware/throttle_test.go +++ b/message/router/middleware/throttle_test.go @@ -21,7 +21,8 @@ const ( func TestThrottle_Middleware(t *testing.T) { throttle := middleware.NewThrottle(perSecond, testTimeout) - ctx, _ := context.WithTimeout(context.Background(), testTimeout) + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() producedMessagesChannel := make(chan struct{}) diff --git a/message/router_test.go b/message/router_test.go index 563190285..5216de70c 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -735,7 +735,7 @@ func TestRouter_close_handler(t *testing.T) { t.Fatal("timeout waiting for handler stopped") } - expectedReceivedMessages = publishMessagesForHandler(t, 1, pub, sub, subscribeTopic1) + _ = publishMessagesForHandler(t, 1, pub, sub, subscribeTopic1) _, received := subscriber.BulkRead(receivedMessagesCh1, 1, time.Millisecond*1) assert.False(t, received) diff --git a/message/subscriber/read_test.go b/message/subscriber/read_test.go index 40bb8c6a5..675429cb8 100644 --- a/message/subscriber/read_test.go +++ b/message/subscriber/read_test.go @@ -71,14 +71,11 @@ func TestBulkRead_timeout(t *testing.T) { messagesCount := 100 sendLimit := 90 - var messages []*message.Message messagesCh := make(chan *message.Message, messagesCount) for i := 0; i < messagesCount; i++ { msg := message.NewMessage(watermill.NewUUID(), nil) - messages = append(messages, msg) - if i < sendLimit { messagesCh <- msg } @@ -114,13 +111,11 @@ func TestBulkRead_with_limit(t *testing.T) { messagesCount := 110 limit := 100 - var messages []*message.Message messagesCh := make(chan *message.Message, messagesCount) for i := 0; i < messagesCount; i++ { msg := message.NewMessage(watermill.NewUUID(), nil) - messages = append(messages, msg) messagesCh <- msg } @@ -151,13 +146,11 @@ func TestBulkRead_return_on_channel_close(t *testing.T) { messagesCount := 100 sendLimit := 90 - var messages []*message.Message messagesCh := make(chan *message.Message, messagesCount) messagesChClosed := false for i := 0; i < messagesCount; i++ { msg := message.NewMessage(watermill.NewUUID(), nil) - messages = append(messages, msg) if i < sendLimit { messagesCh <- msg diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index bc766bc0d..0d4fc61c7 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -276,9 +276,7 @@ func (g *GoChannel) topicSubscribers(topic string) []*subscriber { // let's do a copy to avoid race conditions and deadlocks due to lock subscribersCopy := make([]*subscriber, len(subscribers)) - for i, s := range subscribers { - subscribersCopy[i] = s - } + copy(subscribersCopy, subscribers) return subscribersCopy } diff --git a/pubsub/tests/test_pubsub.go b/pubsub/tests/test_pubsub.go index 18de3acc8..17f5b8877 100644 --- a/pubsub/tests/test_pubsub.go +++ b/pubsub/tests/test_pubsub.go @@ -344,18 +344,18 @@ func TestConcurrentSubscribeMultipleTopics( if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { err := subscribeInitializer.SubscribeInitialize(topicName) if err != nil { - t.Fatal(err) + t.Error(err) } } err := publishWithRetry(pub, topicName, messagesToPublish...) if err != nil { - t.Fatal(err) + t.Error(err) } messages, err := sub.Subscribe(context.Background(), topicName) if err != nil { - t.Fatal(err) + t.Error(err) } topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout) @@ -966,6 +966,8 @@ func TestMessageCtx( } } +type contextKey string + // TestSubscribeCtx tests if the Subscriber's Context works correctly. func TestSubscribeCtx( t *testing.T, @@ -978,7 +980,7 @@ func TestSubscribeCtx( const messagesCount = 20 ctxWithCancel, cancel := context.WithCancel(context.Background()) - ctxWithCancel = context.WithValue(ctxWithCancel, "foo", "bar") + ctxWithCancel = context.WithValue(ctxWithCancel, contextKey("foo"), "bar") topicName := testTopicName(tCtx.TestID) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { @@ -1007,7 +1009,7 @@ ClosedLoop: time.Sleep(time.Millisecond * 100) } - ctx := context.WithValue(context.Background(), "foo", "bar") + ctx := context.WithValue(context.Background(), contextKey("foo"), "bar") msgs, err := sub.Subscribe(ctx, topicName) require.NoError(t, err) @@ -1015,7 +1017,7 @@ ClosedLoop: AssertAllMessagesReceived(t, publishedMessages, receivedMessages) for _, msg := range receivedMessages { - assert.EqualValues(t, "bar", msg.Context().Value("foo")) + assert.EqualValues(t, "bar", msg.Context().Value(contextKey("foo"))) } }