Skip to content

Commit

Permalink
Add golangci-lint (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 authored Jan 21, 2023
1 parent 6adfe4e commit d9b4650
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ on:
- master
jobs:
ci:
uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@master
uses: ThreeDotsLabs/watermill/.github/workflows/tests.yml@lint
9 changes: 9 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ jobs:
with:
go-version: '^1.19.1'
- run: make build
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.19.1'
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
tests:
needs: [build]
runs-on: ubuntu-latest
Expand Down
4 changes: 3 additions & 1 deletion components/cqrs/command_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func TestNewCommandBus(t *testing.T) {
assert.Error(t, err)
}

type contextKey string

func TestCommandBus_Send_ContextPropagation(t *testing.T) {
publisher := newPublisherStub()

Expand All @@ -46,7 +48,7 @@ func TestCommandBus_Send_ContextPropagation(t *testing.T) {
)
require.NoError(t, err)

ctx := context.WithValue(context.Background(), "key", "value")
ctx := context.WithValue(context.Background(), contextKey("key"), "value")

err = commandBus.Send(ctx, "message")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion components/cqrs/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestEventBus_Send_ContextPropagation(t *testing.T) {
)
require.NoError(t, err)

ctx := context.WithValue(context.Background(), "key", "value")
ctx := context.WithValue(context.Background(), contextKey("key"), "value")

err = eventBus.Publish(ctx, "message")
require.NoError(t, err)
Expand Down
15 changes: 6 additions & 9 deletions components/cqrs/marshaler_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@ import (
"testing"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/components/cqrs"

"github.com/golang/protobuf/ptypes"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
)

func TestProtobufMarshaler(t *testing.T) {
marshaler := cqrs.ProtobufMarshaler{}

when, err := ptypes.TimestampProto(time.Now())
require.NoError(t, err)
when := timestamppb.New(time.Now())
eventToMarshal := &TestProtobufEvent{
Id: watermill.NewULID(),
When: when,
Expand All @@ -41,8 +39,7 @@ func TestProtobufMarshaler_Marshal_generated_name(t *testing.T) {
},
}

when, err := ptypes.TimestampProto(time.Now())
require.NoError(t, err)
when := timestamppb.New(time.Now())
eventToMarshal := &TestProtobufEvent{
Id: watermill.NewULID(),
When: when,
Expand Down
6 changes: 4 additions & 2 deletions components/forwarder/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"github.com/stretchr/testify/require"
)

type contextKey string

func TestEnvelope(t *testing.T) {
expectedUUID := watermill.NewUUID()
expectedPayload := message.Payload("msg content")
expectedMetadata := message.Metadata{"key": "value"}
expectedDestinationTopic := "dest_topic"

ctx := context.WithValue(context.Background(), "key", "value")
ctx := context.WithValue(context.Background(), contextKey("key"), "value")

msg := message.NewMessage(expectedUUID, expectedPayload)
msg.Metadata = expectedMetadata
Expand All @@ -25,7 +27,7 @@ func TestEnvelope(t *testing.T) {
wrappedMsg, err := wrapMessageInEnvelope(expectedDestinationTopic, msg)
require.NoError(t, err)
require.NotNil(t, wrappedMsg)
v, ok := wrappedMsg.Context().Value("key").(string)
v, ok := wrappedMsg.Context().Value(contextKey("key")).(string)
require.True(t, ok)
require.Equal(t, "value", v)

Expand Down
9 changes: 5 additions & 4 deletions components/metrics/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"testing"
"time"

"github.com/ThreeDotsLabs/watermill/components/metrics"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/stretchr/testify/assert"

"github.com/ThreeDotsLabs/watermill/components/metrics"
)

func TestCreateRegistryAndServeHTTP_metrics_endpoint(t *testing.T) {
reg, cancel := metrics.CreateRegistryAndServeHTTP(":8090")
defer cancel()
err := reg.Register(prometheus.NewBuildInfoCollector())
err := reg.Register(collectors.NewBuildInfoCollector())
if err != nil {
t.Fatal(errors.Wrap(err, "registration of prometheus build info collector failed"))
}
Expand All @@ -34,7 +35,7 @@ func TestCreateRegistryAndServeHTTP_metrics_endpoint(t *testing.T) {
func TestCreateRegistryAndServeHTTP_unknown_endpoint(t *testing.T) {
reg, cancel := metrics.CreateRegistryAndServeHTTP(":8091")
defer cancel()
err := reg.Register(prometheus.NewBuildInfoCollector())
err := reg.Register(collectors.NewBuildInfoCollector())
if err != nil {
t.Error(errors.Wrap(err, "registration of prometheus build info collector failed"))
}
Expand Down
2 changes: 1 addition & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (l *StdLoggerAdapter) log(logger *log.Logger, level string, msg string, fie
fieldsStr += key + "=" + valueStr + " "
}

logger.Output(3, fmt.Sprintf("\t"+`level=%s msg="%s" %s`, level, msg, fieldsStr))
_ = logger.Output(3, fmt.Sprintf("\t"+`level=%s msg="%s" %s`, level, msg, fieldsStr))
}

type LogLevel uint
Expand Down
12 changes: 6 additions & 6 deletions message/router/middleware/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
// Throttle provides a middleware that limits the amount of messages processed per unit of time.
// This may be done e.g. to prevent excessive load caused by running a handler on a long queue of unprocessed messages.
type Throttle struct {
throttle <-chan time.Time
ticker *time.Ticker
}

// NewThrottle creates a new Throttle middleware.
// Example duration and count: NewThrottle(10, time.Second) for 10 messages per second
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{time.Tick(duration / time.Duration(count))}
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}

// Middleware returns the Throttle middleware.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
select {
case <-t.throttle:
// throttle is shared by multiple handlers, which will wait for their "tick"
}
// throttle is shared by multiple handlers, which will wait for their "tick"
<-t.ticker.C

return h(message)
}
Expand Down
3 changes: 2 additions & 1 deletion message/router/middleware/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ const (
func TestThrottle_Middleware(t *testing.T) {
throttle := middleware.NewThrottle(perSecond, testTimeout)

ctx, _ := context.WithTimeout(context.Background(), testTimeout)
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

producedMessagesChannel := make(chan struct{})

Expand Down
2 changes: 1 addition & 1 deletion message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func TestRouter_close_handler(t *testing.T) {
t.Fatal("timeout waiting for handler stopped")
}

expectedReceivedMessages = publishMessagesForHandler(t, 1, pub, sub, subscribeTopic1)
_ = publishMessagesForHandler(t, 1, pub, sub, subscribeTopic1)
_, received := subscriber.BulkRead(receivedMessagesCh1, 1, time.Millisecond*1)
assert.False(t, received)

Expand Down
7 changes: 0 additions & 7 deletions message/subscriber/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,11 @@ func TestBulkRead_timeout(t *testing.T) {
messagesCount := 100
sendLimit := 90

var messages []*message.Message
messagesCh := make(chan *message.Message, messagesCount)

for i := 0; i < messagesCount; i++ {
msg := message.NewMessage(watermill.NewUUID(), nil)

messages = append(messages, msg)

if i < sendLimit {
messagesCh <- msg
}
Expand Down Expand Up @@ -114,13 +111,11 @@ func TestBulkRead_with_limit(t *testing.T) {
messagesCount := 110
limit := 100

var messages []*message.Message
messagesCh := make(chan *message.Message, messagesCount)

for i := 0; i < messagesCount; i++ {
msg := message.NewMessage(watermill.NewUUID(), nil)

messages = append(messages, msg)
messagesCh <- msg
}

Expand Down Expand Up @@ -151,13 +146,11 @@ func TestBulkRead_return_on_channel_close(t *testing.T) {
messagesCount := 100
sendLimit := 90

var messages []*message.Message
messagesCh := make(chan *message.Message, messagesCount)
messagesChClosed := false

for i := 0; i < messagesCount; i++ {
msg := message.NewMessage(watermill.NewUUID(), nil)
messages = append(messages, msg)

if i < sendLimit {
messagesCh <- msg
Expand Down
4 changes: 1 addition & 3 deletions pubsub/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,7 @@ func (g *GoChannel) topicSubscribers(topic string) []*subscriber {

// let's do a copy to avoid race conditions and deadlocks due to lock
subscribersCopy := make([]*subscriber, len(subscribers))
for i, s := range subscribers {
subscribersCopy[i] = s
}
copy(subscribersCopy, subscribers)

return subscribersCopy
}
Expand Down
14 changes: 8 additions & 6 deletions pubsub/tests/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,18 +344,18 @@ func TestConcurrentSubscribeMultipleTopics(
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
err := subscribeInitializer.SubscribeInitialize(topicName)
if err != nil {
t.Fatal(err)
t.Error(err)
}
}

err := publishWithRetry(pub, topicName, messagesToPublish...)
if err != nil {
t.Fatal(err)
t.Error(err)
}

messages, err := sub.Subscribe(context.Background(), topicName)
if err != nil {
t.Fatal(err)
t.Error(err)
}
topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout)

Expand Down Expand Up @@ -966,6 +966,8 @@ func TestMessageCtx(
}
}

type contextKey string

// TestSubscribeCtx tests if the Subscriber's Context works correctly.
func TestSubscribeCtx(
t *testing.T,
Expand All @@ -978,7 +980,7 @@ func TestSubscribeCtx(
const messagesCount = 20

ctxWithCancel, cancel := context.WithCancel(context.Background())
ctxWithCancel = context.WithValue(ctxWithCancel, "foo", "bar")
ctxWithCancel = context.WithValue(ctxWithCancel, contextKey("foo"), "bar")

topicName := testTopicName(tCtx.TestID)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
Expand Down Expand Up @@ -1007,15 +1009,15 @@ ClosedLoop:
time.Sleep(time.Millisecond * 100)
}

ctx := context.WithValue(context.Background(), "foo", "bar")
ctx := context.WithValue(context.Background(), contextKey("foo"), "bar")
msgs, err := sub.Subscribe(ctx, topicName)
require.NoError(t, err)

receivedMessages, _ := bulkRead(tCtx, msgs, messagesCount, defaultTimeout)
AssertAllMessagesReceived(t, publishedMessages, receivedMessages)

for _, msg := range receivedMessages {
assert.EqualValues(t, "bar", msg.Context().Value("foo"))
assert.EqualValues(t, "bar", msg.Context().Value(contextKey("foo")))
}
}

Expand Down

0 comments on commit d9b4650

Please sign in to comment.