Skip to content

Commit

Permalink
Kafka (and other) tests fix (#37)
Browse files Browse the repository at this point in the history
Improved tests overall
  • Loading branch information
roblaszczak authored and maclav3 committed Feb 6, 2019
1 parent f39af0d commit 402ef67
Show file tree
Hide file tree
Showing 40 changed files with 1,590 additions and 613 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ vendor
docs/themes/
docs/public/
docs/content/src-link
*.out
*.log
13 changes: 11 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ mycli:
@mycli -h 127.0.0.1 -u root -p secret

test:
go test -v `glide novendor`
go test ./...

test_v:
go test -v ./...

test_short:
go test ./... -short

test_stress:
go test -tags=stress `glide novendor`
go test -tags=stress ./...

test_reconnect:
go test -tags=reconnect ./...
56 changes: 56 additions & 0 deletions dev/coverage.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/sh
########
# Source: https://gist.github.com/lwolf/3764a3b6cd08387e80aa6ca3b9534b8a
# originaly from https://github.com/mlafeldt/chef-runner/blob/v0.7.0/script/coverage
#######
# Generate test coverage statistics for Go packages.
#
# Works around the fact that `go test -coverprofile` currently does not work
# with multiple packages, see https://code.google.com/p/go/issues/detail?id=6909
#
# Usage: script/coverage [--html|--coveralls]
#
# --html Additionally create HTML report and open it in browser
# --coveralls Push coverage statistics to coveralls.io
#

set -e

workdir=.cover
profile="$workdir/cover.out"
mode=count

generate_cover_data() {
rm -rf "$workdir"
mkdir "$workdir"

for pkg in "$@"; do
f="$workdir/$(echo $pkg | tr / -).cover"
go test -covermode="$mode" -coverprofile="$f" "$pkg"
done

echo "mode: $mode" >"$profile"
grep -h -v "^mode:" "$workdir"/*.cover >>"$profile"
}

show_cover_report() {
go tool cover -${1}="$profile"
}

push_to_coveralls() {
echo "Pushing coverage statistics to coveralls.io"
goveralls -coverprofile="$profile"
}

generate_cover_data $(go list ./... | grep -v /vendor/)
show_cover_report func
case "$1" in
"")
;;
--html)
show_cover_report html ;;
--coveralls)
push_to_coveralls ;;
*)
echo >&2 "error: invalid option: $1"; exit 1 ;;
esac
1 change: 1 addition & 0 deletions docs/content/docs/getting-started/amqp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"

"github.com/ThreeDotsLabs/watermill/message/infrastructure/amqp"
uuid "github.com/satori/go.uuid"

"github.com/ThreeDotsLabs/watermill"

Expand Down
2 changes: 0 additions & 2 deletions docs/content/docs/getting-started/go-channel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main

import (
"log"
"time"

"github.com/satori/go.uuid"

Expand All @@ -17,7 +16,6 @@ func main() {
pubSub := gochannel.NewGoChannel(
0, // buffer (channel) size
watermill.NewStdLogger(false, false),
time.Second, // send timeout
)

messages, err := pubSub.Subscribe("example.topic")
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/getting-started/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {

// for simplicity we are using gochannel Pub/Sub here,
// you can replace it with any Pub/Sub implementation, it will work the same
pubSub := gochannel.NewGoChannel(0, logger, time.Second)
pubSub := gochannel.NewGoChannel(0, logger)

// producing some messages in background
go publishMessages(pubSub)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ThreeDotsLabs/watermill
require (
cloud.google.com/go v0.33.1
github.com/DataDog/zstd v1.3.4 // indirect
github.com/Shopify/sarama v1.20.0
github.com/Shopify/sarama v1.20.1
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/boltdb/bolt v1.3.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/DataDog/zstd v1.3.4 h1:LAGHkXuvC6yky+C2CUG2tD7w8QlrUwpue8XwIh0X4AY=
github.com/DataDog/zstd v1.3.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/sarama v1.20.0 h1:wAMHhl1lGRlobeoV/xOKpbqD2OQsOvY4A/vIOGroIe8=
github.com/Shopify/sarama v1.20.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.20.1 h1:Bb0h3I++r4eX333Y0uZV2vwUXepJbt6ig05TUU1qt9I=
github.com/Shopify/sarama v1.20.1/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.3+incompatible h1:awiJqUYH4q4OmoBiRccJykjd7B+w0loJi2keSna4X/M=
github.com/Shopify/toxiproxy v2.1.3+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
Expand Down
2 changes: 2 additions & 0 deletions internal/sync/waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

// WaitGroupTimeout adds timeout feature for sync.WaitGroup.Wait().
// It returns true, when timeouted.
func WaitGroupTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
wgClosed := make(chan struct{}, 1)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions internal/sync/waitgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
func TestWaitGroupTimeout_no_timeout(t *testing.T) {
wg := &sync.WaitGroup{}

timeouted := WaitGroupTimeout(wg, time.Millisecond*10)
timeouted := WaitGroupTimeout(wg, time.Millisecond*100)
assert.False(t, timeouted)
}

func TestWaitGroupTimeout_timeout(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)

timeouted := WaitGroupTimeout(wg, time.Millisecond*10)
timeouted := WaitGroupTimeout(wg, time.Millisecond*100)
assert.True(t, timeouted)
}
4 changes: 3 additions & 1 deletion internal/tests/asserts.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ func AssertAllMessagesReceived(t *testing.T, sent message.Messages, received mes

return assert.Equal(
t, sentIDs, receivedIDs,
"received different messages ID's, missing: %s", MissingMessages(sent, received),
"received different messages ID's, missing: %s, extra %s",
MissingMessages(sent, received),
MissingMessages(received, sent),
)
}

Expand Down
77 changes: 58 additions & 19 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package watermill

import (
"fmt"
"io"
"log"
"os"
"reflect"
Expand All @@ -24,11 +25,21 @@ func (l LogFields) Add(newFields LogFields) LogFields {
return resultFields
}

func (l LogFields) Copy() LogFields {
cpy := make(LogFields, len(l))
for k, v := range l {
cpy[k] = v
}

return cpy
}

type LoggerAdapter interface {
Error(msg string, err error, fields LogFields)
Info(msg string, fields LogFields)
Debug(msg string, fields LogFields)
Trace(msg string, fields LogFields)
With(fields LogFields) LoggerAdapter
}

type NopLogger struct{}
Expand All @@ -37,16 +48,23 @@ func (NopLogger) Error(msg string, err error, fields LogFields) {}
func (NopLogger) Info(msg string, fields LogFields) {}
func (NopLogger) Debug(msg string, fields LogFields) {}
func (NopLogger) Trace(msg string, fields LogFields) {}
func (l NopLogger) With(fields LogFields) LoggerAdapter { return l }

type StdLoggerAdapter struct {
ErrorLogger *log.Logger
InfoLogger *log.Logger
DebugLogger *log.Logger
TraceLogger *log.Logger

fields LogFields
}

func NewStdLogger(debug, trace bool) LoggerAdapter {
l := log.New(os.Stderr, "[watermill] ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
return NewStdLoggerWithOut(os.Stderr, debug, trace)
}

func NewStdLoggerWithOut(out io.Writer, debug bool, trace bool) LoggerAdapter {
l := log.New(out, "[watermill] ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
a := &StdLoggerAdapter{InfoLogger: l, ErrorLogger: l}

if debug {
Expand Down Expand Up @@ -75,16 +93,28 @@ func (l *StdLoggerAdapter) Trace(msg string, fields LogFields) {
l.log(l.TraceLogger, "TRACE", msg, fields)
}

func (l *StdLoggerAdapter) With(fields LogFields) LoggerAdapter {
return &StdLoggerAdapter{
ErrorLogger: l.ErrorLogger,
InfoLogger: l.InfoLogger,
DebugLogger: l.DebugLogger,
TraceLogger: l.TraceLogger,
fields: l.fields.Add(fields),
}
}

func (l *StdLoggerAdapter) log(logger *log.Logger, level string, msg string, fields LogFields) {
if logger == nil {
return
}

fieldsStr := ""

keys := make([]string, len(fields))
allFields := l.fields.Add(fields)

keys := make([]string, len(allFields))
i := 0
for field := range fields {
for field := range allFields {
keys[i] = field
i++
}
Expand All @@ -93,7 +123,7 @@ func (l *StdLoggerAdapter) log(logger *log.Logger, level string, msg string, fie

for _, key := range keys {
var valueStr string
value := fields[key]
value := allFields[key]

if stringer, ok := value.(fmt.Stringer); ok {
valueStr = stringer.String()
Expand All @@ -114,10 +144,10 @@ func (l *StdLoggerAdapter) log(logger *log.Logger, level string, msg string, fie
type LogLevel uint

const (
Trace LogLevel = iota + 1
Debug
Info
Error
TraceLogLevel LogLevel = iota + 1
DebugLogLevel
InfoLogLevel
ErrorLogLevel
)

type CapturedMessage struct {
Expand All @@ -129,18 +159,27 @@ type CapturedMessage struct {

type CaptureLoggerAdapter struct {
captured map[LogLevel][]CapturedMessage
fields LogFields
}

func NewCaptureLogger() CaptureLoggerAdapter {
return CaptureLoggerAdapter{
func NewCaptureLogger() *CaptureLoggerAdapter {
return &CaptureLoggerAdapter{
captured: map[LogLevel][]CapturedMessage{},
}
}

func (c *CaptureLoggerAdapter) With(fields LogFields) LoggerAdapter {
return &CaptureLoggerAdapter{c.captured, c.fields.Add(fields)}
}

func (c *CaptureLoggerAdapter) capture(msg CapturedMessage) {
c.captured[msg.Level] = append(c.captured[msg.Level], msg)
}

func (c CaptureLoggerAdapter) Captured() map[LogLevel][]CapturedMessage {
return c.captured
}

func (c CaptureLoggerAdapter) Has(msg CapturedMessage) bool {
for _, capturedMsg := range c.captured[msg.Level] {
if reflect.DeepEqual(msg, capturedMsg) {
Expand All @@ -151,7 +190,7 @@ func (c CaptureLoggerAdapter) Has(msg CapturedMessage) bool {
}

func (c CaptureLoggerAdapter) HasError(err error) bool {
for _, capturedMsg := range c.captured[Error] {
for _, capturedMsg := range c.captured[ErrorLogLevel] {
if capturedMsg.Err == err {
return true
}
Expand All @@ -161,33 +200,33 @@ func (c CaptureLoggerAdapter) HasError(err error) bool {

func (c *CaptureLoggerAdapter) Error(msg string, err error, fields LogFields) {
c.capture(CapturedMessage{
Level: Error,
Fields: fields,
Level: ErrorLogLevel,
Fields: c.fields.Add(fields),
Msg: msg,
Err: err,
})
}

func (c *CaptureLoggerAdapter) Info(msg string, fields LogFields) {
c.capture(CapturedMessage{
Level: Info,
Fields: fields,
Level: InfoLogLevel,
Fields: c.fields.Add(fields),
Msg: msg,
})
}

func (c *CaptureLoggerAdapter) Debug(msg string, fields LogFields) {
c.capture(CapturedMessage{
Level: Debug,
Fields: fields,
Level: DebugLogLevel,
Fields: c.fields.Add(fields),
Msg: msg,
})
}

func (c *CaptureLoggerAdapter) Trace(msg string, fields LogFields) {
c.capture(CapturedMessage{
Level: Trace,
Fields: fields,
Level: TraceLogLevel,
Fields: c.fields.Add(fields),
Msg: msg,
})
}
Loading

0 comments on commit 402ef67

Please sign in to comment.