Skip to content

Commit 8111250

Browse files
committed
chore: StatsD code removal
1 parent 55d100d commit 8111250

13 files changed

+38
-419
lines changed

agent/agent.go

+5-8
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type Agent struct {
2727
extractorFactory *registry.ExtractorFactory
2828
processorFactory *registry.ProcessorFactory
2929
sinkFactory *registry.SinkFactory
30-
monitor []Monitor
30+
monitor Monitor
3131
logger log.Logger
3232
retrier *retrier
3333
stopOnSinkError bool
@@ -279,8 +279,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
279279
}
280280

281281
retryNotification := func(e error, d time.Duration) {
282-
for _, mt := range r.monitor {
283-
mt.RecordSinkRetryCount(ctx, pluginInfo)
282+
if r.monitor != nil {
283+
r.monitor.RecordSinkRetryCount(ctx, pluginInfo)
284284
}
285285

286286
r.logger.Warn(
@@ -302,9 +302,6 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
302302
)
303303

304304
pluginInfo.Success = err == nil
305-
for _, mt := range r.monitor {
306-
mt.RecordPlugin(ctx, pluginInfo) // this can be deleted when statsd is removed
307-
}
308305
if err != nil {
309306
// once it reaches here, it means that the retry has been exhausted and still got error
310307
r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error())
@@ -328,8 +325,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
328325
}
329326

330327
func (r *Agent) logAndRecordMetrics(ctx context.Context, run Run) {
331-
for _, monitor := range r.monitor {
332-
monitor.RecordRun(ctx, run)
328+
if r.monitor != nil {
329+
r.monitor.RecordRun(ctx, run)
333330
}
334331

335332
if run.Success {

agent/agent_test.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestAgentRun(t *testing.T) {
8282
ProcessorFactory: pf,
8383
SinkFactory: sf,
8484
Logger: utils.Logger,
85-
Monitor: []agent.Monitor{monitor},
85+
Monitor: monitor,
8686
})
8787
run := r.Run(ctx, validRecipe)
8888
assert.False(t, run.Success)
@@ -113,7 +113,7 @@ func TestAgentRun(t *testing.T) {
113113
ProcessorFactory: registry.NewProcessorFactory(),
114114
SinkFactory: sf,
115115
Logger: utils.Logger,
116-
Monitor: []agent.Monitor{monitor},
116+
Monitor: monitor,
117117
})
118118
run := r.Run(ctx, validRecipe)
119119
assert.False(t, run.Success)
@@ -146,7 +146,7 @@ func TestAgentRun(t *testing.T) {
146146
ProcessorFactory: pf,
147147
SinkFactory: registry.NewSinkFactory(),
148148
Logger: utils.Logger,
149-
Monitor: []agent.Monitor{monitor},
149+
Monitor: monitor,
150150
})
151151
run := r.Run(ctx, validRecipe)
152152
assert.False(t, run.Success)
@@ -183,7 +183,7 @@ func TestAgentRun(t *testing.T) {
183183
ProcessorFactory: pf,
184184
SinkFactory: sf,
185185
Logger: utils.Logger,
186-
Monitor: []agent.Monitor{monitor},
186+
Monitor: monitor,
187187
})
188188
run := r.Run(ctx, validRecipe)
189189
assert.False(t, run.Success)
@@ -222,7 +222,7 @@ func TestAgentRun(t *testing.T) {
222222
ProcessorFactory: pf,
223223
SinkFactory: sf,
224224
Logger: utils.Logger,
225-
Monitor: []agent.Monitor{monitor},
225+
Monitor: monitor,
226226
})
227227
run := r.Run(ctx, validRecipe)
228228
assert.False(t, run.Success)
@@ -263,7 +263,7 @@ func TestAgentRun(t *testing.T) {
263263
ProcessorFactory: pf,
264264
SinkFactory: sf,
265265
Logger: utils.Logger,
266-
Monitor: []agent.Monitor{monitor},
266+
Monitor: monitor,
267267
})
268268
run := r.Run(ctx, validRecipe)
269269
assert.False(t, run.Success)
@@ -305,7 +305,7 @@ func TestAgentRun(t *testing.T) {
305305
ProcessorFactory: pf,
306306
SinkFactory: sf,
307307
Logger: utils.Logger,
308-
Monitor: []agent.Monitor{monitor},
308+
Monitor: monitor,
309309
})
310310
run := r.Run(ctx, validRecipe)
311311
assert.False(t, run.Success)
@@ -346,7 +346,7 @@ func TestAgentRun(t *testing.T) {
346346
ProcessorFactory: pf,
347347
SinkFactory: sf,
348348
Logger: utils.Logger,
349-
Monitor: []agent.Monitor{monitor},
349+
Monitor: monitor,
350350
})
351351
run := r.Run(ctx, validRecipe)
352352
assert.False(t, run.Success)
@@ -394,7 +394,7 @@ func TestAgentRun(t *testing.T) {
394394
ProcessorFactory: pf,
395395
SinkFactory: sf,
396396
Logger: utils.Logger,
397-
Monitor: []agent.Monitor{monitor},
397+
Monitor: monitor,
398398
})
399399
run := r.Run(ctx, validRecipe)
400400
assert.False(t, run.Success)
@@ -441,7 +441,7 @@ func TestAgentRun(t *testing.T) {
441441
ProcessorFactory: pf,
442442
SinkFactory: sf,
443443
Logger: utils.Logger,
444-
Monitor: []agent.Monitor{monitor},
444+
Monitor: monitor,
445445
})
446446
run := r.Run(ctx, validRecipe)
447447
assert.False(t, run.Success)
@@ -483,15 +483,15 @@ func TestAgentRun(t *testing.T) {
483483

484484
monitor := newMockMonitor()
485485
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
486-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
486+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
487487
defer monitor.AssertExpectations(t)
488488

489489
r := agent.NewAgent(agent.Config{
490490
ExtractorFactory: ef,
491491
ProcessorFactory: pf,
492492
SinkFactory: sf,
493493
Logger: utils.Logger,
494-
Monitor: []agent.Monitor{monitor},
494+
Monitor: monitor,
495495
})
496496
run := r.Run(ctx, validRecipe)
497497
assert.True(t, run.Success)
@@ -533,7 +533,7 @@ func TestAgentRun(t *testing.T) {
533533

534534
monitor := newMockMonitor()
535535
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
536-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
536+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
537537
defer monitor.AssertExpectations(t)
538538

539539
r := agent.NewAgent(agent.Config{
@@ -542,7 +542,7 @@ func TestAgentRun(t *testing.T) {
542542
SinkFactory: sf,
543543
Logger: utils.Logger,
544544
StopOnSinkError: true,
545-
Monitor: []agent.Monitor{monitor},
545+
Monitor: monitor,
546546
})
547547

548548
run := r.Run(ctx, validRecipe)
@@ -585,7 +585,7 @@ func TestAgentRun(t *testing.T) {
585585

586586
monitor := newMockMonitor()
587587
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
588-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
588+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
589589
defer monitor.AssertExpectations(t)
590590

591591
r := agent.NewAgent(agent.Config{
@@ -594,7 +594,7 @@ func TestAgentRun(t *testing.T) {
594594
SinkFactory: sf,
595595
Logger: utils.Logger,
596596
StopOnSinkError: false,
597-
Monitor: []agent.Monitor{monitor},
597+
Monitor: monitor,
598598
})
599599

600600
run := r.Run(ctx, validRecipe)
@@ -649,15 +649,15 @@ func TestAgentRun(t *testing.T) {
649649

650650
monitor := newMockMonitor()
651651
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
652-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
652+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
653653
defer monitor.AssertExpectations(t)
654654

655655
r := agent.NewAgent(agent.Config{
656656
ExtractorFactory: ef,
657657
ProcessorFactory: pf,
658658
SinkFactory: sf,
659659
Logger: utils.Logger,
660-
Monitor: []agent.Monitor{monitor},
660+
Monitor: monitor,
661661
})
662662
run := r.Run(ctx, validRecipe)
663663
assert.NoError(t, run.Error)
@@ -706,14 +706,14 @@ func TestAgentRun(t *testing.T) {
706706

707707
monitor := newMockMonitor()
708708
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
709-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
709+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
710710
defer monitor.AssertExpectations(t)
711711

712712
r := agent.NewAgent(agent.Config{
713713
ExtractorFactory: ef,
714714
ProcessorFactory: pf,
715715
SinkFactory: sf,
716-
Monitor: []agent.Monitor{monitor},
716+
Monitor: monitor,
717717
Logger: utils.Logger,
718718
TimerFn: timerFn,
719719
})
@@ -761,15 +761,15 @@ func TestAgentRun(t *testing.T) {
761761

762762
monitor := newMockMonitor()
763763
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
764-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
764+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
765765
defer monitor.AssertExpectations(t)
766766

767767
r := agent.NewAgent(agent.Config{
768768
ExtractorFactory: ef,
769769
ProcessorFactory: pf,
770770
SinkFactory: sf,
771771
Logger: utils.Logger,
772-
Monitor: []agent.Monitor{monitor},
772+
Monitor: monitor,
773773
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
774774
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
775775
})
@@ -815,7 +815,7 @@ func TestAgentRun(t *testing.T) {
815815

816816
monitor := newMockMonitor()
817817
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
818-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
818+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
819819
monitor.On("RecordSinkRetryCount", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
820820
defer monitor.AssertExpectations(t)
821821

@@ -824,7 +824,7 @@ func TestAgentRun(t *testing.T) {
824824
ProcessorFactory: pf,
825825
SinkFactory: sf,
826826
Logger: utils.Logger,
827-
Monitor: []agent.Monitor{monitor},
827+
Monitor: monitor,
828828
MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice
829829
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
830830
})
@@ -884,7 +884,7 @@ func TestAgentRun(t *testing.T) {
884884
ProcessorFactory: pf,
885885
SinkFactory: sf,
886886
Logger: utils.Logger,
887-
Monitor: []agent.Monitor{monitor},
887+
Monitor: monitor,
888888
MaxRetries: 5,
889889
RetryInitialInterval: 10 * time.Second,
890890
})
@@ -1057,15 +1057,15 @@ func TestAgentRunMultiple(t *testing.T) {
10571057

10581058
monitor := newMockMonitor()
10591059
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run"))
1060-
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
1060+
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
10611061
defer monitor.AssertExpectations(t)
10621062

10631063
r := agent.NewAgent(agent.Config{
10641064
ExtractorFactory: ef,
10651065
ProcessorFactory: pf,
10661066
SinkFactory: sf,
10671067
Logger: utils.Logger,
1068-
Monitor: []agent.Monitor{monitor},
1068+
Monitor: monitor,
10691069
})
10701070
runs := r.RunMultiple(ctx, recipeList)
10711071

@@ -1152,7 +1152,7 @@ func TestValidate(t *testing.T) {
11521152
ProcessorFactory: pf,
11531153
SinkFactory: sf,
11541154
Logger: utils.Logger,
1155-
Monitor: []agent.Monitor{newMockMonitor()},
1155+
Monitor: newMockMonitor(),
11561156
})
11571157

11581158
var expectedErrs []error

agent/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Config struct {
1111
ExtractorFactory *registry.ExtractorFactory
1212
ProcessorFactory *registry.ProcessorFactory
1313
SinkFactory *registry.SinkFactory
14-
Monitor []Monitor
14+
Monitor Monitor
1515
Logger log.Logger
1616
MaxRetries int
1717
RetryInitialInterval time.Duration

cmd/run.go

+2-19
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,7 @@ func RunCmd() *cobra.Command {
7070
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
7171
defer stop()
7272

73-
var mts []agent.Monitor
74-
if cfg.StatsdEnabled {
75-
mt, err := newStatsdMonitor(cfg)
76-
if err != nil {
77-
return err
78-
}
79-
80-
mts = append(mts, mt)
81-
82-
}
73+
var mts agent.Monitor
8374

8475
if cfg.OtelEnabled {
8576
doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version)
@@ -88,7 +79,7 @@ func RunCmd() *cobra.Command {
8879
}
8980
defer doneOtlp()
9081

91-
mts = append(mts, metrics.NewOtelMonitor())
82+
mts = metrics.NewOtelMonitor()
9283
}
9384

9485
runner := agent.NewAgent(agent.Config{
@@ -156,11 +147,3 @@ func RunCmd() *cobra.Command {
156147

157148
return cmd
158149
}
159-
160-
func newStatsdMonitor(cfg config.Config) (*metrics.StatsdMonitor, error) {
161-
client, err := metrics.NewStatsdClient(cfg.StatsdHost)
162-
if err != nil {
163-
return nil, err
164-
}
165-
return metrics.NewStatsdMonitor(client, cfg.AppName), nil
166-
}

config/config.go

-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ type Config struct {
1414
MaxRetries int `mapstructure:"MAX_RETRIES" default:"5"`
1515
RetryInitialIntervalSeconds int `mapstructure:"RETRY_INITIAL_INTERVAL_SECONDS" default:"5"`
1616
StopOnSinkError bool `mapstructure:"STOP_ON_SINK_ERROR" default:"false"`
17-
StatsdEnabled bool `mapstructure:"STATSD_ENABLED" default:"false"`
18-
StatsdHost string `mapstructure:"STATSD_HOST" default:"localhost:8125"`
1917
OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"`
2018
OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"`
2119
OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"`

config/config_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ func TestLoad(t *testing.T) {
2525
expected: config.Config{
2626
AppName: "meteor",
2727
LogLevel: "info",
28-
StatsdEnabled: false,
29-
StatsdHost: "localhost:8125",
3028
OtelEnabled: false,
3129
OtelCollectorAddr: "localhost:4317",
3230
OtelTraceSampleProbability: 1,
@@ -43,8 +41,6 @@ func TestLoad(t *testing.T) {
4341
expected: config.Config{
4442
AppName: "meteor",
4543
LogLevel: "info",
46-
StatsdEnabled: false,
47-
StatsdHost: "localhost:8125",
4844
OtelEnabled: false,
4945
OtelCollectorAddr: "localhost:4317",
5046
OtelTraceSampleProbability: 1,

config/meteor.yaml.sample

-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
LOG_LEVEL: info
2-
STATSD_ENABLED: false
3-
STATSD_HOST: "localhost:8125"
42
MAX_RETRIES: 5
53
RETRY_INITIAL_INTERVAL_SECONDS: 5
64
STOP_ON_SINK_ERROR: false

config/testdata/invalid-config.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
STATSD_ENABLED: not-a-boolean
1+
OTEL_ENABLED: not-a-boolean

config/testdata/valid-config.yaml

+1-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
LOG_LEVEL: info
2-
STATSD_ENABLED: false
3-
STATSD_HOST: "localhost:8125"
4-
STATSD_PREFIX: meteor
52
MAX_RETRIES: 5
63
RETRY_INITIAL_INTERVAL_SECONDS: 5
7-
STOP_ON_SINK_ERROR: false
4+
STOP_ON_SINK_ERROR: false

go.mod

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ require (
2020
github.com/dnaeon/go-vcr/v2 v2.0.1
2121
github.com/elastic/go-elasticsearch v0.0.0
2222
github.com/elastic/go-elasticsearch/v8 v8.0.0-20210708134649-33f644c8e327
23-
github.com/etsy/statsd v0.9.0
2423
github.com/go-kivik/couchdb v2.0.0+incompatible
2524
github.com/go-kivik/kivik v2.0.0+incompatible
2625
github.com/go-playground/validator/v10 v10.10.0

0 commit comments

Comments
 (0)