
Commit 6132ec6

Refactor so we can reuse endpoint for wal and non wal implementation
1 parent fb1f93b commit 6132ec6

7 files changed (+591, -613 lines)

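For orientation before the per-file diffs: both the fanout (non-WAL) consumer and the WAL consumer now construct the same endpoint type, differing mainly in the MarkerHandler they pass in (a no-op handler on the non-WAL path). The shared endpoint itself is defined in a file not shown below, so the following Go sketch of its surface is inferred from the call sites in this commit rather than quoted from it:

// Sketch only: names and shapes are inferred from the call sites in the diffs below,
// not copied from the commit. enqueue reports whether the entry was accepted;
// segment is the WAL segment number, or 0 when no WAL is involved.
type sharedEndpoint interface {
    enqueue(e loki.Entry, segment int) bool
    Stop()
}

// Fanout path: newEndpoint(metrics, cfg, logger, internal.NewNopMarkerHandler())
// WAL path:    newEndpoint(metrics, cfg, logger, markerHandler)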
Lines changed: 2 additions & 91 deletions
@@ -1,19 +1,14 @@
 package client
 
 import (
-    "context"
-    "crypto/sha256"
     "fmt"
     "sync"
-    "time"
 
     "github.com/go-kit/log"
-    "github.com/grafana/dskit/backoff"
     "github.com/prometheus/client_golang/prometheus"
 
     "github.com/grafana/alloy/internal/component/common/loki"
     "github.com/grafana/alloy/internal/component/common/loki/client/internal"
-    "github.com/grafana/alloy/internal/useragent"
 )
 
 func NewFanoutConsumer(logger log.Logger, reg prometheus.Registerer, cfgs ...Config) (*FanoutConsumer, error) {
@@ -39,7 +34,7 @@ func NewFanoutConsumer(logger log.Logger, reg prometheus.Registerer, cfgs ...Con
         }
 
         endpointsCheck[name] = struct{}{}
-        endpoint, err := newEndpoint(metrics, cfg, logger)
+        endpoint, err := newEndpoint(metrics, cfg, logger, internal.NewNopMarkerHandler())
         if err != nil {
             return nil, fmt.Errorf("error starting client: %w", err)
         }
@@ -63,7 +58,7 @@ type FanoutConsumer struct {
 func (c *FanoutConsumer) run() {
     for e := range c.recv {
         for _, c := range c.endpoints {
-            c.Chan() <- e
+            c.enqueue(e, 0)
         }
     }
 }
@@ -88,87 +83,3 @@ func (c *FanoutConsumer) Stop() {
     // Wait for all endpoints to stop.
     stopWG.Wait()
 }
-
-// getEndpointName computes the specific name for each endpoint config. The name is either the configured Name setting in Config,
-// or a hash of the config as whole, this allows us to detect repeated configs.
-func getEndpointName(cfg Config) string {
-    if cfg.Name != "" {
-        return cfg.Name
-    }
-    return asSha256(cfg)
-}
-
-func asSha256(o any) string {
-    h := sha256.New()
-    _, _ = fmt.Fprintf(h, "%v", o)
-
-    temp := fmt.Sprintf("%x", h.Sum(nil))
-    return temp[:6]
-}
-
-var userAgent = useragent.Get()
-
-type endpoint struct {
-    cfg     Config
-    entries chan loki.Entry
-
-    wg sync.WaitGroup
-
-    ctx    context.Context
-    cancel context.CancelFunc
-
-    shards *shards
-}
-
-func newEndpoint(metrics *Metrics, cfg Config, logger log.Logger) (*endpoint, error) {
-    logger = log.With(logger, "component", "endpoint", "host", cfg.URL.Host)
-
-    shards, err := newShards(metrics, logger, internal.NewNopMarkerHandler(), cfg)
-    if err != nil {
-        return nil, err
-    }
-
-    ctx, cancel := context.WithCancel(context.Background())
-
-    c := &endpoint{
-        cfg:     cfg,
-        entries: make(chan loki.Entry),
-        shards:  shards,
-        ctx:     ctx,
-        cancel:  cancel,
-    }
-
-    c.shards.start(cfg.QueueConfig.MinShards)
-
-    c.wg.Go(func() { c.run() })
-    return c, nil
-}
-
-func (c *endpoint) run() {
-    for {
-        select {
-        case <-c.ctx.Done():
-            return
-        case e := <-c.entries:
-            backoff := backoff.New(c.ctx, backoff.Config{
-                MinBackoff: 5 * time.Millisecond,
-                MaxBackoff: 50 * time.Millisecond,
-            })
-            for !c.shards.enqueue(e, 0) {
-                if !backoff.Ongoing() {
-                    break
-                }
-            }
-        }
-    }
-}
-
-func (c *endpoint) Chan() chan<- loki.Entry {
-    return c.entries
-}
-
-func (c *endpoint) Stop() {
-    c.shards.stop()
-    c.cancel()
-    c.wg.Wait()
-}

internal/component/common/loki/client/consumer_fanout_test.go

Lines changed: 0 additions & 434 deletions
Large diffs are not rendered by default.

internal/component/common/loki/client/consumer_wal.go

Lines changed: 44 additions & 76 deletions
@@ -1,14 +1,11 @@
 package client
 
 import (
-    "context"
     "fmt"
     "sync"
-    "time"
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
-    "github.com/grafana/dskit/backoff"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/tsdb/chunks"
@@ -57,16 +54,18 @@ func NewWALConsumer(logger log.Logger, reg prometheus.Registerer, walCfg wal.Con
         }
         markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(name))
 
-        endpoint, err := newWalEndpoint(metrics, walEndpointMetrics.CurryWithId(name), cfg, logger, markerHandler)
+        endpoint, err := newEndpoint(metrics, cfg, logger, markerHandler)
         if err != nil {
-            return nil, fmt.Errorf("error starting wal endpoint: %w", err)
+            return nil, fmt.Errorf("error starting endpoint: %w", err)
         }
 
+        adapter := newWalEndpointAdapter(endpoint, logger, walEndpointMetrics, markerHandler)
+
         // subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo
         // series cache whenever a segment is deleted.
-        writer.SubscribeCleanup(endpoint)
+        writer.SubscribeCleanup(adapter)
 
-        watcher := wal.NewWatcher(walCfg.Dir, name, walWatcherMetrics, endpoint, log.With(logger, "component", name), walCfg.WatchConfig, markerHandler)
+        watcher := wal.NewWatcher(walCfg.Dir, name, walWatcherMetrics, adapter, log.With(logger, "component", name), walCfg.WatchConfig, markerHandler)
 
         // subscribe watcher to wal write events
         writer.SubscribeWrite(watcher)
@@ -76,7 +75,7 @@
 
         m.pairs = append(m.pairs, endpointWatcherPair{
             watcher:  watcher,
-            endpoint: endpoint,
+            endpoint: adapter,
         })
     }
 
@@ -85,7 +84,7 @@
 
 type endpointWatcherPair struct {
     watcher  *wal.Watcher
-    endpoint *walEndpoint
+    endpoint *walEndpointAdapter
 }
 
 // Stop will proceed to stop, in order, watcher and the endpoint.
@@ -115,8 +114,8 @@
     m.stop(false)
 }
 
-// StopAndDrain will stop the manager, its WalWriter, Write-Ahead Log watchers,
-// and queues accordingly. It attempt to drain the WAL completely.
+// StopAndDrain will stop the consumer, its WalWriter, Write-Ahead Log watchers,
+// and endpoints accordingly. It attempts to drain the WAL completely.
 func (m *WALConsumer) StopAndDrain() {
     m.stop(true)
 }
@@ -139,47 +138,29 @@ func (m *WALConsumer) stop(drain bool) {
     stopWG.Wait()
 }
 
-func newWalEndpoint(metrics *Metrics, wcMetrics *WALEndpointMetrics, cfg Config, logger log.Logger, markerHandler internal.MarkerHandler) (*walEndpoint, error) {
-    logger = log.With(logger, "component", "endpoint", "host", cfg.URL.Host)
-
-    shards, err := newShards(metrics, logger, markerHandler, cfg)
-    if err != nil {
-        return nil, err
-    }
-
-    ctx, cancel := context.WithCancel(context.Background())
-
-    c := &walEndpoint{
-        logger:    logger,
-        cfg:       cfg,
+func newWalEndpointAdapter(endpoint *endpoint, logger log.Logger, wcMetrics *WALEndpointMetrics, markerHandler internal.MarkerHandler) *walEndpointAdapter {
+    c := &walEndpointAdapter{
+        logger:    log.With(logger, "component", "waladapter"),
         weMetrics: wcMetrics,
-        shards:    shards,
-
-        ctx:    ctx,
-        cancel: cancel,
+        endpoint:  endpoint,
 
         series:        make(map[chunks.HeadSeriesRef]model.LabelSet),
         seriesSegment: make(map[chunks.HeadSeriesRef]int),
 
         markerHandler: markerHandler,
     }
 
-    c.shards.start(cfg.QueueConfig.MinShards)
-
-    return c, nil
+    return c
 }
 
-// walEndpoint is a WAL-specific remote write implementation. This endpoint attests to the wal.WriteTo interface,
-// which allows it to be injected in the wal.Watcher as a destination where to write read series and entries. As the watcher
-// reads from the WAL, batches are created and dispatched onto a send queue when ready to be sent.
-type walEndpoint struct {
-    weMetrics *WALEndpointMetrics
+// walEndpointAdapter is an adapter between the watcher and the endpoint. It satisfies the wal.WriteTo interface,
+// which allows it to be injected into the wal.Watcher as a destination to write series and entries to. As the watcher
+// reads from the WAL, entries are forwarded here so they can be written to the endpoint.
+type walEndpointAdapter struct {
     logger    log.Logger
-    cfg       Config
-    shards    *shards
+    weMetrics *WALEndpointMetrics
 
-    ctx    context.Context
-    cancel context.CancelFunc
+    endpoint *endpoint
 
     // series cache
     series map[chunks.HeadSeriesRef]model.LabelSet
@@ -189,7 +170,7 @@ type walEndpoint struct {
     markerHandler internal.MarkerHandler
 }
 
-func (c *walEndpoint) SeriesReset(segmentNum int) {
+func (c *walEndpointAdapter) SeriesReset(segmentNum int) {
     c.seriesLock.Lock()
     defer c.seriesLock.Unlock()
     for k, v := range c.seriesSegment {
@@ -201,7 +182,7 @@ func (c *walEndpoint) SeriesReset(segmentNum int) {
     }
 }
 
-func (c *walEndpoint) StoreSeries(series []record.RefSeries, segment int) {
+func (c *walEndpointAdapter) StoreSeries(series []record.RefSeries, segment int) {
    c.seriesLock.Lock()
    defer c.seriesLock.Unlock()
    for _, seriesRec := range series {
@@ -210,55 +191,42 @@ func (c *walEndpoint) StoreSeries(series []record.RefSeries, segment int) {
     }
 }
 
-func (c *walEndpoint) AppendEntries(entries wal.RefEntries, segment int) error {
+func (c *walEndpointAdapter) AppendEntries(entries wal.RefEntries, segment int) error {
     c.seriesLock.RLock()
     l, ok := c.series[entries.Ref]
     c.seriesLock.RUnlock()
-    var maxSeenTimestamp int64 = -1
-    if ok {
-        for _, e := range entries.Entries {
-            ok := c.appendSingleEntry(loki.Entry{Labels: l, Entry: e}, segment)
-            if !ok {
-                return nil
-            }
-
-            if e.Timestamp.Unix() > maxSeenTimestamp {
-                maxSeenTimestamp = e.Timestamp.Unix()
-            }
-        }
-        // count all enqueued appended entries as received from WAL
-        c.markerHandler.UpdateReceivedData(segment, len(entries.Entries))
-    } else {
+
+    if !ok {
         // TODO(thepalbi): Add metric here
         level.Debug(c.logger).Log("msg", "series for entry not found")
+        return nil
     }
 
+    var maxSeenTimestamp int64 = -1
+    for _, e := range entries.Entries {
+        ok := c.endpoint.enqueue(loki.Entry{Labels: l, Entry: e}, segment)
+        if !ok {
+            return nil
+        }
+
+        if e.Timestamp.Unix() > maxSeenTimestamp {
+            maxSeenTimestamp = e.Timestamp.Unix()
+        }
+    }
+
+    // count all enqueued appended entries as received from WAL
+    c.markerHandler.UpdateReceivedData(segment, len(entries.Entries))
+
     // It's safe to assume that upon an AppendEntries call, there will always be at least
     // one entry.
     c.weMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp))
 
     return nil
 }
 
-func (c *walEndpoint) appendSingleEntry(entry loki.Entry, segmentNum int) bool {
-    backoff := backoff.New(c.ctx, backoff.Config{
-        MinBackoff: 5 * time.Millisecond,
-        MaxBackoff: 50 * time.Millisecond,
-    })
-    for !c.shards.enqueue(entry, segmentNum) {
-        if !backoff.Ongoing() {
-            // we could not enqueue and endpoint is stopped.
-            return false
-        }
-    }
-
-    return true
-}
-
 // Stop the endpoint, enqueueing pending batches and draining the send queue accordingly. Both closing operations are
 // limited by a deadline, controlled by a configured drain timeout, which is global to the Stop call.
-func (c *walEndpoint) Stop() {
-    // drain shards
-    c.shards.stop()
+func (c *walEndpointAdapter) Stop() {
+    c.endpoint.Stop()
     c.markerHandler.Stop()
 }
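The adapter keeps the WAL bookkeeping (series cache, marker handling, last-read-timestamp metric) out of the shared endpoint while still acting as the watcher's write destination. As a rough guide, the contract it fulfils looks like the sketch below, inferred from the adapter methods above rather than copied from the wal package, so the actual wal.WriteTo interface may differ in detail:

// Sketch only: inferred from the adapter's methods, not the wal package's definition.
type writeTo interface {
    StoreSeries(series []record.RefSeries, segment int)      // cache labels for a series ref
    SeriesReset(segmentNum int)                               // drop cached series from old segments
    AppendEntries(entries wal.RefEntries, segment int) error  // forward entries to the shared endpoint
}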
