Skip to content

Commit 1997a9d

Browse files
committed
fix tests
1 parent 4983063 commit 1997a9d

File tree

2 files changed

+15
-28
lines changed

2 files changed

+15
-28
lines changed

internal/consuming/kafka_test.go

+11-24
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ import (
77
"encoding/json"
88
"errors"
99
"fmt"
10-
"github.com/centrifugal/centrifugo/v5/internal/configtypes"
1110
"strconv"
1211
"strings"
1312
"sync/atomic"
1413
"testing"
1514
"time"
1615

17-
"github.com/centrifugal/centrifuge"
16+
"github.com/centrifugal/centrifugo/v5/internal/configtypes"
17+
1818
"github.com/centrifugal/centrifugo/v5/internal/apiproto"
1919
"github.com/google/uuid"
2020
"github.com/prometheus/client_golang/prometheus"
@@ -41,19 +41,6 @@ func (m *MockDispatcher) Broadcast(ctx context.Context, req *apiproto.BroadcastR
4141
return m.onBroadcast(ctx, req)
4242
}
4343

44-
// MockLogger implements the Logger interface for testing.
45-
type MockLogger struct {
46-
// Add necessary fields to simulate behavior or record calls
47-
}
48-
49-
func (m *MockLogger) LogEnabled(_ centrifuge.LogLevel) bool {
50-
return true // or false based on your test needs
51-
}
52-
53-
func (m *MockLogger) Log(_ centrifuge.LogEntry) {
54-
// Implement mock logic, e.g., storing log entries for assertions
55-
}
56-
5744
func produceTestMessage(topic string, message []byte, headers []kgo.RecordHeader) error {
5845
// Create a new client
5946
client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL))
@@ -167,7 +154,7 @@ func TestKafkaConsumer_GreenScenario(t *testing.T) {
167154
eventReceived := make(chan struct{})
168155
consumerClosed := make(chan struct{})
169156

170-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{
157+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockDispatcher{
171158
onDispatch: func(ctx context.Context, method string, data []byte) error {
172159
require.Equal(t, testMethod, method)
173160
require.Equal(t, testPayload, data)
@@ -218,7 +205,7 @@ func TestKafkaConsumer_SeveralConsumers(t *testing.T) {
218205
consumerClosed := make(chan struct{})
219206

220207
for i := 0; i < 3; i++ {
221-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{
208+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockDispatcher{
222209
onDispatch: func(ctx context.Context, method string, data []byte) error {
223210
require.Equal(t, testMethod, method)
224211
require.Equal(t, testPayload, data)
@@ -284,7 +271,7 @@ func TestKafkaConsumer_RetryAfterDispatchError(t *testing.T) {
284271
return nil
285272
},
286273
}
287-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
274+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
288275
require.NoError(t, err)
289276

290277
go func() {
@@ -354,7 +341,7 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherTopic(t *testing.T) {
354341
return nil
355342
},
356343
}
357-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
344+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
358345
require.NoError(t, err)
359346

360347
go func() {
@@ -427,7 +414,7 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T
427414
return nil
428415
},
429416
}
430-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
417+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
431418
require.NoError(t, err)
432419

433420
go func() {
@@ -507,7 +494,7 @@ func TestKafkaConsumer_PausePartitions(t *testing.T) {
507494
return nil
508495
},
509496
}
510-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
497+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
511498
require.NoError(t, err)
512499

513500
consumer.testOnlyConfig = testConfig
@@ -591,7 +578,7 @@ func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) {
591578
ConsumerGroup: uuid.New().String(),
592579
PartitionBufferSize: tc.partitionBuffer,
593580
}
594-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
581+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
595582
require.NoError(t, err)
596583

597584
var records []*kgo.Record
@@ -690,7 +677,7 @@ func TestKafkaConsumer_TestPauseAfterResumeRace(t *testing.T) {
690677
return nil
691678
},
692679
}
693-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
680+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), mockDispatcher, config, newCommonMetrics(prometheus.NewRegistry()))
694681
require.NoError(t, err)
695682

696683
consumer.testOnlyConfig = testOnlyConfig{
@@ -770,7 +757,7 @@ func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) {
770757
eventReceived := make(chan struct{})
771758
consumerClosed := make(chan struct{})
772759

773-
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{
760+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockDispatcher{
774761
onBroadcast: func(ctx context.Context, req *apiproto.BroadcastRequest) error {
775762
require.Equal(t, testChannels, req.Channels)
776763
require.Equal(t, apiproto.Raw(testPayload), req.Data)

internal/consuming/postgresql_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestPostgresConsumer_GreenScenario(t *testing.T) {
134134
PartitionPollInterval: configtypes.Duration(300 * time.Millisecond),
135135
PartitionNotificationChannel: testNotificationChannel,
136136
}
137-
consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{
137+
consumer, err := NewPostgresConsumer("test", &MockDispatcher{
138138
onDispatch: func(ctx context.Context, method string, data []byte) error {
139139
require.Equal(t, testMethod, method)
140140
require.Equal(t, testPayload, data)
@@ -191,7 +191,7 @@ func TestPostgresConsumer_SeveralConsumers(t *testing.T) {
191191
numConsumers := 10
192192

193193
for i := 0; i < numConsumers; i++ {
194-
consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{
194+
consumer, err := NewPostgresConsumer("test", &MockDispatcher{
195195
onDispatch: func(ctx context.Context, method string, data []byte) error {
196196
require.Equal(t, testMethod, method)
197197
require.Equal(t, testPayload, data)
@@ -249,7 +249,7 @@ func TestPostgresConsumer_NotificationTrigger(t *testing.T) {
249249

250250
numEvents := 0
251251

252-
consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{
252+
consumer, err := NewPostgresConsumer("test", &MockDispatcher{
253253
onDispatch: func(ctx context.Context, method string, data []byte) error {
254254
require.Equal(t, testMethod, method)
255255
require.Equal(t, testPayload, data)
@@ -317,7 +317,7 @@ func TestPostgresConsumer_DifferentPartitions(t *testing.T) {
317317

318318
var dispatchMu sync.Mutex
319319

320-
consumer, err := NewPostgresConsumer("test", &MockLogger{}, &MockDispatcher{
320+
consumer, err := NewPostgresConsumer("test", &MockDispatcher{
321321
onDispatch: func(ctx context.Context, method string, data []byte) error {
322322
dispatchMu.Lock()
323323
defer dispatchMu.Unlock()

0 commit comments

Comments
 (0)