
Commit 8a3827f

publisher: refactor to a generic sink interface
1 parent 8fe2652 commit 8a3827f

File tree

  publisher/kafka.go
  publisher/kafka_test.go
  worker/mocks.go
  worker/worker.go
  worker/worker_test.go

5 files changed: +92 -70 lines
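In short, this commit swaps the Kafka-specific `publisher.KafkaProducer` interface for a generic `Producer` interface defined in the `worker` package, and moves delivery-channel management into the Kafka publisher itself: `ProduceBulk` no longer takes a `chan kafka.Event`, and `Kafka` allocates its own channel sized by the new `deliveryChannelSize` field. Any sink satisfying the interface can now back the worker pool. As a rough sketch (not part of this commit), a hypothetical stdout sink could implement the new interface like this:

```go
package sink

import (
	"fmt"

	pb "github.com/raystack/raccoon/proto"
)

// Stdout is a hypothetical sink, shown only to illustrate the generic
// interface this commit introduces; it is not part of the commit.
type Stdout struct{}

// ProduceBulk satisfies the worker.Producer interface added in
// worker/worker.go below.
func (s *Stdout) ProduceBulk(events []*pb.Event, connGroup string) error {
	for _, e := range events {
		// Print one line per event; a real sink would batch writes and
		// report per-event errors (see publisher.BulkError).
		fmt.Printf("group=%s type=%s bytes=%d\n", connGroup, e.Type, len(e.EventBytes))
	}
	return nil
}
```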

publisher/kafka.go (+15 -17)

```diff
@@ -13,43 +13,41 @@ import (
     pb "github.com/raystack/raccoon/proto"
 )
 
-// KafkaProducer Produce data to kafka synchronously
-type KafkaProducer interface {
-    // ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
-    ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error
-}
-
 func NewKafka() (*Kafka, error) {
     kp, err := newKafkaClient(config.PublisherKafka.ToKafkaConfigMap())
     if err != nil {
         return &Kafka{}, err
     }
     return &Kafka{
-        kp:            kp,
-        flushInterval: config.PublisherKafka.FlushInterval,
-        topicFormat:   config.EventDistribution.PublisherPattern,
+        kp:                  kp,
+        flushInterval:       config.PublisherKafka.FlushInterval,
+        topicFormat:         config.EventDistribution.PublisherPattern,
+        deliveryChannelSize: config.Worker.DeliveryChannelSize,
     }, nil
 }
 
-func NewKafkaFromClient(client Client, flushInterval int, topicFormat string) *Kafka {
+func NewKafkaFromClient(client Client, flushInterval int, topicFormat string, deliveryChannelSize int) *Kafka {
     return &Kafka{
-        kp:            client,
-        flushInterval: flushInterval,
-        topicFormat:   topicFormat,
+        kp:                  client,
+        flushInterval:       flushInterval,
+        topicFormat:         topicFormat,
+        deliveryChannelSize: deliveryChannelSize,
     }
 }
 
 type Kafka struct {
-    kp            Client
-    flushInterval int
-    topicFormat   string
+    kp                  Client
+    flushInterval       int
+    topicFormat         string
+    deliveryChannelSize int
 }
 
 // ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
 // DeliveryChannel needs to be exclusive. DeliveryChannel is exposed for recyclability purpose.
-func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error {
+func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string) error {
     errors := make([]error, len(events))
     totalProcessed := 0
+    deliveryChannel := make(chan kafka.Event, pr.deliveryChannelSize)
     for order, event := range events {
         topic := fmt.Sprintf(pr.topicFormat, event.Type)
         message := &kafka.Message{
```
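After this change, callers no longer construct or pass a delivery channel; the publisher sizes one internally from the `deliveryChannelSize` it was built with. A minimal sketch of the new call shape (the `client` parameter, `events` batch, and `"group-A"` group name are hypothetical placeholders):

```go
package main

import (
	pb "github.com/raystack/raccoon/proto"
	"github.com/raystack/raccoon/publisher"
)

// produceExample sketches the new call shape: no delivery channel from
// the caller; the publisher sizes its own (100 entries here).
func produceExample(client publisher.Client, events []*pb.Event) error {
	kp := publisher.NewKafkaFromClient(client, 10, "%s", 100)
	return kp.ProduceBulk(events, "group-A")
}
```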

publisher/kafka_test.go (+9 -9)

```diff
@@ -31,7 +31,7 @@ func TestProducer_Close(suite *testing.T) {
         client := &mockClient{}
         client.On("Flush", 10).Return(0)
         client.On("Close").Return()
-        kp := NewKafkaFromClient(client, 10, "%s")
+        kp := NewKafkaFromClient(client, 10, "%s", 1)
         kp.Close()
         client.AssertExpectations(t)
     })
@@ -55,9 +55,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
                 }
             }()
         })
-        kp := NewKafkaFromClient(client, 10, "%s")
+        kp := NewKafkaFromClient(client, 10, "%s", 1)
 
-        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
+        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1)
         assert.NoError(t, err)
     })
 })
@@ -79,9 +79,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
             }()
         }).Once()
         client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
-        kp := NewKafkaFromClient(client, 10, "%s")
+        kp := NewKafkaFromClient(client, 10, "%s", 1)
 
-        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
+        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1)
         assert.Len(t, err.(BulkError).Errors, 3)
         assert.Error(t, err.(BulkError).Errors[0])
         assert.Empty(t, err.(BulkError).Errors[1])
@@ -91,9 +91,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
     t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) {
         client := &mockClient{}
         client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once()
-        kp := NewKafkaFromClient(client, 10, "%s")
+        kp := NewKafkaFromClient(client, 10, "%s", 1)
 
-        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
+        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1")
         assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
     })
 })
@@ -115,9 +115,9 @@ func TestKafka_ProduceBulk(suite *testing.T) {
             }
         }()
     }).Once()
-        kp := NewKafkaFromClient(client, 10, "%s")
+        kp := NewKafkaFromClient(client, 10, "%s", 1)
 
-        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
+        err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1")
         assert.NotEmpty(t, err)
         assert.Len(t, err.(BulkError).Errors, 2)
         assert.Equal(t, "buffer full", err.(BulkError).Errors[0].Error())
```
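The tests above rely on `ProduceBulk` aggregating per-event failures into a `publisher.BulkError` whose `Errors` slice is index-aligned with the input batch (the doc comment guarantees order). A sketch of how an external caller might unpack it (the function name and `"group1"` are illustrative, not from the commit):

```go
package main

import (
	"log"

	pb "github.com/raystack/raccoon/proto"
	"github.com/raystack/raccoon/publisher"
)

// inspectErrors sketches unpacking the aggregated error the tests
// assert on; Errors[i] corresponds to events[i].
func inspectErrors(kp *publisher.Kafka, events []*pb.Event) {
	if err := kp.ProduceBulk(events, "group1"); err != nil {
		if bulkErr, ok := err.(publisher.BulkError); ok {
			for i, e := range bulkErr.Errors {
				if e != nil {
					log.Printf("event %d failed: %v", i, e)
				}
			}
		}
	}
}
```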

worker/mocks.go (+2 -3)

```diff
@@ -1,7 +1,6 @@
 package worker
 
 import (
-    kafka "github.com/confluentinc/confluent-kafka-go/kafka"
     pb "github.com/raystack/raccoon/proto"
     mock "github.com/stretchr/testify/mock"
 )
@@ -12,8 +11,8 @@ type mockKafkaPublisher struct {
 }
 
 // ProduceBulk provides a mock function with given fields: events, deliveryChannel
-func (m *mockKafkaPublisher) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error {
-    mock := m.Called(events, connGroup, deliveryChannel)
+func (m *mockKafkaPublisher) ProduceBulk(events []*pb.Event, connGroup string) error {
+    mock := m.Called(events, connGroup)
     return mock.Error(0)
 }
 
```
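For reference, a test inside the `worker` package can program the slimmed-down mock with testify's usual expectation API. This is a sketch under the assumption that `mockKafkaPublisher` embeds `mock.Mock` (implied by the `m.Called` call above); the `"group-A"` group name is a placeholder:

```go
// Expect one bulk produce for connection group "group-A" and succeed;
// mock.Anything matches the events slice.
kp := mockKafkaPublisher{}
kp.On("ProduceBulk", mock.Anything, "group-A").Return(nil)
```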

worker/worker.go (+64 -39)

```diff
@@ -5,75 +5,100 @@ import (
     "sync"
     "time"
 
-    "github.com/confluentinc/confluent-kafka-go/kafka"
     "github.com/raystack/raccoon/collection"
     "github.com/raystack/raccoon/logger"
     "github.com/raystack/raccoon/metrics"
+    pb "github.com/raystack/raccoon/proto"
     "github.com/raystack/raccoon/publisher"
 )
 
+// Producer produces data to sink
+type Producer interface {
+    // ProduceBulk message to a sink. Blocks until all messages are sent. Returns slice of error.
+    ProduceBulk(events []*pb.Event, connGroup string) error
+}
+
 // Pool spawn goroutine as much as Size that will listen to EventsChannel. On Close, wait for all data in EventsChannel to be processed.
 type Pool struct {
     Size                int
     deliveryChannelSize int
     EventsChannel       <-chan collection.CollectRequest
-    kafkaProducer       publisher.KafkaProducer
+    producer            Producer
     wg                  sync.WaitGroup
 }
 
 // CreateWorkerPool create new Pool struct given size and EventsChannel worker.
-func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, kafkaProducer publisher.KafkaProducer) *Pool {
+func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, producer Producer) *Pool {
     return &Pool{
         Size:                size,
         deliveryChannelSize: deliveryChannelSize,
         EventsChannel:       eventsChannel,
-        kafkaProducer:       kafkaProducer,
+        producer:            producer,
         wg:                  sync.WaitGroup{},
     }
 }
 
-// StartWorkers initialize worker pool as much as Pool.Size
-func (w *Pool) StartWorkers() {
-    w.wg.Add(w.Size)
-    for i := 0; i < w.Size; i++ {
-        go func(workerName string) {
-            logger.Info("Running worker: " + workerName)
-            deliveryChan := make(chan kafka.Event, w.deliveryChannelSize)
-            for request := range w.EventsChannel {
-                metrics.Histogram("batch_idle_in_channel_milliseconds", (time.Now().Sub(request.TimePushed)).Milliseconds(), map[string]string{"worker": workerName})
-                batchReadTime := time.Now()
-                //@TODO - Should add integration tests to prove that the worker receives the same message that it produced, on the delivery channel it created
+func (w *Pool) newWorker(name string) {
 
-                err := w.kafkaProducer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group, deliveryChan)
+    logger.Info("Running worker: " + name)
+    for request := range w.EventsChannel {
 
-                produceTime := time.Since(batchReadTime)
-                metrics.Histogram("kafka_producebulk_tt_ms", produceTime.Milliseconds(), map[string]string{})
+        metrics.Histogram(
+            "batch_idle_in_channel_milliseconds",
+            time.Since(request.TimePushed).Milliseconds(),
+            map[string]string{"worker": name})
 
-                if request.AckFunc != nil {
-                    request.AckFunc(err)
-                }
+        batchReadTime := time.Now()
+        //@TODO - Should add integration tests to prove that the worker receives the same message that it produced, on the delivery channel it created
+
+        err := w.producer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group)
+
+        // TODO(turtledev): instrument this for individual sinks
+        // produceTime := time.Since(batchReadTime)
+        // metrics.Histogram("kafka_producebulk_tt_ms", produceTime.Milliseconds(), map[string]string{})
 
-                totalErr := 0
+        if request.AckFunc != nil {
+            request.AckFunc(err)
+        }
+
+        totalErr := 0
+        if err != nil {
+            // WARN(turtledev): this can panic if returned error is not of
+            // type publisher.BulkError
+            for _, err := range err.(publisher.BulkError).Errors {
                 if err != nil {
-                    for _, err := range err.(publisher.BulkError).Errors {
-                        if err != nil {
-                            logger.Errorf("[worker] Fail to publish message to kafka %v", err)
-                            totalErr++
-                        }
-                    }
-                }
-                lenBatch := int64(len(request.GetEvents()))
-                logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr)))
-                if lenBatch > 0 {
-                    eventTimingMs := time.Since(request.GetSentTime().AsTime()).Milliseconds() / lenBatch
-                    metrics.Histogram("event_processing_duration_milliseconds", eventTimingMs, map[string]string{"conn_group": request.ConnectionIdentifier.Group})
-                    now := time.Now()
-                    metrics.Histogram("worker_processing_duration_milliseconds", (now.Sub(batchReadTime).Milliseconds())/lenBatch, map[string]string{"worker": workerName})
-                    metrics.Histogram("server_processing_latency_milliseconds", (now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch, map[string]string{"conn_group": request.ConnectionIdentifier.Group})
+                    logger.Errorf("[worker] Fail to publish message to kafka %v", err)
+                    totalErr++
                 }
             }
-            w.wg.Done()
-        }(fmt.Sprintf("worker-%d", i))
+        }
+        lenBatch := int64(len(request.GetEvents()))
+        logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr)))
+        if lenBatch > 0 {
+            eventTimingMs := time.Since(request.GetSentTime().AsTime()).Milliseconds() / lenBatch
+            metrics.Histogram(
+                "event_processing_duration_milliseconds",
+                eventTimingMs,
+                map[string]string{"conn_group": request.ConnectionIdentifier.Group})
+            now := time.Now()
+            metrics.Histogram(
+                "worker_processing_duration_milliseconds",
+                (now.Sub(batchReadTime).Milliseconds())/lenBatch,
+                map[string]string{"worker": name})
+            metrics.Histogram(
+                "server_processing_latency_milliseconds",
+                (now.Sub(request.TimeConsumed)).Milliseconds()/lenBatch,
+                map[string]string{"conn_group": request.ConnectionIdentifier.Group})
+        }
+    }
+    w.wg.Done()
+}
+
+// StartWorkers initialize worker pool as much as Pool.Size
+func (w *Pool) StartWorkers() {
+    w.wg.Add(w.Size)
+    for i := 0; i < w.Size; i++ {
+        go w.newWorker(fmt.Sprintf("worker-%d", i))
     }
 }
 
```
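Since the pool now depends only on the `Producer` interface, any sink can be wired in. A minimal sketch under this commit's assumptions (the `sink` argument and function name are hypothetical); note the WARN in the diff: on failure, a custom `ProduceBulk` should return a `publisher.BulkError`, because the worker type-asserts the error to that type:

```go
package main

import (
	"github.com/raystack/raccoon/collection"
	"github.com/raystack/raccoon/worker"
)

// startPool wires an arbitrary Producer into a 4-worker pool.
// The deliveryChannelSize argument (100) is retained by Pool even
// though channel creation now lives inside the Kafka publisher.
func startPool(events <-chan collection.CollectRequest, sink worker.Producer) *worker.Pool {
	pool := worker.CreateWorkerPool(4, events, 100, sink)
	pool.StartWorkers()
	return pool
}
```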

worker/worker_test.go (+2 -2)

```diff
@@ -36,7 +36,7 @@ func TestWorker(t *testing.T) {
         Size:                1,
         deliveryChannelSize: 0,
         EventsChannel:       bc,
-        kafkaProducer:       &kp,
+        producer:            &kp,
         wg:                  sync.WaitGroup{},
     }
     worker.StartWorkers()
@@ -63,7 +63,7 @@ func TestWorker(t *testing.T) {
         Size:                1,
         deliveryChannelSize: 100,
         EventsChannel:       bc,
-        kafkaProducer:       &kp,
+        producer:            &kp,
         wg:                  sync.WaitGroup{},
     }
     worker.StartWorkers()
```
