Skip to content

Commit

Permalink
model: introduce APMEvent; Batch is now []model.APMEvent (#5613)
Browse files Browse the repository at this point in the history
* model: introduce APMEvent; Batch is now []APMEvent

Introduce APMEvent, a sort of discriminated union of all
possible APM event types.

In the future we will define model types in protobuf and
generate the Go code. The APMEvent type will evolve to map
directly to the Elasticsearch docs we produce, along these
lines:

```
message APMEvent {
  google.protobuf.Timestamp timestamp = 1;
  Event event = 2;
  Trace trace = 3;
  Transaction transaction = 4;
  Span span = 5;
  Metricset metricset = 6;
  Parent parent = 7;
  Agent agent = 8;
  Client client = 9;
  Cloud cloud = 10;
  Container container = 11;
  Host host = 12;
  Kubernetes kubernetes = 13;
  map<string, LabelValue> labels = 14;
  Process process = 15;
  Service service = 16;
  Session session = 17;
  User user = 18;
  UserAgent user_agent = 19;
  HTTP http = 20;
  URL url = 21;
  Child child = 22;
  Error error = 23;
}
```

When we do that, we'll be moving common fields (e.g. service details,
labels) to APMEvent, and out of the specific event objects.

* Adapt code to model.Batch changes

* Update approvals

Order of events changed: they're once again reported in the order received.

(cherry picked from commit 0661ad0)
  • Loading branch information
axw authored and mergify-bot committed Jul 7, 2021
1 parent 83774cb commit ef93806
Show file tree
Hide file tree
Showing 40 changed files with 1,557 additions and 1,523 deletions.
9 changes: 4 additions & 5 deletions agentcfg/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,21 @@ func (r Reporter) Run(ctx context.Context) error {
continue
case <-t.C:
}
batch := new(model.Batch)
batch := make(model.Batch, 0, len(applied))
for etag := range applied {
m := &model.Metricset{
batch = append(batch, model.APMEvent{Metricset: &model.Metricset{
Name: "agent_config",
Labels: common.MapStr{"etag": etag},
Samples: []model.Sample{{Name: "agent_config_applied", Value: 1}},
}
batch.Metricsets = append(batch.Metricsets, m)
}})
}
// Reset applied map, so that we report only configs applied
// during a given iteration.
applied = make(map[string]struct{})
wg.Add(1)
go func() {
defer wg.Done()
if err := r.p.ProcessBatch(ctx, batch); err != nil {
if err := r.p.ProcessBatch(ctx, &batch); err != nil {
r.logger.Errorf("error sending applied agent configs to kibana: %v", err)
}
}()
Expand Down
4 changes: 3 additions & 1 deletion agentcfg/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ type batchProcessor struct {
func (p *batchProcessor) ProcessBatch(_ context.Context, b *model.Batch) error {
p.mu.Lock()
defer p.mu.Unlock()
p.received = append(p.received, b.Metricsets...)
for _, event := range *b {
p.received = append(p.received, event.Metricset)
}
p.receivedc <- struct{}{}
return nil
}
8 changes: 4 additions & 4 deletions beater/api/profile/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ func Handler(requestMetadataFunc RequestMetadataFunc, processor model.BatchProce
}
}

modelProfiles := make([]*model.PprofProfile, len(profiles))
batch := make(model.Batch, len(profiles))
for i, p := range profiles {
modelProfiles[i] = &model.PprofProfile{
batch[i].Profile = &model.PprofProfile{
Metadata: profileMetadata,
Profile: p,
}
}

if err := processor.ProcessBatch(c.Request.Context(), &model.Batch{Profiles: modelProfiles}); err != nil {
if err := processor.ProcessBatch(c.Request.Context(), &batch); err != nil {
switch err {
case publish.ErrChannelClosed:
return nil, requestError{
Expand All @@ -181,7 +181,7 @@ func Handler(requestMetadataFunc RequestMetadataFunc, processor model.BatchProce
}
return nil, err
}
return &result{Accepted: len(modelProfiles)}, nil
return &result{Accepted: len(batch)}, nil
}
return func(c *request.Context) {
result, err := handle(c)
Expand Down
6 changes: 3 additions & 3 deletions beater/api/profile/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func TestHandler(t *testing.T) {
reports: 1,
batchProcessor: func(t *testing.T) model.BatchProcessor {
return model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error {
require.Len(t, batch.Profiles, 2)
for _, profile := range batch.Profiles {
assert.Equal(t, "foo", profile.Metadata.Service.Name)
require.Len(t, *batch, 2)
for _, event := range *batch {
assert.Equal(t, "foo", event.Profile.Metadata.Service.Name)
}
return nil
})
Expand Down
11 changes: 7 additions & 4 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,16 @@ func newTestBeater(
Logger: logger,
WrapRunServer: func(runServer RunServerFunc) RunServerFunc {
var processor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error {
for _, tx := range batch.Transactions {
for _, event := range *batch {
if event.Transaction == nil {
continue
}
// Add a label to test that everything
// goes through the wrapped reporter.
if tx.Labels == nil {
tx.Labels = common.MapStr{}
if event.Transaction.Labels == nil {
event.Transaction.Labels = common.MapStr{}
}
tx.Labels["wrapped_reporter"] = true
event.Transaction.Labels["wrapped_reporter"] = true
}
return nil
}
Expand Down
10 changes: 4 additions & 6 deletions beater/otlp/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ var (
)

func TestConsumeTraces(t *testing.T) {
var batches []*model.Batch
var batches []model.Batch
var reportError error
var batchProcessor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error {
batches = append(batches, batch)
batches = append(batches, *batch)
return reportError
}

Expand Down Expand Up @@ -93,10 +93,8 @@ func TestConsumeTraces(t *testing.T) {
errStatus := status.Convert(err)
assert.Equal(t, "failed to publish events", errStatus.Message())
require.Len(t, batches, 2)

for _, batch := range batches {
assert.Equal(t, 1, batch.Len())
}
assert.Len(t, batches[0], 1)
assert.Len(t, batches[1], 1)

actual := map[string]interface{}{}
monitoring.GetRegistry("apm-server.otlp.grpc.traces").Do(monitoring.Full, func(key string, value interface{}) {
Expand Down
2 changes: 1 addition & 1 deletion beater/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func rateLimitBatchProcessor(ctx context.Context, batch *model.Batch) error {
if limiter, ok := ratelimit.FromContext(ctx); ok {
ctx, cancel := context.WithTimeout(ctx, rateLimitTimeout)
defer cancel()
if err := limiter.WaitN(ctx, batch.Len()); err != nil {
if err := limiter.WaitN(ctx, len(*batch)); err != nil {
return ratelimit.ErrRateLimitExceeded
}
}
Expand Down
6 changes: 3 additions & 3 deletions beater/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func TestRateLimitBatchProcessor(t *testing.T) {
limiter := rate.NewLimiter(1, 10)
ctx := ratelimit.ContextWithLimiter(context.Background(), limiter)

var batch model.Batch
for i := 0; i < 5; i++ {
batch.Transactions = append(batch.Transactions, &model.Transaction{})
batch := make(model.Batch, 5)
for i := range batch {
batch[i].Transaction = &model.Transaction{}
}
for i := 0; i < 2; i++ {
err := rateLimitBatchProcessor(ctx, &batch)
Expand Down
Loading

0 comments on commit ef93806

Please sign in to comment.