Skip to content

Commit

Permalink
Use WARN for broker connect DeadlineExceeded errs (#622)
Browse files Browse the repository at this point in the history
This change updates the logger to use WARN instead of ERROR for broker
connect DeadlineExceeded errors. This is because DeadlineExceeded errors
since they don't indicate a problem, but we still want to log them as
warnings in case they correlate with other events.

---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Jan 22, 2025
1 parent b0c8f8c commit a040b38
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 24 deletions.
13 changes: 10 additions & 3 deletions kafka/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package kafka

import (
"context"
"errors"
"net"
"time"

Expand All @@ -35,12 +37,17 @@ type loggerHook struct {
// OnBrokerConnect implements the kgo.HookBrokerConnect interface.
func (l *loggerHook) OnBrokerConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
if err != nil {
l.logger.Error("failed to connect to broker",
fields := []zap.Field{
zap.Error(err),
zap.String("duration", dialDur.String()),
zap.Duration("event.duration", dialDur),
zap.String("host", meta.Host),
zap.Int32("port", meta.Port),
zap.Stack("stack"),
)
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
l.logger.Warn("failed to connect to broker", fields...)
return
}
l.logger.Error("failed to connect to broker", fields...)
}
}
78 changes: 57 additions & 21 deletions kafka/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,74 @@ import (
"errors"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

func TestHookLogsFailedDial(t *testing.T) {
cluster, cfg := newFakeCluster(t)
t.Cleanup(cluster.Close)

core, logs := observer.New(zap.ErrorLevel)
cfg.Logger = zap.New(core)
// Simulate returning an error when dialing the broker.
const errorMsg = "busted"
cfg.Dialer = func(context.Context, string, string) (net.Conn, error) {
return nil, errors.New(errorMsg)
assertLogs := func(t *testing.T,
logs *observer.ObservedLogs,
expectedLevel zapcore.Level,
expectedErr string,
) {
observedLogs := logs.FilterMessage("failed to connect to broker").TakeAll()
// Franz-go will retry once to connect to the broker, so we might see either one or two log lines.
assert.GreaterOrEqual(t, len(observedLogs), 1,
"expected one or two log lines, got %#v", observedLogs,
)
// The error message should contain the error message from the dialer.
assert.EqualValues(t, observedLogs[0].ContextMap()["error"], expectedErr)
assert.Contains(t, observedLogs[0].ContextMap(), "event.duration")
assert.Equal(t, observedLogs[0].Level, expectedLevel)
}
t.Run("context.Canceled", func(t *testing.T) {
cluster, cfg := newFakeCluster(t)
t.Cleanup(cluster.Close)

// Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster
// using the broken dialer.
c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") })
require.NoError(t, err)
assert.Error(t, c.Ping(context.Background()))
core, logs := observer.New(zap.WarnLevel)
cfg.Logger = zap.New(core)
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel()
cfg.Dialer = func(context.Context, string, string) (net.Conn, error) {
<-ctx.Done()
return nil, ctx.Err()
}
// Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster
// using the broken dialer.
c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") })
require.NoError(t, err)

observedLogs := logs.FilterMessage("failed to connect to broker").TakeAll()
// Franz-go will retry once to connect to the broker, so we might see either one or two log lines.
assert.True(t, len(observedLogs) == 1 || len(observedLogs) == 2,
"expected one or two log lines, got %#v", observedLogs)
<-time.After(time.Millisecond)

// The error message should contain the error message from the dialer.
assert.EqualValues(t, observedLogs[0].ContextMap()["error"], errorMsg)
assert.Contains(t, observedLogs[0].ContextMap(), "duration")
// The dialer will return context.Canceled, which should be logged as a warning.
assert.Error(t, c.Ping(ctx))

assertLogs(t, logs, zap.WarnLevel, context.DeadlineExceeded.Error())
})
t.Run("busted dialer", func(t *testing.T) {
cluster, cfg := newFakeCluster(t)
t.Cleanup(cluster.Close)

core, logs := observer.New(zap.ErrorLevel)
cfg.Logger = zap.New(core)
// Simulate returning an error when dialing the broker.
const errorMsg = "busted"
cfg.Dialer = func(context.Context, string, string) (net.Conn, error) {
return nil, errors.New(errorMsg)
}

// Calling newClient triggers the metadata refresh, forcing a connection to the fake cluster
// using the broken dialer.
c, err := cfg.newClient(func(string) attribute.KeyValue { return attribute.String("k", "v") })
require.NoError(t, err)
assert.Error(t, c.Ping(context.Background()))

assertLogs(t, logs, zap.ErrorLevel, errorMsg)
})
}

0 comments on commit a040b38

Please sign in to comment.