diff --git a/agentcfg/reporter.go b/agentcfg/reporter.go index 89c07fd5bc3..05a325629d0 100644 --- a/agentcfg/reporter.go +++ b/agentcfg/reporter.go @@ -83,14 +83,13 @@ func (r Reporter) Run(ctx context.Context) error { continue case <-t.C: } - batch := new(model.Batch) + batch := make(model.Batch, 0, len(applied)) for etag := range applied { - m := &model.Metricset{ + batch = append(batch, model.APMEvent{Metricset: &model.Metricset{ Name: "agent_config", Labels: common.MapStr{"etag": etag}, Samples: []model.Sample{{Name: "agent_config_applied", Value: 1}}, - } - batch.Metricsets = append(batch.Metricsets, m) + }}) } // Reset applied map, so that we report only configs applied // during a given iteration. @@ -98,7 +97,7 @@ func (r Reporter) Run(ctx context.Context) error { wg.Add(1) go func() { defer wg.Done() - if err := r.p.ProcessBatch(ctx, batch); err != nil { + if err := r.p.ProcessBatch(ctx, &batch); err != nil { r.logger.Errorf("error sending applied agent configs to kibana: %v", err) } }() diff --git a/agentcfg/reporter_test.go b/agentcfg/reporter_test.go index 77cfe6a21a0..71613a37a71 100644 --- a/agentcfg/reporter_test.go +++ b/agentcfg/reporter_test.go @@ -97,7 +97,9 @@ type batchProcessor struct { func (p *batchProcessor) ProcessBatch(_ context.Context, b *model.Batch) error { p.mu.Lock() defer p.mu.Unlock() - p.received = append(p.received, b.Metricsets...) + for _, event := range *b { + p.received = append(p.received, event.Metricset) + } p.receivedc <- struct{}{} return nil } diff --git a/beater/api/profile/handler.go b/beater/api/profile/handler.go index e1f30da55fd..e453d7aea60 100644 --- a/beater/api/profile/handler.go +++ b/beater/api/profile/handler.go @@ -158,15 +158,15 @@ func Handler(requestMetadataFunc RequestMetadataFunc, processor model.BatchProce } } - modelProfiles := make([]*model.PprofProfile, len(profiles)) + batch := make(model.Batch, len(profiles)) for i, p := range profiles { - modelProfiles[i] = &model.PprofProfile{ + batch[i].Profile = &model.PprofProfile{ Metadata: profileMetadata, Profile: p, } } - if err := processor.ProcessBatch(c.Request.Context(), &model.Batch{Profiles: modelProfiles}); err != nil { + if err := processor.ProcessBatch(c.Request.Context(), &batch); err != nil { switch err { case publish.ErrChannelClosed: return nil, requestError{ @@ -181,7 +181,7 @@ func Handler(requestMetadataFunc RequestMetadataFunc, processor model.BatchProce } return nil, err } - return &result{Accepted: len(modelProfiles)}, nil + return &result{Accepted: len(batch)}, nil } return func(c *request.Context) { result, err := handle(c) diff --git a/beater/api/profile/handler_test.go b/beater/api/profile/handler_test.go index 0fa0f80eb52..781125800c1 100644 --- a/beater/api/profile/handler_test.go +++ b/beater/api/profile/handler_test.go @@ -140,9 +140,9 @@ func TestHandler(t *testing.T) { reports: 1, batchProcessor: func(t *testing.T) model.BatchProcessor { return model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { - require.Len(t, batch.Profiles, 2) - for _, profile := range batch.Profiles { - assert.Equal(t, "foo", profile.Metadata.Service.Name) + require.Len(t, *batch, 2) + for _, event := range *batch { + assert.Equal(t, "foo", event.Profile.Metadata.Service.Name) } return nil }) diff --git a/beater/beater_test.go b/beater/beater_test.go index b7295aac0c0..71ac57f9d10 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -154,13 +154,16 @@ func newTestBeater( Logger: logger, WrapRunServer: func(runServer RunServerFunc) RunServerFunc { var processor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error { - for _, tx := range batch.Transactions { + for _, event := range *batch { + if event.Transaction == nil { + continue + } // Add a label to test that everything // goes through the wrapped reporter. - if tx.Labels == nil { - tx.Labels = common.MapStr{} + if event.Transaction.Labels == nil { + event.Transaction.Labels = common.MapStr{} } - tx.Labels["wrapped_reporter"] = true + event.Transaction.Labels["wrapped_reporter"] = true } return nil } diff --git a/beater/otlp/grpc_test.go b/beater/otlp/grpc_test.go index 86b340b3313..e768105b33f 100644 --- a/beater/otlp/grpc_test.go +++ b/beater/otlp/grpc_test.go @@ -47,10 +47,10 @@ var ( ) func TestConsumeTraces(t *testing.T) { - var batches []*model.Batch + var batches []model.Batch var reportError error var batchProcessor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error { - batches = append(batches, batch) + batches = append(batches, *batch) return reportError } @@ -93,10 +93,8 @@ func TestConsumeTraces(t *testing.T) { errStatus := status.Convert(err) assert.Equal(t, "failed to publish events", errStatus.Message()) require.Len(t, batches, 2) - - for _, batch := range batches { - assert.Equal(t, 1, batch.Len()) - } + assert.Len(t, batches[0], 1) + assert.Len(t, batches[1], 1) actual := map[string]interface{}{} monitoring.GetRegistry("apm-server.otlp.grpc.traces").Do(monitoring.Full, func(key string, value interface{}) { diff --git a/beater/processors.go b/beater/processors.go index 60301b989b6..83abf7d7667 100644 --- a/beater/processors.go +++ b/beater/processors.go @@ -46,7 +46,7 @@ func rateLimitBatchProcessor(ctx context.Context, batch *model.Batch) error { if limiter, ok := ratelimit.FromContext(ctx); ok { ctx, cancel := context.WithTimeout(ctx, rateLimitTimeout) defer cancel() - if err := limiter.WaitN(ctx, batch.Len()); err != nil { + if err := limiter.WaitN(ctx, len(*batch)); err != nil { return ratelimit.ErrRateLimitExceeded } } diff --git a/beater/processors_test.go b/beater/processors_test.go index 88c4b7e8b74..f3116e85b63 100644 --- a/beater/processors_test.go +++ b/beater/processors_test.go @@ -33,9 +33,9 @@ func TestRateLimitBatchProcessor(t *testing.T) { limiter := rate.NewLimiter(1, 10) ctx := ratelimit.ContextWithLimiter(context.Background(), limiter) - var batch model.Batch - for i := 0; i < 5; i++ { - batch.Transactions = append(batch.Transactions, &model.Transaction{}) + batch := make(model.Batch, 5) + for i := range batch { + batch[i].Transaction = &model.Transaction{} } for i := 0; i < 2; i++ { err := rateLimitBatchProcessor(ctx, &batch) diff --git a/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json index 142ce049f04..1252383861f 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json @@ -5,10 +5,10 @@ "agent": { "ephemeral_id": "e71be9ac-93b0-44b9-a997-5638f6ccfc36", "name": "java", - "version": "1.10.0-SNAPSHOT" + "version": "1.10.0" }, "client": { - "ip": "12.53.12.1" + "ip": "192.168.0.1" }, "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" @@ -16,8 +16,131 @@ "ecs": { "version": "1.10.0" }, - "event": { - "outcome": "success" + "error": { + "culprit": "opbeans.controllers.DTInterceptor.preHandle(DTInterceptor.java:73)", + "custom": { + "and_objects": { + "foo": [ + "bar", + "baz" + ] + }, + "my_key": 1, + "some_other_value": "foobar" + }, + "exception": [ + { + "attributes": { + "foo": "bar" + }, + "code": "42", + "handled": false, + "message": "Theusernamerootisunknown", + "module": "org.springframework.http.client", + "stacktrace": [ + { + "abs_path": "/tmp/AbstractPlainSocketImpl.java", + "context": { + "post": [ + "line4", + "line5" + ], + "pre": [ + "line1", + "line2" + ] + }, + "exclude_from_grouping": false, + "filename": "AbstractPlainSocketImpl.java", + "function": "connect", + "library_frame": true, + "line": { + "column": 4, + "context": "3", + "number": 3 + }, + "module": "java.net", + "vars": { + "key": "value" + } + }, + { + "exclude_from_grouping": false, + "filename": "AbstractClientHttpRequest.java", + "function": "execute", + "line": { + "number": 102 + }, + "vars": { + "key": "value" + } + } + ], + "type": "java.net.UnknownHostException" + }, + { + "message": "something wrong writing a file", + "type": "InternalDbError" + }, + { + "message": "disk spinning way too fast", + "type": "VeryInternalDbError" + }, + { + "message": "on top of it,internet doesn't work", + "parent": 1, + "type": "ConnectionError" + } + ], + "grouping_key": "9a4054e958afe722b5877e8fac578ff3", + "id": "9876543210abcdeffedcba0123456789", + "log": { + "level": "error", + "logger_name": "http404", + "message": "Request method 'POST' not supported", + "param_message": "Request method 'POST' /events/:event not supported", + "stacktrace": [ + { + "abs_path": "/tmp/Socket.java", + "classname": "Request::Socket", + "context": { + "post": [ + "line4", + "line5" + ], + "pre": [ + "line1", + "line2" + ] + }, + "exclude_from_grouping": false, + "filename": "Socket.java", + "function": "connect", + "library_frame": true, + "line": { + "column": 4, + "context": "line3", + "number": 3 + }, + "module": "java.net", + "vars": { + "key": "value" + } + }, + { + "abs_path": "/tmp/SimpleBufferingClientHttpRequest.java", + "exclude_from_grouping": false, + "filename": "SimpleBufferingClientHttpRequest.java", + "function": "executeInternal", + "line": { + "number": 102 + }, + "vars": { + "key": "value" + } + } + ] + } }, "host": { "architecture": "amd64", @@ -31,13 +154,7 @@ "http": { "request": { "body": { - "original": { - "additional": { - "bar": 123, - "req": "additionalinformation" - }, - "string": "helloworld" - } + "original": "HelloWorld" }, "cookies": { "c1": "v1", @@ -48,29 +165,30 @@ "SERVER_SOFTWARE": "nginx" }, "headers": { - "Content-Type": [ - "text/html" + "Content-Length": [ + "0" ], "Cookie": [ - "c1=v1,c2=v2" + "c1=v1", + "c2=v2" ], "Elastic-Apm-Traceparent": [ - "00-33a0bd4cceff0370a7c57d807032688e-69feaabc5b88d7e8-01" + "00-8c21b4b556467a0b17ae5da959b5f388-31301f1fb2998121-01" ], - "User-Agent": [ - "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36", - "MozillaChromeEdge" + "Forwarded": [ + "for=192.168.0.1" + ], + "Host": [ + "opbeans-java:3000" ] }, "method": "POST", "socket": { "encrypted": true, - "remote_address": "12.53.12.1:8080" + "remote_address": "12.53.12.1" } }, "response": { - "decoded_body_size": 401.9, - "encoded_body_size": 356.9, "finished": true, "headers": { "Content-Type": [ @@ -78,8 +196,7 @@ ] }, "headers_sent": true, - "status_code": 200, - "transfer_size": 300 + "status_code": 200 }, "version": "1.1" }, @@ -97,8 +214,7 @@ "ab_testing": true, "group": "experimental", "organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", - "segment": 5, - "wrapped_reporter": true + "segment": 5 }, "observer": { "ephemeral_id": "00000000-0000-0000-0000-000000000000", @@ -109,7 +225,7 @@ "version_major": 1 }, "parent": { - "id": "abcdefabcdef01234567" + "id": "9632587410abcdef" }, "process": { "args": [ @@ -120,22 +236,22 @@ "title": "/usr/lib/jvm/java-10-openjdk-amd64/bin/java" }, "processor": { - "event": "transaction", - "name": "transaction" + "event": "error", + "name": "error" }, "service": { "environment": "production", "framework": { - "name": "spring", - "version": "5.0.0" + "name": "Node", + "version": "1" }, "language": { "name": "Java", - "version": "10.0.2" + "version": "1.2" }, - "name": "experimental-java", + "name": "service1", "node": { - "name": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" + "name": "node-xyz" }, "runtime": { "name": "Java", @@ -144,38 +260,18 @@ "version": "4.3.0" }, "source": { - "ip": "12.53.12.1" + "ip": "192.168.0.1" }, "timestamp": { "us": 1571657444929001 }, "trace": { - "id": "0acd456789abcdef0123456789abcdef" + "id": "0123456789abcdeffedcba0123456789" }, "transaction": { - "custom": { - "(": "notavalidregexandthatisfine", - "and_objects": { - "foo": [ - "bar", - "baz" - ] - }, - "my_key": 1, - "some_other_value": "foobar" - }, - "duration": { - "us": 32592 - }, - "id": "4340a8e0df1906ecbfa9", - "name": "ResourceHttpRequestHandler", - "result": "HTTP2xx", + "id": "1234567890987654", "sampled": true, - "span_count": { - "dropped": 0, - "started": 17 - }, - "type": "http" + "type": "request" }, "url": { "domain": "www.example.com", @@ -188,12 +284,9 @@ "scheme": "https" }, "user": { - "email": "foo@mail.com", + "email": "user@foo.mail", "id": "99", "name": "foo" - }, - "user_agent": { - "original": "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36, MozillaChromeEdge" } }, { @@ -363,22 +456,20 @@ "agent": { "ephemeral_id": "e71be9ac-93b0-44b9-a997-5638f6ccfc36", "name": "java", - "version": "1.10.0" + "version": "1.10.0-SNAPSHOT" + }, + "client": { + "ip": "12.53.12.1" }, - "byte_counter": 1, "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "dotted": { - "float": { - "gauge": 6.12 - } - }, - "double_gauge": 3.141592653589793, "ecs": { "version": "1.10.0" }, - "float_gauge": 9.16, + "event": { + "outcome": "success" + }, "host": { "architecture": "amd64", "hostname": "node-name", @@ -388,39 +479,78 @@ "platform": "Linux" } }, - "integer_gauge": 42767, - "kubernetes": { - "namespace": "default", - "node": { - "name": "node-name" - }, - "pod": { - "name": "instrumented-java-service", - "uid": "b17f231da0ad128dc6c6c0b2e82f6f303d3893e3" - } - }, - "labels": { - "ab_testing": true, - "code": 200, - "group": "experimental", - "segment": 5, - "success": true - }, - "long_gauge": 3147483648, - "metricset.name": "span_breakdown", - "negative": { - "d": { - "o": { - "t": { - "t": { - "e": { - "d": -1022 - } - } + "http": { + "request": { + "body": { + "original": { + "additional": { + "bar": 123, + "req": "additionalinformation" + }, + "string": "helloworld" } + }, + "cookies": { + "c1": "v1", + "c2": "v2" + }, + "env": { + "GATEWAY_INTERFACE": "CGI/1.1", + "SERVER_SOFTWARE": "nginx" + }, + "headers": { + "Content-Type": [ + "text/html" + ], + "Cookie": [ + "c1=v1,c2=v2" + ], + "Elastic-Apm-Traceparent": [ + "00-33a0bd4cceff0370a7c57d807032688e-69feaabc5b88d7e8-01" + ], + "User-Agent": [ + "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36", + "MozillaChromeEdge" + ] + }, + "method": "POST", + "socket": { + "encrypted": true, + "remote_address": "12.53.12.1:8080" } + }, + "response": { + "decoded_body_size": 401.9, + "encoded_body_size": 356.9, + "finished": true, + "headers": { + "Content-Type": [ + "application/json" + ] + }, + "headers_sent": true, + "status_code": 200, + "transfer_size": 300 + }, + "version": "1.1" + }, + "kubernetes": { + "namespace": "default", + "node": { + "name": "node-name" + }, + "pod": { + "name": "instrumented-java-service", + "uid": "b17f231da0ad128dc6c6c0b2e82f6f303d3893e3" } }, + "labels": { + "ab_testing": true, + "group": "experimental", + "organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", + "segment": 5, + "wrapped_reporter": true + }, "observer": { "ephemeral_id": "00000000-0000-0000-0000-000000000000", "hostname": "", @@ -429,6 +559,9 @@ "version": "1.2.3", "version_major": 1 }, + "parent": { + "id": "abcdefabcdef01234567" + }, "process": { "args": [ "-v" @@ -438,8 +571,8 @@ "title": "/usr/lib/jvm/java-10-openjdk-amd64/bin/java" }, "processor": { - "event": "metric", - "name": "metric" + "event": "transaction", + "name": "transaction" }, "service": { "environment": "production", @@ -451,7 +584,7 @@ "name": "Java", "version": "10.0.2" }, - "name": "1234_service-12a3", + "name": "experimental-java", "node": { "name": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, @@ -461,35 +594,57 @@ }, "version": "4.3.0" }, - "short_counter": 227, - "span": { - "self_time": { - "count": 1, - "sum": { - "us": 633.288 - } - }, - "subtype": "mysql", - "type": "db" + "source": { + "ip": "12.53.12.1" + }, + "timestamp": { + "us": 1571657444929001 + }, + "trace": { + "id": "0acd456789abcdef0123456789abcdef" }, "transaction": { - "breakdown": { - "count": 12 + "custom": { + "(": "notavalidregexandthatisfine", + "and_objects": { + "foo": [ + "bar", + "baz" + ] + }, + "my_key": 1, + "some_other_value": "foobar" }, "duration": { - "count": 2, - "sum": { - "us": 12 - } + "us": 32592 }, - "name": "GET/", - "self_time": { - "count": 2, - "sum": { - "us": 10 - } + "id": "4340a8e0df1906ecbfa9", + "name": "ResourceHttpRequestHandler", + "result": "HTTP2xx", + "sampled": true, + "span_count": { + "dropped": 0, + "started": 17 }, - "type": "request" + "type": "http" + }, + "url": { + "domain": "www.example.com", + "fragment": "#hash", + "full": "https://www.example.com/p/a/t/h?query=string#hash", + "original": "/p/a/t/h?query=string#hash", + "path": "/p/a/t/h", + "port": 8080, + "query": "?query=string", + "scheme": "https" + }, + "user": { + "email": "foo@mail.com", + "id": "99", + "name": "foo" + }, + "user_agent": { + "original": "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36, MozillaChromeEdge" } }, { @@ -499,141 +654,20 @@ "name": "java", "version": "1.10.0" }, - "client": { - "ip": "192.168.0.1" - }, + "byte_counter": 1, "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, + "dotted": { + "float": { + "gauge": 6.12 + } + }, + "double_gauge": 3.141592653589793, "ecs": { "version": "1.10.0" }, - "error": { - "culprit": "opbeans.controllers.DTInterceptor.preHandle(DTInterceptor.java:73)", - "custom": { - "and_objects": { - "foo": [ - "bar", - "baz" - ] - }, - "my_key": 1, - "some_other_value": "foobar" - }, - "exception": [ - { - "attributes": { - "foo": "bar" - }, - "code": "42", - "handled": false, - "message": "Theusernamerootisunknown", - "module": "org.springframework.http.client", - "stacktrace": [ - { - "abs_path": "/tmp/AbstractPlainSocketImpl.java", - "context": { - "post": [ - "line4", - "line5" - ], - "pre": [ - "line1", - "line2" - ] - }, - "exclude_from_grouping": false, - "filename": "AbstractPlainSocketImpl.java", - "function": "connect", - "library_frame": true, - "line": { - "column": 4, - "context": "3", - "number": 3 - }, - "module": "java.net", - "vars": { - "key": "value" - } - }, - { - "exclude_from_grouping": false, - "filename": "AbstractClientHttpRequest.java", - "function": "execute", - "line": { - "number": 102 - }, - "vars": { - "key": "value" - } - } - ], - "type": "java.net.UnknownHostException" - }, - { - "message": "something wrong writing a file", - "type": "InternalDbError" - }, - { - "message": "disk spinning way too fast", - "type": "VeryInternalDbError" - }, - { - "message": "on top of it,internet doesn't work", - "parent": 1, - "type": "ConnectionError" - } - ], - "grouping_key": "9a4054e958afe722b5877e8fac578ff3", - "id": "9876543210abcdeffedcba0123456789", - "log": { - "level": "error", - "logger_name": "http404", - "message": "Request method 'POST' not supported", - "param_message": "Request method 'POST' /events/:event not supported", - "stacktrace": [ - { - "abs_path": "/tmp/Socket.java", - "classname": "Request::Socket", - "context": { - "post": [ - "line4", - "line5" - ], - "pre": [ - "line1", - "line2" - ] - }, - "exclude_from_grouping": false, - "filename": "Socket.java", - "function": "connect", - "library_frame": true, - "line": { - "column": 4, - "context": "line3", - "number": 3 - }, - "module": "java.net", - "vars": { - "key": "value" - } - }, - { - "abs_path": "/tmp/SimpleBufferingClientHttpRequest.java", - "exclude_from_grouping": false, - "filename": "SimpleBufferingClientHttpRequest.java", - "function": "executeInternal", - "line": { - "number": 102 - }, - "vars": { - "key": "value" - } - } - ] - } - }, + "float_gauge": 9.16, "host": { "architecture": "amd64", "hostname": "node-name", @@ -643,55 +677,7 @@ "platform": "Linux" } }, - "http": { - "request": { - "body": { - "original": "HelloWorld" - }, - "cookies": { - "c1": "v1", - "c2": "v2" - }, - "env": { - "GATEWAY_INTERFACE": "CGI/1.1", - "SERVER_SOFTWARE": "nginx" - }, - "headers": { - "Content-Length": [ - "0" - ], - "Cookie": [ - "c1=v1", - "c2=v2" - ], - "Elastic-Apm-Traceparent": [ - "00-8c21b4b556467a0b17ae5da959b5f388-31301f1fb2998121-01" - ], - "Forwarded": [ - "for=192.168.0.1" - ], - "Host": [ - "opbeans-java:3000" - ] - }, - "method": "POST", - "socket": { - "encrypted": true, - "remote_address": "12.53.12.1" - } - }, - "response": { - "finished": true, - "headers": { - "Content-Type": [ - "application/json" - ] - }, - "headers_sent": true, - "status_code": 200 - }, - "version": "1.1" - }, + "integer_gauge": 42767, "kubernetes": { "namespace": "default", "node": { @@ -704,9 +690,25 @@ }, "labels": { "ab_testing": true, + "code": 200, "group": "experimental", - "organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", - "segment": 5 + "segment": 5, + "success": true + }, + "long_gauge": 3147483648, + "metricset.name": "span_breakdown", + "negative": { + "d": { + "o": { + "t": { + "t": { + "e": { + "d": -1022 + } + } + } + } + } }, "observer": { "ephemeral_id": "00000000-0000-0000-0000-000000000000", @@ -716,9 +718,6 @@ "version": "1.2.3", "version_major": 1 }, - "parent": { - "id": "9632587410abcdef" - }, "process": { "args": [ "-v" @@ -728,22 +727,22 @@ "title": "/usr/lib/jvm/java-10-openjdk-amd64/bin/java" }, "processor": { - "event": "error", - "name": "error" + "event": "metric", + "name": "metric" }, "service": { "environment": "production", "framework": { - "name": "Node", - "version": "1" + "name": "spring", + "version": "5.0.0" }, "language": { "name": "Java", - "version": "1.2" + "version": "10.0.2" }, - "name": "service1", + "name": "1234_service-12a3", "node": { - "name": "node-xyz" + "name": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, "runtime": { "name": "Java", @@ -751,34 +750,35 @@ }, "version": "4.3.0" }, - "source": { - "ip": "192.168.0.1" - }, - "timestamp": { - "us": 1571657444929001 - }, - "trace": { - "id": "0123456789abcdeffedcba0123456789" + "short_counter": 227, + "span": { + "self_time": { + "count": 1, + "sum": { + "us": 633.288 + } + }, + "subtype": "mysql", + "type": "db" }, "transaction": { - "id": "1234567890987654", - "sampled": true, + "breakdown": { + "count": 12 + }, + "duration": { + "count": 2, + "sum": { + "us": 12 + } + }, + "name": "GET/", + "self_time": { + "count": 2, + "sum": { + "us": 10 + } + }, "type": "request" - }, - "url": { - "domain": "www.example.com", - "fragment": "#hash", - "full": "https://www.example.com/p/a/t/h?query=string#hash", - "original": "/p/a/t/h?query=string#hash", - "path": "/p/a/t/h", - "port": 8080, - "query": "?query=string", - "scheme": "https" - }, - "user": { - "email": "user@foo.mail", - "id": "99", - "name": "foo" } } ] diff --git a/beater/test_approved_es_documents/TestPublishIntegrationMinimalEvents.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationMinimalEvents.approved.json index 412795593fc..c6733268a0a 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationMinimalEvents.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationMinimalEvents.approved.json @@ -9,15 +9,16 @@ "ecs": { "version": "1.10.0" }, - "event": { - "outcome": "unknown" + "error": { + "grouping_key": "0b9cba09845a097a271c6beb4c6207f3", + "id": "abcdef0123456789", + "log": { + "message": "error log message" + } }, "host": { "ip": "127.0.0.1" }, - "labels": { - "wrapped_reporter": true - }, "observer": { "ephemeral_id": "00000000-0000-0000-0000-000000000000", "hostname": "", @@ -27,28 +28,14 @@ "version_major": 1 }, "processor": { - "event": "transaction", - "name": "transaction" + "event": "error", + "name": "error" }, "service": { "name": "1234_service-12a3" }, "timestamp": { "us": 1547070053000000 - }, - "trace": { - "id": "01234567890123456789abcdefabcdef" - }, - "transaction": { - "duration": { - "us": 32592 - }, - "id": "abcdef1478523690", - "sampled": true, - "span_count": { - "started": 0 - }, - "type": "request" } }, { @@ -60,8 +47,14 @@ "ecs": { "version": "1.10.0" }, - "event": { - "outcome": "unknown" + "error": { + "exception": [ + { + "message": "error exception message" + } + ], + "grouping_key": "3a1fb5609458fbb132b44d8fc7cde104", + "id": "abcdef0123456790" }, "host": { "ip": "127.0.0.1" @@ -74,36 +67,19 @@ "version": "1.2.3", "version_major": 1 }, - "parent": { - "id": "ab23456a89012345" - }, "processor": { - "event": "span", - "name": "transaction" + "event": "error", + "name": "error" }, "service": { "name": "1234_service-12a3" }, - "span": { - "duration": { - "us": 3564 - }, - "id": "0123456a89012345", - "name": "GET /api/types", - "start": { - "us": 1845 - }, - "type": "request" - }, "timestamp": { "us": 1547070053000000 - }, - "trace": { - "id": "0123456789abcdef0123456789abcdef" } }, { - "@timestamp": "2018-08-30T18:53:27.154Z", + "@timestamp": "2019-01-09T21:40:53.000Z", "agent": { "name": "elastic-node", "version": "3.14.0" @@ -111,8 +87,14 @@ "ecs": { "version": "1.10.0" }, - "event": { - "outcome": "unknown" + "error": { + "exception": [ + { + "type": "error exception type" + } + ], + "grouping_key": "fa405fa2bd848dab17207e7b544d9ad4", + "id": "abcdef0123456791" }, "host": { "ip": "127.0.0.1" @@ -125,34 +107,19 @@ "version": "1.2.3", "version_major": 1 }, - "parent": { - "id": "ab23456a89012345" - }, "processor": { - "event": "span", - "name": "transaction" + "event": "error", + "name": "error" }, "service": { "name": "1234_service-12a3" }, - "span": { - "duration": { - "us": 3564 - }, - "id": "0123456a89012345", - "name": "GET /api/types", - "type": "request" - }, "timestamp": { - "us": 1535655207154000 - }, - "trace": { - "id": "0123456789abcdef0123456789abcdef" + "us": 1547070053000000 } }, { - "@timestamp": "2017-05-30T18:53:42.281Z", - "a": 3.2, + "@timestamp": "2019-01-09T21:40:53.000Z", "agent": { "name": "elastic-node", "version": "3.14.0" @@ -160,10 +127,12 @@ "ecs": { "version": "1.10.0" }, + "event": { + "outcome": "unknown" + }, "host": { "ip": "127.0.0.1" }, - "metricset.name": "app", "observer": { "ephemeral_id": "00000000-0000-0000-0000-000000000000", "hostname": "", @@ -172,16 +141,36 @@ "version": "1.2.3", "version_major": 1 }, + "parent": { + "id": "ab23456a89012345" + }, "processor": { - "event": "metric", - "name": "metric" + "event": "span", + "name": "transaction" }, "service": { "name": "1234_service-12a3" + }, + "span": { + "duration": { + "us": 3564 + }, + "id": "0123456a89012345", + "name": "GET /api/types", + "start": { + "us": 1845 + }, + "type": "request" + }, + "timestamp": { + "us": 1547070053000000 + }, + "trace": { + "id": "0123456789abcdef0123456789abcdef" } }, { - "@timestamp": "2019-01-09T21:40:53.000Z", + "@timestamp": "2018-08-30T18:53:27.154Z", "agent": { "name": "elastic-node", "version": "3.14.0" @@ -189,12 +178,8 @@ "ecs": { "version": "1.10.0" }, - "error": { - "grouping_key": "0b9cba09845a097a271c6beb4c6207f3", - "id": "abcdef0123456789", - "log": { - "message": "error log message" - } + "event": { + "outcome": "unknown" }, "host": { "ip": "127.0.0.1" @@ -207,15 +192,29 @@ "version": "1.2.3", "version_major": 1 }, + "parent": { + "id": "ab23456a89012345" + }, "processor": { - "event": "error", - "name": "error" + "event": "span", + "name": "transaction" }, "service": { "name": "1234_service-12a3" }, + "span": { + "duration": { + "us": 3564 + }, + "id": "0123456a89012345", + "name": "GET /api/types", + "type": "request" + }, "timestamp": { - "us": 1547070053000000 + "us": 1535655207154000 + }, + "trace": { + "id": "0123456789abcdef0123456789abcdef" } }, { @@ -227,18 +226,15 @@ "ecs": { "version": "1.10.0" }, - "error": { - "exception": [ - { - "message": "error exception message" - } - ], - "grouping_key": "3a1fb5609458fbb132b44d8fc7cde104", - "id": "abcdef0123456790" + "event": { + "outcome": "unknown" }, "host": { "ip": "127.0.0.1" }, + "labels": { + "wrapped_reporter": true + }, "observer": { "ephemeral_id": "00000000-0000-0000-0000-000000000000", "hostname": "", @@ -248,18 +244,33 @@ "version_major": 1 }, "processor": { - "event": "error", - "name": "error" + "event": "transaction", + "name": "transaction" }, "service": { "name": "1234_service-12a3" }, "timestamp": { "us": 1547070053000000 + }, + "trace": { + "id": "01234567890123456789abcdefabcdef" + }, + "transaction": { + "duration": { + "us": 32592 + }, + "id": "abcdef1478523690", + "sampled": true, + "span_count": { + "started": 0 + }, + "type": "request" } }, { - "@timestamp": "2019-01-09T21:40:53.000Z", + "@timestamp": "2017-05-30T18:53:42.281Z", + "a": 3.2, "agent": { "name": "elastic-node", "version": "3.14.0" @@ -267,18 +278,10 @@ "ecs": { "version": "1.10.0" }, - "error": { - "exception": [ - { - "type": "error exception type" - } - ], - "grouping_key": "fa405fa2bd848dab17207e7b544d9ad4", - "id": "abcdef0123456791" - }, "host": { "ip": "127.0.0.1" }, + "metricset.name": "app", "observer": { "ephemeral_id": "00000000-0000-0000-0000-000000000000", "hostname": "", @@ -288,14 +291,11 @@ "version_major": 1 }, "processor": { - "event": "error", - "name": "error" + "event": "metric", + "name": "metric" }, "service": { "name": "1234_service-12a3" - }, - "timestamp": { - "us": 1547070053000000 } } ] diff --git a/model/apmevent.go b/model/apmevent.go new file mode 100644 index 00000000000..b8c98866e3c --- /dev/null +++ b/model/apmevent.go @@ -0,0 +1,40 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +import ( + "context" + + "github.com/elastic/apm-server/transform" + "github.com/elastic/beats/v7/libbeat/beat" +) + +// APMEvent holds the details of an APM event. +// +// Exactly one of the event fields should be non-nil. +type APMEvent struct { + Transaction *Transaction + Span *Span + Metricset *Metricset + Error *Error + Profile *PprofProfile +} + +func (e *APMEvent) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { + return nil +} diff --git a/model/batch.go b/model/batch.go index 9cffc65cf3b..a9190bfb062 100644 --- a/model/batch.go +++ b/model/batch.go @@ -42,46 +42,25 @@ func (f ProcessBatchFunc) ProcessBatch(ctx context.Context, b *Batch) error { return f(ctx, b) } -type Batch struct { - Transactions []*Transaction - Spans []*Span - Metricsets []*Metricset - Errors []*Error - Profiles []*PprofProfile -} - -// Reset resets the batch to be empty, but it retains the underlying storage. -func (b *Batch) Reset() { - b.Transactions = b.Transactions[:0] - b.Spans = b.Spans[:0] - b.Metricsets = b.Metricsets[:0] - b.Errors = b.Errors[:0] - b.Profiles = b.Profiles[:0] -} - -func (b *Batch) Len() int { - if b == nil { - return 0 - } - return len(b.Transactions) + len(b.Spans) + len(b.Metricsets) + len(b.Errors) + len(b.Profiles) -} +// Batch is a collection of APM events. +type Batch []APMEvent +// Transform transforms all events in the batch, in sequence. func (b *Batch) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { - events := make([]beat.Event, 0, b.Len()) - for _, event := range b.Transactions { - events = event.appendBeatEvents(cfg, events) - } - for _, event := range b.Spans { - events = event.appendBeatEvents(ctx, cfg, events) - } - for _, event := range b.Metricsets { - events = event.appendBeatEvents(cfg, events) - } - for _, event := range b.Errors { - events = event.appendBeatEvents(ctx, cfg, events) - } - for _, event := range b.Profiles { - events = event.appendBeatEvents(cfg, events) + out := make([]beat.Event, 0, len(*b)) + for _, event := range *b { + switch { + case event.Transaction != nil: + out = event.Transaction.appendBeatEvents(cfg, out) + case event.Span != nil: + out = event.Span.appendBeatEvents(ctx, cfg, out) + case event.Metricset != nil: + out = event.Metricset.appendBeatEvents(cfg, out) + case event.Error != nil: + out = event.Error.appendBeatEvents(ctx, cfg, out) + case event.Profile != nil: + out = event.Profile.appendBeatEvents(cfg, out) + } } - return events + return out } diff --git a/model/modelprocessor/environment_test.go b/model/modelprocessor/environment_test.go index 842ceb7345a..aac424b389a 100644 --- a/model/modelprocessor/environment_test.go +++ b/model/modelprocessor/environment_test.go @@ -45,55 +45,35 @@ func testProcessBatchMetadata(t *testing.T, processor model.BatchProcessor, in, // Check that the model.Batch fields have not changed since this // test was last updated, to ensure we process all model types. - var batchFields []string - typ := reflect.TypeOf(model.Batch{}) + var apmEventFields []string + typ := reflect.TypeOf(model.APMEvent{}) for i := 0; i < typ.NumField(); i++ { - batchFields = append(batchFields, typ.Field(i).Name) + apmEventFields = append(apmEventFields, typ.Field(i).Name) } assert.ElementsMatch(t, []string{ - "Transactions", - "Spans", - "Metricsets", - "Errors", - "Profiles", - }, batchFields) + "Transaction", + "Span", + "Metricset", + "Error", + "Profile", + }, apmEventFields) batch := &model.Batch{ - Transactions: []*model.Transaction{ - {Metadata: in}, - }, - Spans: []*model.Span{ - {Metadata: in}, - }, - Metricsets: []*model.Metricset{ - {Metadata: in}, - }, - Errors: []*model.Error{ - {Metadata: in}, - }, - Profiles: []*model.PprofProfile{ - {Metadata: in}, - }, + {Transaction: &model.Transaction{Metadata: in}}, + {Span: &model.Span{Metadata: in}}, + {Metricset: &model.Metricset{Metadata: in}}, + {Error: &model.Error{Metadata: in}}, + {Profile: &model.PprofProfile{Metadata: in}}, } err := processor.ProcessBatch(context.Background(), batch) require.NoError(t, err) expected := &model.Batch{ - Transactions: []*model.Transaction{ - {Metadata: out}, - }, - Spans: []*model.Span{ - {Metadata: out}, - }, - Metricsets: []*model.Metricset{ - {Metadata: out}, - }, - Errors: []*model.Error{ - {Metadata: out}, - }, - Profiles: []*model.PprofProfile{ - {Metadata: out}, - }, + {Transaction: &model.Transaction{Metadata: out}}, + {Span: &model.Span{Metadata: out}}, + {Metricset: &model.Metricset{Metadata: out}}, + {Error: &model.Error{Metadata: out}}, + {Profile: &model.PprofProfile{Metadata: out}}, } assert.Equal(t, expected, batch) } diff --git a/model/modelprocessor/metadata.go b/model/modelprocessor/metadata.go index 527a81fb619..8d542442cb4 100644 --- a/model/modelprocessor/metadata.go +++ b/model/modelprocessor/metadata.go @@ -29,29 +29,28 @@ type MetadataProcessorFunc func(ctx context.Context, meta *model.Metadata) error // ProcessBatch calls f with the metadata of each event in b. func (f MetadataProcessorFunc) ProcessBatch(ctx context.Context, b *model.Batch) error { - for _, event := range b.Transactions { - if err := f(ctx, &event.Metadata); err != nil { - return err - } - } - for _, event := range b.Spans { - if err := f(ctx, &event.Metadata); err != nil { - return err - } - } - for _, event := range b.Metricsets { - if err := f(ctx, &event.Metadata); err != nil { - return err - } - } - for _, event := range b.Errors { - if err := f(ctx, &event.Metadata); err != nil { - return err - } - } - for _, event := range b.Profiles { - if err := f(ctx, &event.Metadata); err != nil { - return err + for _, event := range *b { + switch { + case event.Transaction != nil: + if err := f(ctx, &event.Transaction.Metadata); err != nil { + return err + } + case event.Span != nil: + if err := f(ctx, &event.Span.Metadata); err != nil { + return err + } + case event.Metricset != nil: + if err := f(ctx, &event.Metricset.Metadata); err != nil { + return err + } + case event.Error != nil: + if err := f(ctx, &event.Error.Metadata); err != nil { + return err + } + case event.Profile != nil: + if err := f(ctx, &event.Profile.Metadata); err != nil { + return err + } } } return nil diff --git a/model/modelprocessor/metricsetname.go b/model/modelprocessor/metricsetname.go index c153d656033..71317181fa6 100644 --- a/model/modelprocessor/metricsetname.go +++ b/model/modelprocessor/metricsetname.go @@ -39,8 +39,9 @@ type SetMetricsetName struct{} // will be given a specific name, while all other metrics will be given the name // "app". func (SetMetricsetName) ProcessBatch(ctx context.Context, b *model.Batch) error { - for _, ms := range b.Metricsets { - if ms.Name != "" || len(ms.Samples) == 0 { + for _, event := range *b { + ms := event.Metricset + if ms == nil || ms.Name != "" || len(ms.Samples) == 0 { continue } ms.Name = appMetricsetName diff --git a/model/modelprocessor/metricsetname_test.go b/model/modelprocessor/metricsetname_test.go index a9fa34e0620..c89d1409ac7 100644 --- a/model/modelprocessor/metricsetname_test.go +++ b/model/modelprocessor/metricsetname_test.go @@ -69,11 +69,11 @@ func TestSetMetricsetName(t *testing.T) { }} for _, test := range tests { - batch := &model.Batch{Metricsets: []*model.Metricset{&test.metricset}} + batch := model.Batch{{Metricset: &test.metricset}} processor := modelprocessor.SetMetricsetName{} - err := processor.ProcessBatch(context.Background(), batch) + err := processor.ProcessBatch(context.Background(), &batch) assert.NoError(t, err) - assert.Equal(t, test.name, batch.Metricsets[0].Name) + assert.Equal(t, test.name, batch[0].Metricset.Name) } } diff --git a/model/profile_test.go b/model/profile_test.go index c5c872606d1..bd935487520 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -87,7 +87,7 @@ func TestPprofProfileTransform(t *testing.T) { }, } - batch := &model.Batch{Profiles: []*model.PprofProfile{&pp}} + batch := &model.Batch{{Profile: &pp}} output := batch.Transform(context.Background(), &transform.Config{DataStreams: true}) require.Len(t, output, 2) assert.Equal(t, output[0], output[1]) diff --git a/processor/otel/consumer.go b/processor/otel/consumer.go index 2232d29aa6d..383d942ae7f 100644 --- a/processor/otel/consumer.go +++ b/processor/otel/consumer.go @@ -202,7 +202,7 @@ func (c *Consumer) convertSpan( Outcome: spanStatusOutcome(otelSpan.Status()), } translateTransaction(otelSpan, otelLibrary, metadata, &transactionBuilder{Transaction: transaction}) - out.Transactions = append(out.Transactions, transaction) + *out = append(*out, model.APMEvent{Transaction: transaction}) } else { span = &model.Span{ Metadata: metadata, @@ -215,7 +215,7 @@ func (c *Consumer) convertSpan( Outcome: spanStatusOutcome(otelSpan.Status()), } translateSpan(otelSpan, metadata, span) - out.Spans = append(out.Spans, span) + *out = append(*out, model.APMEvent{Span: span}) } events := otelSpan.Events() @@ -816,7 +816,7 @@ func convertSpanEvent( if span != nil { addSpanCtxToErr(span, e) } - out.Errors = append(out.Errors, e) + *out = append(*out, model.APMEvent{Error: e}) } } diff --git a/processor/otel/consumer_test.go b/processor/otel/consumer_test.go index ff7b8dd3025..8cf54287528 100644 --- a/processor/otel/consumer_test.go +++ b/processor/otel/consumer_test.go @@ -90,12 +90,11 @@ func TestOutcome(t *testing.T) { spans.Spans().Append(otelSpan1) spans.Spans().Append(otelSpan2) batch := transformTraces(t, traces) - require.Len(t, batch.Transactions, 1) - require.Len(t, batch.Spans, 1) + require.Len(t, batch, 2) - assert.Equal(t, expectedOutcome, batch.Transactions[0].Outcome) - assert.Equal(t, expectedResult, batch.Transactions[0].Result) - assert.Equal(t, expectedOutcome, batch.Spans[0].Outcome) + assert.Equal(t, expectedOutcome, batch[0].Transaction.Outcome) + assert.Equal(t, expectedResult, batch[0].Transaction.Result) + assert.Equal(t, expectedOutcome, batch[1].Span.Outcome) } test(t, "unknown", "", pdata.StatusCodeUnset) @@ -116,11 +115,10 @@ func TestRepresentativeCount(t *testing.T) { spans.Spans().Append(otelSpan1) spans.Spans().Append(otelSpan2) batch := transformTraces(t, traces) - require.Len(t, batch.Transactions, 1) - require.Len(t, batch.Spans, 1) + require.Len(t, batch, 2) - assert.Equal(t, 1.0, batch.Transactions[0].RepresentativeCount) - assert.Equal(t, 1.0, batch.Spans[0].RepresentativeCount) + assert.Equal(t, 1.0, batch[0].Transaction.RepresentativeCount) + assert.Equal(t, 1.0, batch[1].Span.RepresentativeCount) } func TestHTTPTransactionURL(t *testing.T) { @@ -470,7 +468,7 @@ func TestInstrumentationLibrary(t *testing.T) { otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) spans.Spans().Append(otelSpan) events := transformTraces(t, traces) - tx := events.Transactions[0] + tx := events[0].Transaction assert.Equal(t, "library-name", tx.Metadata.Service.Framework.Name) assert.Equal(t, "1.2.3", tx.Metadata.Service.Framework.Version) @@ -633,18 +631,16 @@ func TestConsumeTracesExportTimestamp(t *testing.T) { otelSpan2.Events().Append(otelSpanEvent) batch := transformTraces(t, traces) - require.Len(t, batch.Transactions, 1) - require.Len(t, batch.Spans, 1) - require.Len(t, batch.Errors, 1) + require.Len(t, batch, 3) // Give some leeway for one event, and check other events' timestamps relative to that one. - assert.InDelta(t, now.Add(transactionOffset).Unix(), batch.Transactions[0].Timestamp.Unix(), allowedError) - assert.Equal(t, spanOffset-transactionOffset, batch.Spans[0].Timestamp.Sub(batch.Transactions[0].Timestamp)) - assert.Equal(t, exceptionOffset-transactionOffset, batch.Errors[0].Timestamp.Sub(batch.Transactions[0].Timestamp)) + assert.InDelta(t, now.Add(transactionOffset).Unix(), batch[0].Transaction.Timestamp.Unix(), allowedError) + assert.Equal(t, spanOffset-transactionOffset, batch[1].Span.Timestamp.Sub(batch[0].Transaction.Timestamp)) + assert.Equal(t, exceptionOffset-transactionOffset, batch[2].Error.Timestamp.Sub(batch[0].Transaction.Timestamp)) // Durations should be unaffected. - assert.Equal(t, float64(transactionDuration.Milliseconds()), batch.Transactions[0].Duration) - assert.Equal(t, float64(spanDuration.Milliseconds()), batch.Spans[0].Duration) + assert.Equal(t, float64(transactionDuration.Milliseconds()), batch[0].Transaction.Duration) + assert.Equal(t, float64(spanDuration.Milliseconds()), batch[1].Span.Duration) } func TestConsumer_JaegerMetadata(t *testing.T) { @@ -737,14 +733,14 @@ func TestConsumer_JaegerSampleRate(t *testing.T) { recorder := batchRecorderBatchProcessor(&batches) require.NoError(t, (&otel.Consumer{Processor: recorder}).ConsumeTraces(context.Background(), traces)) require.Len(t, batches, 1) - batch := batches[0] + batch := *batches[0] events := transformBatch(context.Background(), batches...) approveEvents(t, "jaeger_sampling_rate", events) - tx1 := batch.Transactions[0] - tx2 := batch.Transactions[1] - span := batch.Spans[0] + tx1 := batch[0].Transaction + span := batch[1].Span + tx2 := batch[2].Transaction assert.Equal(t, 1.25 /* 1/0.8 */, tx1.RepresentativeCount) assert.Equal(t, 2.5 /* 1/0.4 */, span.RepresentativeCount) assert.Zero(t, tx2.RepresentativeCount) // not set for non-probabilistic @@ -767,8 +763,9 @@ func TestConsumer_JaegerTraceID(t *testing.T) { traces := jaegertranslator.ProtoBatchToInternalTraces(jaegerBatch) require.NoError(t, (&otel.Consumer{Processor: recorder}).ConsumeTraces(context.Background(), traces)) - assert.Equal(t, "00000000000000000000000046467830", batches[0].Transactions[0].TraceID) - assert.Equal(t, "00000000464678300000000046467830", batches[0].Transactions[1].TraceID) + batch := *batches[0] + assert.Equal(t, "00000000000000000000000046467830", batch[0].Transaction.TraceID) + assert.Equal(t, "00000000464678300000000046467830", batch[1].Transaction.TraceID) } func TestConsumer_JaegerTransaction(t *testing.T) { @@ -1031,8 +1028,9 @@ func TestJaegerServiceVersion(t *testing.T) { recorder := batchRecorderBatchProcessor(&batches) require.NoError(t, (&otel.Consumer{Processor: recorder}).ConsumeTraces(context.Background(), traces)) - assert.Equal(t, "process_tag_value", batches[0].Transactions[0].Metadata.Service.Version) - assert.Equal(t, "span_tag_value", batches[0].Transactions[1].Metadata.Service.Version) + batch := *batches[0] + assert.Equal(t, "process_tag_value", batch[0].Transaction.Metadata.Service.Version) + assert.Equal(t, "span_tag_value", batch[1].Transaction.Metadata.Service.Version) } func TestTracesLogging(t *testing.T) { @@ -1191,7 +1189,7 @@ func transformTransactionWithAttributes(t *testing.T, attrs map[string]pdata.Att otelSpan.Attributes().InitFromMap(attrs) spans.Spans().Append(otelSpan) events := transformTraces(t, traces) - return events.Transactions[0] + return events[0].Transaction } func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeValue, configFns ...func(pdata.Span)) *model.Span { @@ -1206,7 +1204,7 @@ func transformSpanWithAttributes(t *testing.T, attrs map[string]pdata.AttributeV otelSpan.Attributes().InitFromMap(attrs) spans.Spans().Append(otelSpan) events := transformTraces(t, traces) - return events.Spans[0] + return events[0].Span } func transformTransactionSpanEvents(t *testing.T, language string, spanEvents ...pdata.SpanEvent) (*model.Transaction, []*model.Error) { @@ -1223,16 +1221,21 @@ func transformTransactionSpanEvents(t *testing.T, language string, spanEvents .. spans.Spans().Append(otelSpan) events := transformTraces(t, traces) require.NotEmpty(t, events) - return events.Transactions[0], events.Errors + + errors := make([]*model.Error, len(events)-1) + for i, event := range events[1:] { + errors[i] = event.Error + } + return events[0].Transaction, errors } -func transformTraces(t *testing.T, traces pdata.Traces) *model.Batch { - var processed *model.Batch +func transformTraces(t *testing.T, traces pdata.Traces) model.Batch { + var processed model.Batch processor := model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { if processed != nil { panic("already processes batch") } - processed = batch + processed = *batch return nil }) require.NoError(t, (&otel.Consumer{Processor: processor}).ConsumeTraces(context.Background(), traces)) diff --git a/processor/otel/metadata_test.go b/processor/otel/metadata_test.go index 647123e9eff..df281c19710 100644 --- a/processor/otel/metadata_test.go +++ b/processor/otel/metadata_test.go @@ -232,5 +232,5 @@ func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.Attr otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) spans.Spans().Append(otelSpan) events := transformTraces(t, traces) - return events.Transactions[0].Metadata + return events[0].Transaction.Metadata } diff --git a/processor/otel/metrics.go b/processor/otel/metrics.go index 6f768535110..6b47316c21c 100644 --- a/processor/otel/metrics.go +++ b/processor/otel/metrics.go @@ -103,7 +103,7 @@ func (c *Consumer) convertInstrumentationLibraryMetrics( for _, m := range ms { m.Metadata = metadata m.Timestamp = m.Timestamp.Add(timeDelta) - out.Metricsets = append(out.Metricsets, m.Metricset) + *out = append(*out, model.APMEvent{Metricset: m.Metricset}) } if unsupported > 0 { atomic.AddInt64(&c.stats.unsupportedMetricsDropped, unsupported) diff --git a/processor/otel/metrics_test.go b/processor/otel/metrics_test.go index a0f1f532029..02084d1648a 100644 --- a/processor/otel/metrics_test.go +++ b/processor/otel/metrics_test.go @@ -381,5 +381,11 @@ func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]*model.Metricset, err := consumer.ConsumeMetrics(context.Background(), metrics) require.NoError(t, err) require.Len(t, batches, 1) - return batches[0].Metricsets, consumer.Stats() + + batch := *batches[0] + metricsets := make([]*model.Metricset, len(batch)) + for i, event := range batch { + metricsets[i] = event.Metricset + } + return metricsets, consumer.Stats() } diff --git a/processor/otel/test_approved/jaeger_sampling_rate.approved.json b/processor/otel/test_approved/jaeger_sampling_rate.approved.json index e9c247bab46..5342958a4cd 100644 --- a/processor/otel/test_approved/jaeger_sampling_rate.approved.json +++ b/processor/otel/test_approved/jaeger_sampling_rate.approved.json @@ -50,12 +50,11 @@ "host": { "hostname": "host-abc" }, - "labels": { - "sampler_param": 2, - "sampler_type": "ratelimiting" + "parent": { + "id": "0000000000000001" }, "processor": { - "event": "transaction", + "event": "span", "name": "transaction" }, "service": { @@ -64,16 +63,18 @@ }, "name": "unknown" }, - "timestamp": { - "us": 1576500418000768 - }, - "transaction": { + "span": { "duration": { "us": 79000000 }, - "id": "", - "sampled": true, - "type": "custom" + "name": "", + "type": "app" + }, + "timestamp": { + "us": 1576500418000768 + }, + "trace": { + "id": "00000000000000010000000000000001" } }, { @@ -90,11 +91,12 @@ "host": { "hostname": "host-abc" }, - "parent": { - "id": "0000000000000001" + "labels": { + "sampler_param": 2, + "sampler_type": "ratelimiting" }, "processor": { - "event": "span", + "event": "transaction", "name": "transaction" }, "service": { @@ -103,18 +105,16 @@ }, "name": "unknown" }, - "span": { - "duration": { - "us": 79000000 - }, - "name": "", - "type": "app" - }, "timestamp": { "us": 1576500418000768 }, - "trace": { - "id": "00000000000000010000000000000001" + "transaction": { + "duration": { + "us": 79000000 + }, + "id": "", + "sampled": true, + "type": "custom" } } ] diff --git a/processor/stream/processor.go b/processor/stream/processor.go index bb01e3ce44f..e2b06a25e8d 100644 --- a/processor/stream/processor.go +++ b/processor/stream/processor.go @@ -177,7 +177,7 @@ func (p *Processor) readBatch( continue } event.RUM = p.isRUM - batch.Errors = append(batch.Errors, &event) + *batch = append(*batch, model.APMEvent{Error: &event}) n++ case metricsetEventType: var event model.Metricset @@ -185,7 +185,7 @@ func (p *Processor) readBatch( if handleDecodeErr(err, reader, result) { continue } - batch.Metricsets = append(batch.Metricsets, &event) + *batch = append(*batch, model.APMEvent{Metricset: &event}) n++ case spanEventType: var event model.Span @@ -194,7 +194,7 @@ func (p *Processor) readBatch( continue } event.RUM = p.isRUM - batch.Spans = append(batch.Spans, &event) + *batch = append(*batch, model.APMEvent{Span: &event}) n++ case transactionEventType: var event model.Transaction @@ -202,7 +202,7 @@ func (p *Processor) readBatch( if handleDecodeErr(err, reader, result) { continue } - batch.Transactions = append(batch.Transactions, &event) + *batch = append(*batch, model.APMEvent{Transaction: &event}) n++ case rumv3ErrorEventType: var event model.Error @@ -211,7 +211,7 @@ func (p *Processor) readBatch( continue } event.RUM = p.isRUM - batch.Errors = append(batch.Errors, &event) + *batch = append(*batch, model.APMEvent{Error: &event}) n++ case rumv3MetricsetEventType: var event model.Metricset @@ -219,7 +219,7 @@ func (p *Processor) readBatch( if handleDecodeErr(err, reader, result) { continue } - batch.Metricsets = append(batch.Metricsets, &event) + *batch = append(*batch, model.APMEvent{Metricset: &event}) n++ case rumv3TransactionEventType: var event rumv3.Transaction @@ -227,11 +227,13 @@ func (p *Processor) readBatch( if handleDecodeErr(err, reader, result) { continue } - batch.Transactions = append(batch.Transactions, &event.Transaction) - batch.Metricsets = append(batch.Metricsets, event.Metricsets...) + *batch = append(*batch, model.APMEvent{Transaction: &event.Transaction}) + for _, ms := range event.Metricsets { + *batch = append(*batch, model.APMEvent{Metricset: ms}) + } for _, span := range event.Spans { span.RUM = true - batch.Spans = append(batch.Spans, span) + *batch = append(*batch, model.APMEvent{Span: span}) } n += 1 + len(event.Metricsets) + len(event.Spans) default: @@ -301,7 +303,7 @@ func (p *Processor) HandleStream( if err := processor.ProcessBatch(ctx, &batch); err != nil { return err } - result.AddAccepted(batch.Len()) + result.AddAccepted(len(batch)) } if readErr == io.EOF { break diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json index afc09db9548..c9220d740ec 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json @@ -5,18 +5,141 @@ "agent": { "ephemeral_id": "e71be9ac-93b0-44b9-a997-5638f6ccfc36", "name": "java", - "version": "1.10.0-SNAPSHOT" + "version": "1.10.0" }, "client": { - "ip": "12.53.12.1" + "ip": "192.168.0.1" }, "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm", - "data_stream.type": "traces", - "event": { - "outcome": "success" + "data_stream.dataset": "apm.error", + "data_stream.type": "logs", + "error": { + "culprit": "opbeans.controllers.DTInterceptor.preHandle(DTInterceptor.java:73)", + "custom": { + "and_objects": { + "foo": [ + "bar", + "baz" + ] + }, + "my_key": 1, + "some_other_value": "foobar" + }, + "exception": [ + { + "attributes": { + "foo": "bar" + }, + "code": "42", + "handled": false, + "message": "Theusernamerootisunknown", + "module": "org.springframework.http.client", + "stacktrace": [ + { + "abs_path": "/tmp/AbstractPlainSocketImpl.java", + "context": { + "post": [ + "line4", + "line5" + ], + "pre": [ + "line1", + "line2" + ] + }, + "exclude_from_grouping": false, + "filename": "AbstractPlainSocketImpl.java", + "function": "connect", + "library_frame": true, + "line": { + "column": 4, + "context": "3", + "number": 3 + }, + "module": "java.net", + "vars": { + "key": "value" + } + }, + { + "exclude_from_grouping": false, + "filename": "AbstractClientHttpRequest.java", + "function": "execute", + "line": { + "number": 102 + }, + "vars": { + "key": "value" + } + } + ], + "type": "java.net.UnknownHostException" + }, + { + "message": "something wrong writing a file", + "type": "InternalDbError" + }, + { + "message": "disk spinning way too fast", + "type": "VeryInternalDbError" + }, + { + "message": "on top of it,internet doesn't work", + "parent": 1, + "type": "ConnectionError" + } + ], + "grouping_key": "9a4054e958afe722b5877e8fac578ff3", + "id": "9876543210abcdeffedcba0123456789", + "log": { + "level": "error", + "logger_name": "http404", + "message": "Request method 'POST' not supported", + "param_message": "Request method 'POST' /events/:event not supported", + "stacktrace": [ + { + "abs_path": "/tmp/Socket.java", + "classname": "Request::Socket", + "context": { + "post": [ + "line4", + "line5" + ], + "pre": [ + "line1", + "line2" + ] + }, + "exclude_from_grouping": false, + "filename": "Socket.java", + "function": "connect", + "library_frame": true, + "line": { + "column": 4, + "context": "line3", + "number": 3 + }, + "module": "java.net", + "vars": { + "key": "value" + } + }, + { + "abs_path": "/tmp/SimpleBufferingClientHttpRequest.java", + "exclude_from_grouping": false, + "filename": "SimpleBufferingClientHttpRequest.java", + "function": "executeInternal", + "line": { + "number": 102 + }, + "vars": { + "key": "value" + } + } + ] + } }, "host": { "architecture": "amd64", @@ -30,13 +153,7 @@ "http": { "request": { "body": { - "original": { - "additional": { - "bar": 123, - "req": "additionalinformation" - }, - "string": "helloworld" - } + "original": "HelloWorld" }, "cookies": { "c1": "v1", @@ -47,29 +164,30 @@ "SERVER_SOFTWARE": "nginx" }, "headers": { - "Content-Type": [ - "text/html" + "Content-Length": [ + "0" ], "Cookie": [ - "c1=v1,c2=v2" + "c1=v1", + "c2=v2" ], "Elastic-Apm-Traceparent": [ - "00-33a0bd4cceff0370a7c57d807032688e-69feaabc5b88d7e8-01" + "00-8c21b4b556467a0b17ae5da959b5f388-31301f1fb2998121-01" ], - "User-Agent": [ - "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36", - "MozillaChromeEdge" + "Forwarded": [ + "for=192.168.0.1" + ], + "Host": [ + "opbeans-java:3000" ] }, "method": "POST", "socket": { "encrypted": true, - "remote_address": "12.53.12.1:8080" + "remote_address": "12.53.12.1" } }, "response": { - "decoded_body_size": 401.9, - "encoded_body_size": 356.9, "finished": true, "headers": { "Content-Type": [ @@ -77,8 +195,7 @@ ] }, "headers_sent": true, - "status_code": 200, - "transfer_size": 300 + "status_code": 200 }, "version": "1.1" }, @@ -99,7 +216,7 @@ "segment": 5 }, "parent": { - "id": "abcdefabcdef01234567" + "id": "9632587410abcdef" }, "process": { "args": [ @@ -110,22 +227,22 @@ "title": "/usr/lib/jvm/java-10-openjdk-amd64/bin/java" }, "processor": { - "event": "transaction", - "name": "transaction" + "event": "error", + "name": "error" }, "service": { "environment": "production", "framework": { - "name": "spring", - "version": "5.0.0" + "name": "Node", + "version": "1" }, "language": { "name": "Java", - "version": "10.0.2" + "version": "1.2" }, - "name": "experimental-java", + "name": "service1", "node": { - "name": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" + "name": "node-xyz" }, "runtime": { "name": "Java", @@ -134,38 +251,18 @@ "version": "4.3.0" }, "source": { - "ip": "12.53.12.1" + "ip": "192.168.0.1" }, "timestamp": { "us": 1571657444929001 }, "trace": { - "id": "0acd456789abcdef0123456789abcdef" + "id": "0123456789abcdeffedcba0123456789" }, "transaction": { - "custom": { - "(": "notavalidregexandthatisfine", - "and_objects": { - "foo": [ - "bar", - "baz" - ] - }, - "my_key": 1, - "some_other_value": "foobar" - }, - "duration": { - "us": 32592 - }, - "id": "4340a8e0df1906ecbfa9", - "name": "ResourceHttpRequestHandler", - "result": "HTTP2xx", + "id": "1234567890987654", "sampled": true, - "span_count": { - "dropped": 0, - "started": 17 - }, - "type": "http" + "type": "request" }, "url": { "domain": "www.example.com", @@ -178,12 +275,9 @@ "scheme": "https" }, "user": { - "email": "foo@mail.com", + "email": "user@foo.mail", "id": "99", "name": "foo" - }, - "user_agent": { - "original": "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36, MozillaChromeEdge" } }, { @@ -344,21 +438,19 @@ "agent": { "ephemeral_id": "e71be9ac-93b0-44b9-a997-5638f6ccfc36", "name": "java", - "version": "1.10.0" + "version": "1.10.0-SNAPSHOT" + }, + "client": { + "ip": "12.53.12.1" }, - "byte_counter": 1, "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", - "dotted": { - "float": { - "gauge": 6.12 - } + "data_stream.dataset": "apm", + "data_stream.type": "traces", + "event": { + "outcome": "success" }, - "double_gauge": 3.141592653589793, - "float_gauge": 9.16, "host": { "architecture": "amd64", "hostname": "8ec7ceb99074", @@ -368,38 +460,80 @@ "platform": "Linux" } }, - "integer_gauge": 42767, - "kubernetes": { - "namespace": "default", - "node": { - "name": "node-name" - }, - "pod": { - "name": "instrumented-java-service", - "uid": "b17f231da0ad128dc6c6c0b2e82f6f303d3893e3" - } - }, - "labels": { - "ab_testing": true, - "code": 200, - "group": "experimental", - "segment": 5, - "success": true - }, - "long_gauge": 3147483648, - "negative": { - "d": { - "o": { - "t": { - "t": { - "e": { - "d": -1022 - } - } + "http": { + "request": { + "body": { + "original": { + "additional": { + "bar": 123, + "req": "additionalinformation" + }, + "string": "helloworld" } + }, + "cookies": { + "c1": "v1", + "c2": "v2" + }, + "env": { + "GATEWAY_INTERFACE": "CGI/1.1", + "SERVER_SOFTWARE": "nginx" + }, + "headers": { + "Content-Type": [ + "text/html" + ], + "Cookie": [ + "c1=v1,c2=v2" + ], + "Elastic-Apm-Traceparent": [ + "00-33a0bd4cceff0370a7c57d807032688e-69feaabc5b88d7e8-01" + ], + "User-Agent": [ + "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36", + "MozillaChromeEdge" + ] + }, + "method": "POST", + "socket": { + "encrypted": true, + "remote_address": "12.53.12.1:8080" } + }, + "response": { + "decoded_body_size": 401.9, + "encoded_body_size": 356.9, + "finished": true, + "headers": { + "Content-Type": [ + "application/json" + ] + }, + "headers_sent": true, + "status_code": 200, + "transfer_size": 300 + }, + "version": "1.1" + }, + "kubernetes": { + "namespace": "default", + "node": { + "name": "node-name" + }, + "pod": { + "name": "instrumented-java-service", + "uid": "b17f231da0ad128dc6c6c0b2e82f6f303d3893e3" } }, + "labels": { + "ab_testing": true, + "group": "experimental", + "organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", + "segment": 5 + }, + "parent": { + "id": "abcdefabcdef01234567" + }, "process": { "args": [ "-v" @@ -409,8 +543,8 @@ "title": "/usr/lib/jvm/java-10-openjdk-amd64/bin/java" }, "processor": { - "event": "metric", - "name": "metric" + "event": "transaction", + "name": "transaction" }, "service": { "environment": "production", @@ -422,7 +556,7 @@ "name": "Java", "version": "10.0.2" }, - "name": "1234_service-12a3", + "name": "experimental-java", "node": { "name": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, @@ -432,35 +566,57 @@ }, "version": "4.3.0" }, - "short_counter": 227, - "span": { - "self_time": { - "count": 1, - "sum": { - "us": 633.288 - } - }, - "subtype": "mysql", - "type": "db" + "source": { + "ip": "12.53.12.1" + }, + "timestamp": { + "us": 1571657444929001 + }, + "trace": { + "id": "0acd456789abcdef0123456789abcdef" }, "transaction": { - "breakdown": { - "count": 12 + "custom": { + "(": "notavalidregexandthatisfine", + "and_objects": { + "foo": [ + "bar", + "baz" + ] + }, + "my_key": 1, + "some_other_value": "foobar" }, "duration": { - "count": 2, - "sum": { - "us": 12 - } + "us": 32592 }, - "name": "GET/", - "self_time": { - "count": 2, - "sum": { - "us": 10 - } + "id": "4340a8e0df1906ecbfa9", + "name": "ResourceHttpRequestHandler", + "result": "HTTP2xx", + "sampled": true, + "span_count": { + "dropped": 0, + "started": 17 }, - "type": "request" + "type": "http" + }, + "url": { + "domain": "www.example.com", + "fragment": "#hash", + "full": "https://www.example.com/p/a/t/h?query=string#hash", + "original": "/p/a/t/h?query=string#hash", + "path": "/p/a/t/h", + "port": 8080, + "query": "?query=string", + "scheme": "https" + }, + "user": { + "email": "foo@mail.com", + "id": "99", + "name": "foo" + }, + "user_agent": { + "original": "Mozilla/5.0(Macintosh;IntelMacOSX10_10_5)AppleWebKit/537.36(KHTML,likeGecko)Chrome/51.0.2704.103Safari/537.36, MozillaChromeEdge" } }, { @@ -470,140 +626,19 @@ "name": "java", "version": "1.10.0" }, - "client": { - "ip": "192.168.0.1" - }, + "byte_counter": 1, "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", - "error": { - "culprit": "opbeans.controllers.DTInterceptor.preHandle(DTInterceptor.java:73)", - "custom": { - "and_objects": { - "foo": [ - "bar", - "baz" - ] - }, - "my_key": 1, - "some_other_value": "foobar" - }, - "exception": [ - { - "attributes": { - "foo": "bar" - }, - "code": "42", - "handled": false, - "message": "Theusernamerootisunknown", - "module": "org.springframework.http.client", - "stacktrace": [ - { - "abs_path": "/tmp/AbstractPlainSocketImpl.java", - "context": { - "post": [ - "line4", - "line5" - ], - "pre": [ - "line1", - "line2" - ] - }, - "exclude_from_grouping": false, - "filename": "AbstractPlainSocketImpl.java", - "function": "connect", - "library_frame": true, - "line": { - "column": 4, - "context": "3", - "number": 3 - }, - "module": "java.net", - "vars": { - "key": "value" - } - }, - { - "exclude_from_grouping": false, - "filename": "AbstractClientHttpRequest.java", - "function": "execute", - "line": { - "number": 102 - }, - "vars": { - "key": "value" - } - } - ], - "type": "java.net.UnknownHostException" - }, - { - "message": "something wrong writing a file", - "type": "InternalDbError" - }, - { - "message": "disk spinning way too fast", - "type": "VeryInternalDbError" - }, - { - "message": "on top of it,internet doesn't work", - "parent": 1, - "type": "ConnectionError" - } - ], - "grouping_key": "9a4054e958afe722b5877e8fac578ff3", - "id": "9876543210abcdeffedcba0123456789", - "log": { - "level": "error", - "logger_name": "http404", - "message": "Request method 'POST' not supported", - "param_message": "Request method 'POST' /events/:event not supported", - "stacktrace": [ - { - "abs_path": "/tmp/Socket.java", - "classname": "Request::Socket", - "context": { - "post": [ - "line4", - "line5" - ], - "pre": [ - "line1", - "line2" - ] - }, - "exclude_from_grouping": false, - "filename": "Socket.java", - "function": "connect", - "library_frame": true, - "line": { - "column": 4, - "context": "line3", - "number": 3 - }, - "module": "java.net", - "vars": { - "key": "value" - } - }, - { - "abs_path": "/tmp/SimpleBufferingClientHttpRequest.java", - "exclude_from_grouping": false, - "filename": "SimpleBufferingClientHttpRequest.java", - "function": "executeInternal", - "line": { - "number": 102 - }, - "vars": { - "key": "value" - } - } - ] + "data_stream.dataset": "apm.internal", + "data_stream.type": "metrics", + "dotted": { + "float": { + "gauge": 6.12 } }, + "double_gauge": 3.141592653589793, + "float_gauge": 9.16, "host": { "architecture": "amd64", "hostname": "8ec7ceb99074", @@ -613,55 +648,7 @@ "platform": "Linux" } }, - "http": { - "request": { - "body": { - "original": "HelloWorld" - }, - "cookies": { - "c1": "v1", - "c2": "v2" - }, - "env": { - "GATEWAY_INTERFACE": "CGI/1.1", - "SERVER_SOFTWARE": "nginx" - }, - "headers": { - "Content-Length": [ - "0" - ], - "Cookie": [ - "c1=v1", - "c2=v2" - ], - "Elastic-Apm-Traceparent": [ - "00-8c21b4b556467a0b17ae5da959b5f388-31301f1fb2998121-01" - ], - "Forwarded": [ - "for=192.168.0.1" - ], - "Host": [ - "opbeans-java:3000" - ] - }, - "method": "POST", - "socket": { - "encrypted": true, - "remote_address": "12.53.12.1" - } - }, - "response": { - "finished": true, - "headers": { - "Content-Type": [ - "application/json" - ] - }, - "headers_sent": true, - "status_code": 200 - }, - "version": "1.1" - }, + "integer_gauge": 42767, "kubernetes": { "namespace": "default", "node": { @@ -674,12 +661,24 @@ }, "labels": { "ab_testing": true, + "code": 200, "group": "experimental", - "organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", - "segment": 5 + "segment": 5, + "success": true }, - "parent": { - "id": "9632587410abcdef" + "long_gauge": 3147483648, + "negative": { + "d": { + "o": { + "t": { + "t": { + "e": { + "d": -1022 + } + } + } + } + } }, "process": { "args": [ @@ -690,22 +689,22 @@ "title": "/usr/lib/jvm/java-10-openjdk-amd64/bin/java" }, "processor": { - "event": "error", - "name": "error" + "event": "metric", + "name": "metric" }, "service": { "environment": "production", "framework": { - "name": "Node", - "version": "1" + "name": "spring", + "version": "5.0.0" }, "language": { "name": "Java", - "version": "1.2" + "version": "10.0.2" }, - "name": "service1", + "name": "1234_service-12a3", "node": { - "name": "node-xyz" + "name": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, "runtime": { "name": "Java", @@ -713,34 +712,35 @@ }, "version": "4.3.0" }, - "source": { - "ip": "192.168.0.1" - }, - "timestamp": { - "us": 1571657444929001 - }, - "trace": { - "id": "0123456789abcdeffedcba0123456789" + "short_counter": 227, + "span": { + "self_time": { + "count": 1, + "sum": { + "us": 633.288 + } + }, + "subtype": "mysql", + "type": "db" }, "transaction": { - "id": "1234567890987654", - "sampled": true, + "breakdown": { + "count": 12 + }, + "duration": { + "count": 2, + "sum": { + "us": 12 + } + }, + "name": "GET/", + "self_time": { + "count": 2, + "sum": { + "us": 10 + } + }, "type": "request" - }, - "url": { - "domain": "www.example.com", - "fragment": "#hash", - "full": "https://www.example.com/p/a/t/h?query=string#hash", - "original": "/p/a/t/h?query=string#hash", - "path": "/p/a/t/h", - "port": 8080, - "query": "?query=string", - "scheme": "https" - }, - "user": { - "email": "user@foo.mail", - "id": "99", - "name": "foo" } } ] diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json index 47679320d4b..cc5806da98b 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json @@ -1,67 +1,67 @@ { "events": [ { - "@timestamp": "2017-05-30T18:53:42.281Z", + "@timestamp": "2018-08-09T15:04:05.999Z", "agent": { "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.app.1234_service_12a3", - "data_stream.type": "metrics", - "go": { - "memstats": { - "heap": { - "sys": { - "bytes": 61235 - } - } + "data_stream.dataset": "apm.error", + "data_stream.type": "logs", + "error": { + "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", + "id": "abcdef0123456789", + "log": { + "level": "custom log level", + "message": "Cannot read property 'baz' of undefined" } }, "host": { "ip": "192.0.0.1" }, "processor": { - "event": "metric", - "name": "metric" + "event": "error", + "name": "error" }, "service": { "language": { "name": "ecmascript" }, "name": "1234_service-12a3" + }, + "timestamp": { + "us": 1533827045999000 } }, { - "@timestamp": "2018-08-09T15:04:05.999Z", + "@timestamp": "2017-05-30T18:53:42.281Z", "agent": { "name": "elastic-node", "version": "3.14.0" }, - "data_stream.dataset": "apm.error", - "data_stream.type": "logs", - "error": { - "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", - "id": "abcdef0123456789", - "log": { - "level": "custom log level", - "message": "Cannot read property 'baz' of undefined" + "data_stream.dataset": "apm.app.1234_service_12a3", + "data_stream.type": "metrics", + "go": { + "memstats": { + "heap": { + "sys": { + "bytes": 61235 + } + } } }, "host": { "ip": "192.0.0.1" }, "processor": { - "event": "error", - "name": "error" + "event": "metric", + "name": "metric" }, "service": { "language": { "name": "ecmascript" }, "name": "1234_service-12a3" - }, - "timestamp": { - "us": 1533827045999000 } } ] diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json index 69a85676529..4567d59d489 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json @@ -148,6 +148,177 @@ "original": "rum-2.0" } }, + { + "@timestamp": "2018-08-01T10:00:00.000Z", + "agent": { + "name": "js-base", + "version": "4.8.1" + }, + "client": { + "ip": "192.0.0.1" + }, + "data_stream.dataset": "apm.internal", + "data_stream.type": "metrics", + "labels": { + "testTagKey": "testTagValue" + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "environment": "prod", + "framework": { + "name": "angular", + "version": "2" + }, + "language": { + "name": "javascript", + "version": "6" + }, + "name": "apm-a-rum-test-e2e-general-usecase", + "runtime": { + "name": "v8", + "version": "8.0" + }, + "version": "0.0.1" + }, + "transaction": { + "breakdown": { + "count": 1 + }, + "duration": { + "count": 1, + "sum": { + "us": 295 + } + }, + "name": "general-usecase-initial-p-load", + "type": "p-load" + }, + "user": { + "email": "user@email.com", + "id": "123", + "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" + } + }, + { + "@timestamp": "2018-08-01T10:00:00.000Z", + "agent": { + "name": "js-base", + "version": "4.8.1" + }, + "client": { + "ip": "192.0.0.1" + }, + "data_stream.dataset": "apm.internal", + "data_stream.type": "metrics", + "labels": { + "testTagKey": "testTagValue" + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "environment": "prod", + "framework": { + "name": "angular", + "version": "2" + }, + "language": { + "name": "javascript", + "version": "6" + }, + "name": "apm-a-rum-test-e2e-general-usecase", + "runtime": { + "name": "v8", + "version": "8.0" + }, + "version": "0.0.1" + }, + "span": { + "self_time": { + "count": 1, + "sum": { + "us": 1 + } + }, + "type": "Request" + }, + "transaction": { + "name": "general-usecase-initial-p-load", + "type": "p-load" + }, + "user": { + "email": "user@email.com", + "id": "123", + "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" + } + }, + { + "@timestamp": "2018-08-01T10:00:00.000Z", + "agent": { + "name": "js-base", + "version": "4.8.1" + }, + "client": { + "ip": "192.0.0.1" + }, + "data_stream.dataset": "apm.internal", + "data_stream.type": "metrics", + "labels": { + "testTagKey": "testTagValue" + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "environment": "prod", + "framework": { + "name": "angular", + "version": "2" + }, + "language": { + "name": "javascript", + "version": "6" + }, + "name": "apm-a-rum-test-e2e-general-usecase", + "runtime": { + "name": "v8", + "version": "8.0" + }, + "version": "0.0.1" + }, + "span": { + "self_time": { + "count": 1, + "sum": { + "us": 1 + } + }, + "type": "Response" + }, + "transaction": { + "name": "general-usecase-initial-p-load", + "type": "p-load" + }, + "user": { + "email": "user@email.com", + "id": "123", + "name": "John Doe" + }, + "user_agent": { + "original": "rum-2.0" + } + }, { "@timestamp": "2018-08-01T10:00:00.004Z", "agent": { @@ -844,177 +1015,6 @@ "original": "rum-2.0" } }, - { - "@timestamp": "2018-08-01T10:00:00.000Z", - "agent": { - "name": "js-base", - "version": "4.8.1" - }, - "client": { - "ip": "192.0.0.1" - }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", - "labels": { - "testTagKey": "testTagValue" - }, - "processor": { - "event": "metric", - "name": "metric" - }, - "service": { - "environment": "prod", - "framework": { - "name": "angular", - "version": "2" - }, - "language": { - "name": "javascript", - "version": "6" - }, - "name": "apm-a-rum-test-e2e-general-usecase", - "runtime": { - "name": "v8", - "version": "8.0" - }, - "version": "0.0.1" - }, - "transaction": { - "breakdown": { - "count": 1 - }, - "duration": { - "count": 1, - "sum": { - "us": 295 - } - }, - "name": "general-usecase-initial-p-load", - "type": "p-load" - }, - "user": { - "email": "user@email.com", - "id": "123", - "name": "John Doe" - }, - "user_agent": { - "original": "rum-2.0" - } - }, - { - "@timestamp": "2018-08-01T10:00:00.000Z", - "agent": { - "name": "js-base", - "version": "4.8.1" - }, - "client": { - "ip": "192.0.0.1" - }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", - "labels": { - "testTagKey": "testTagValue" - }, - "processor": { - "event": "metric", - "name": "metric" - }, - "service": { - "environment": "prod", - "framework": { - "name": "angular", - "version": "2" - }, - "language": { - "name": "javascript", - "version": "6" - }, - "name": "apm-a-rum-test-e2e-general-usecase", - "runtime": { - "name": "v8", - "version": "8.0" - }, - "version": "0.0.1" - }, - "span": { - "self_time": { - "count": 1, - "sum": { - "us": 1 - } - }, - "type": "Request" - }, - "transaction": { - "name": "general-usecase-initial-p-load", - "type": "p-load" - }, - "user": { - "email": "user@email.com", - "id": "123", - "name": "John Doe" - }, - "user_agent": { - "original": "rum-2.0" - } - }, - { - "@timestamp": "2018-08-01T10:00:00.000Z", - "agent": { - "name": "js-base", - "version": "4.8.1" - }, - "client": { - "ip": "192.0.0.1" - }, - "data_stream.dataset": "apm.internal", - "data_stream.type": "metrics", - "labels": { - "testTagKey": "testTagValue" - }, - "processor": { - "event": "metric", - "name": "metric" - }, - "service": { - "environment": "prod", - "framework": { - "name": "angular", - "version": "2" - }, - "language": { - "name": "javascript", - "version": "6" - }, - "name": "apm-a-rum-test-e2e-general-usecase", - "runtime": { - "name": "v8", - "version": "8.0" - }, - "version": "0.0.1" - }, - "span": { - "self_time": { - "count": 1, - "sum": { - "us": 1 - } - }, - "type": "Response" - }, - "transaction": { - "name": "general-usecase-initial-p-load", - "type": "p-load" - }, - "user": { - "email": "user@email.com", - "id": "123", - "name": "John Doe" - }, - "user_agent": { - "original": "rum-2.0" - } - }, { "@timestamp": "2018-08-01T10:00:00.000Z", "agent": { diff --git a/sampling/sampling.go b/sampling/sampling.go index a5d4deaf386..3c820815447 100644 --- a/sampling/sampling.go +++ b/sampling/sampling.go @@ -36,23 +36,21 @@ var ( // of events retained in the batch. func NewDiscardUnsampledBatchProcessor() model.BatchProcessor { return model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { - var dropped int64 - transactions := batch.Transactions - for i := 0; i < len(transactions); { - tx := transactions[i] - if tx.Sampled == nil || *tx.Sampled { + events := *batch + for i := 0; i < len(events); { + event := events[i] + if event.Transaction == nil || event.Transaction.Sampled == nil || *event.Transaction.Sampled { i++ continue } - n := len(transactions) - transactions[i], transactions[n-1] = transactions[n-1], transactions[i] - transactions = transactions[:n-1] - dropped++ + n := len(events) + events[i], events[n-1] = events[n-1], events[i] + events = events[:n-1] } - if dropped > 0 { - transactionsDroppedCounter.Add(dropped) + if dropped := len(*batch) - len(events); dropped > 0 { + transactionsDroppedCounter.Add(int64(dropped)) } - batch.Transactions = transactions + *batch = events return nil }) } diff --git a/sampling/sampling_test.go b/sampling/sampling_test.go index 5da80643a9a..2378b3a9b0d 100644 --- a/sampling/sampling_test.go +++ b/sampling/sampling_test.go @@ -39,17 +39,24 @@ func TestNewDiscardUnsampledBatchProcessor(t *testing.T) { t5 := &model.Transaction{Sampled: newBool(true)} batch := model.Batch{ - Transactions: []*model.Transaction{t1, t2, t3, t4, t5}, - Spans: []*model.Span{span}, + {Transaction: t1}, + {Transaction: t2}, + {Span: span}, + {Transaction: t3}, + {Transaction: t4}, + {Transaction: t5}, } + err := batchProcessor.ProcessBatch(context.Background(), &batch) assert.NoError(t, err) // Note that t3 gets sent to the back of the slice; // this reporter is not order-preserving. assert.Equal(t, model.Batch{ - Transactions: []*model.Transaction{t1, t5, t3}, - Spans: []*model.Span{span}, + {Transaction: t1}, + {Transaction: t5}, + {Span: span}, + {Transaction: t3}, }, batch) expectedMonitoring := monitoring.MakeFlatSnapshot() diff --git a/testdata/jaeger/batch_1.approved.json b/testdata/jaeger/batch_1.approved.json index 6fb18952ac0..9edeea86c82 100644 --- a/testdata/jaeger/batch_1.approved.json +++ b/testdata/jaeger/batch_1.approved.json @@ -90,6 +90,48 @@ "id": "00000000000000007be2fd98d0973be3" } }, + { + "@timestamp": "2019-12-20T07:41:45.006Z", + "agent": { + "ephemeral_id": "2e3f8db3eb77fae0", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "error": { + "exception": [ + { + "message": "redis timeout" + } + ], + "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", + "log": { + "message": "redis timeout" + } + }, + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "parent": { + "id": "333295bfb438ea03" + }, + "processor": { + "event": "error", + "name": "error" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "redis" + }, + "timestamp": { + "us": 1576827705006847 + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, { "@timestamp": "2019-12-20T07:41:45.007Z", "agent": { @@ -315,6 +357,48 @@ "id": "00000000000000007be2fd98d0973be3" } }, + { + "@timestamp": "2019-12-20T07:41:45.089Z", + "agent": { + "ephemeral_id": "2e3f8db3eb77fae0", + "name": "Jaeger/Go", + "version": "2.20.1" + }, + "error": { + "exception": [ + { + "message": "redis timeout" + } + ], + "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", + "log": { + "message": "redis timeout" + } + }, + "host": { + "hostname": "host01", + "ip": "10.0.0.13" + }, + "parent": { + "id": "614811d6c498bfb0" + }, + "processor": { + "event": "error", + "name": "error" + }, + "service": { + "language": { + "name": "Go" + }, + "name": "redis" + }, + "timestamp": { + "us": 1576827705089372 + }, + "trace": { + "id": "00000000000000007be2fd98d0973be3" + } + }, { "@timestamp": "2019-12-20T07:41:45.089Z", "agent": { @@ -547,22 +631,27 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "event": { - "outcome": "unknown" + "error": { + "exception": [ + { + "message": "redis timeout" + } + ], + "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", + "log": { + "message": "redis timeout" + } }, "host": { "hostname": "host01", "ip": "10.0.0.13" }, - "labels": { - "param_driverID": "T752547C" - }, "parent": { - "id": "7be2fd98d0973be3" + "id": "0242ee3774d9eab1" }, "processor": { - "event": "span", - "name": "transaction" + "event": "error", + "name": "error" }, "service": { "language": { @@ -570,23 +659,15 @@ }, "name": "redis" }, - "span": { - "duration": { - "us": 14029 - }, - "id": "6a63d1e81cfc7d95", - "name": "GetDriver", - "type": "app" - }, "timestamp": { - "us": 1576827705172618 + "us": 1576827705172347 }, "trace": { "id": "00000000000000007be2fd98d0973be3" } }, { - "@timestamp": "2019-12-20T07:41:45.186Z", + "@timestamp": "2019-12-20T07:41:45.172Z", "agent": { "ephemeral_id": "2e3f8db3eb77fae0", "name": "Jaeger/Go", @@ -600,7 +681,7 @@ "ip": "10.0.0.13" }, "labels": { - "param_driverID": "T757338C" + "param_driverID": "T752547C" }, "parent": { "id": "7be2fd98d0973be3" @@ -617,89 +698,42 @@ }, "span": { "duration": { - "us": 10431 + "us": 14029 }, - "id": "2b4c28f02b272f17", + "id": "6a63d1e81cfc7d95", "name": "GetDriver", "type": "app" }, "timestamp": { - "us": 1576827705186670 + "us": 1576827705172618 }, "trace": { "id": "00000000000000007be2fd98d0973be3" } }, { - "@timestamp": "2019-12-20T07:41:45.006Z", + "@timestamp": "2019-12-20T07:41:45.186Z", "agent": { "ephemeral_id": "2e3f8db3eb77fae0", "name": "Jaeger/Go", "version": "2.20.1" }, - "error": { - "exception": [ - { - "message": "redis timeout" - } - ], - "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", - "log": { - "message": "redis timeout" - } + "event": { + "outcome": "unknown" }, "host": { "hostname": "host01", "ip": "10.0.0.13" }, - "parent": { - "id": "333295bfb438ea03" - }, - "processor": { - "event": "error", - "name": "error" - }, - "service": { - "language": { - "name": "Go" - }, - "name": "redis" - }, - "timestamp": { - "us": 1576827705006847 - }, - "trace": { - "id": "00000000000000007be2fd98d0973be3" - } - }, - { - "@timestamp": "2019-12-20T07:41:45.089Z", - "agent": { - "ephemeral_id": "2e3f8db3eb77fae0", - "name": "Jaeger/Go", - "version": "2.20.1" - }, - "error": { - "exception": [ - { - "message": "redis timeout" - } - ], - "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", - "log": { - "message": "redis timeout" - } - }, - "host": { - "hostname": "host01", - "ip": "10.0.0.13" + "labels": { + "param_driverID": "T757338C" }, "parent": { - "id": "614811d6c498bfb0" + "id": "7be2fd98d0973be3" }, "processor": { - "event": "error", - "name": "error" + "event": "span", + "name": "transaction" }, "service": { "language": { @@ -707,50 +741,16 @@ }, "name": "redis" }, - "timestamp": { - "us": 1576827705089372 - }, - "trace": { - "id": "00000000000000007be2fd98d0973be3" - } - }, - { - "@timestamp": "2019-12-20T07:41:45.172Z", - "agent": { - "ephemeral_id": "2e3f8db3eb77fae0", - "name": "Jaeger/Go", - "version": "2.20.1" - }, - "error": { - "exception": [ - { - "message": "redis timeout" - } - ], - "grouping_key": "dd09a7d0d9dde0adfcd694967c5a88de", - "log": { - "message": "redis timeout" - } - }, - "host": { - "hostname": "host01", - "ip": "10.0.0.13" - }, - "parent": { - "id": "0242ee3774d9eab1" - }, - "processor": { - "event": "error", - "name": "error" - }, - "service": { - "language": { - "name": "Go" + "span": { + "duration": { + "us": 10431 }, - "name": "redis" + "id": "2b4c28f02b272f17", + "name": "GetDriver", + "type": "app" }, "timestamp": { - "us": 1576827705172347 + "us": 1576827705186670 }, "trace": { "id": "00000000000000007be2fd98d0973be3" diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go index fdeaf1ee825..cfecac7c0ae 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go @@ -158,14 +158,14 @@ func (a *Aggregator) publish(ctx context.Context) error { } now := time.Now() - metricsets := make([]*model.Metricset, 0, size) + batch := make(model.Batch, 0, size) for key, metrics := range a.inactive.m { metricset := makeMetricset(now, key, metrics, a.config.Interval.Milliseconds()) - metricsets = append(metricsets, &metricset) + batch = append(batch, model.APMEvent{Metricset: &metricset}) delete(a.inactive.m, key) } - a.config.Logger.Debugf("publishing %d metricsets", len(metricsets)) - return a.config.BatchProcessor.ProcessBatch(ctx, &model.Batch{Metricsets: metricsets}) + a.config.Logger.Debugf("publishing %d metricsets", len(batch)) + return a.config.BatchProcessor.ProcessBatch(ctx, &batch) } // ProcessBatch aggregates all spans contained in "b", adding to it any @@ -176,9 +176,12 @@ func (a *Aggregator) publish(ctx context.Context) error { func (a *Aggregator) ProcessBatch(ctx context.Context, b *model.Batch) error { a.mu.RLock() defer a.mu.RUnlock() - for _, span := range b.Spans { - if metricset := a.processSpan(span); metricset != nil { - b.Metricsets = append(b.Metricsets, metricset) + for _, event := range *b { + if event.Span == nil { + continue + } + if metricset := a.processSpan(event.Span); metricset != nil { + *b = append(*b, model.APMEvent{Metricset: metricset}) } } return nil diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go index 271f4e81476..f7a4141e369 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go @@ -29,7 +29,7 @@ func BenchmarkAggregateSpan(b *testing.B) { for pb.Next() { agg.ProcessBatch( context.Background(), - &model.Batch{Spans: []*model.Span{span}}, + &model.Batch{{Span: span}}, ) } }) @@ -66,7 +66,7 @@ func TestNewAggregatorConfigInvalid(t *testing.T) { } func TestAggregatorRun(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) agg, err := NewAggregator(AggregatorConfig{ BatchProcessor: makeChanBatchProcessor(batches), Interval: 10 * time.Millisecond, @@ -100,11 +100,11 @@ func TestAggregatorRun(t *testing.T) { go func(in input) { defer wg.Done() span := makeSpan(in.serviceName, in.agentName, in.destination, in.outcome, 100*time.Millisecond, in.count) - batch := &model.Batch{Spans: []*model.Span{span}} + batch := model.Batch{{Span: span}} for i := 0; i < 100; i++ { - err := agg.ProcessBatch(context.Background(), batch) + err := agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Empty(t, batch.Metricsets) + assert.Equal(t, model.Batch{{Span: span}}, batch) } }(in) } @@ -115,10 +115,7 @@ func TestAggregatorRun(t *testing.T) { defer agg.Stop(context.Background()) batch := expectBatch(t, batches) - for _, ms := range batch.Metricsets { - require.NotZero(t, ms.Timestamp) - ms.Timestamp = time.Time{} - } + metricsets := batchMetricsets(t, batch) assert.ElementsMatch(t, []*model.Metricset{{ Name: "service_destination", @@ -184,7 +181,7 @@ func TestAggregatorRun(t *testing.T) { {Name: "span.destination.service.response_time.sum.us", Value: 10000000.0}, {Name: "metricset.period", Value: 10}, }, - }}, batch.Metricsets) + }}, metricsets) select { case <-batches: @@ -194,7 +191,7 @@ func TestAggregatorRun(t *testing.T) { } func TestAggregatorOverflow(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) agg, err := NewAggregator(AggregatorConfig{ BatchProcessor: makeChanBatchProcessor(batches), Interval: 10 * time.Millisecond, @@ -204,32 +201,28 @@ func TestAggregatorOverflow(t *testing.T) { // The first two transaction groups will not require immediate publication, // as we have configured the spanmetrics with a maximum of two buckets. - var batch model.Batch - for i := 0; i < 10; i++ { - batch.Spans = append(batch.Spans, - makeSpan("service", "agent", "destination1", "success", 100*time.Millisecond, 1), - makeSpan("service", "agent", "destination2", "success", 100*time.Millisecond, 1), - ) + batch := make(model.Batch, 20) + for i := 0; i < len(batch); i += 2 { + batch[i].Span = makeSpan("service", "agent", "destination1", "success", 100*time.Millisecond, 1) + batch[i+1].Span = makeSpan("service", "agent", "destination2", "success", 100*time.Millisecond, 1) } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Empty(t, batch.Metricsets) + assert.Empty(t, batchMetricsets(t, batch)) // The third group will return a metricset for immediate publication. for i := 0; i < 2; i++ { - batch.Spans = append(batch.Spans, - makeSpan("service", "agent", "destination3", "success", 100*time.Millisecond, 1), - ) + batch = append(batch, model.APMEvent{ + Span: makeSpan("service", "agent", "destination3", "success", 100*time.Millisecond, 1), + }) } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Len(t, batch.Metricsets, 2) - for _, m := range batch.Metricsets { - require.NotNil(t, m) - require.False(t, m.Timestamp.IsZero()) + metricsets := batchMetricsets(t, batch) + assert.Len(t, metricsets, 2) - m.Timestamp = time.Time{} + for _, m := range metricsets { assert.Equal(t, &model.Metricset{ Name: "service_destination", Metadata: model.Metadata{ @@ -274,18 +267,18 @@ func makeErrBatchProcessor(err error) model.BatchProcessor { return model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return err }) } -func makeChanBatchProcessor(ch chan<- *model.Batch) model.BatchProcessor { +func makeChanBatchProcessor(ch chan<- model.Batch) model.BatchProcessor { return model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { select { case <-ctx.Done(): return ctx.Err() - case ch <- batch: + case ch <- *batch: return nil } }) } -func expectBatch(t *testing.T, ch <-chan *model.Batch) *model.Batch { +func expectBatch(t *testing.T, ch <-chan model.Batch) model.Batch { t.Helper() select { case batch := <-ch: @@ -295,3 +288,16 @@ func expectBatch(t *testing.T, ch <-chan *model.Batch) *model.Batch { } panic("unreachable") } + +func batchMetricsets(t testing.TB, batch model.Batch) []*model.Metricset { + var metricsets []*model.Metricset + for _, event := range batch { + if event.Metricset == nil { + continue + } + require.NotZero(t, event.Metricset.Timestamp) + event.Metricset.Timestamp = time.Time{} + metricsets = append(metricsets, event.Metricset) + } + return metricsets +} diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator.go b/x-pack/apm-server/aggregation/txmetrics/aggregator.go index e44a10d8823..f4afecd46c2 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator.go @@ -217,19 +217,19 @@ func (a *Aggregator) publish(ctx context.Context) error { // the specific time period (date_range) on the metrics documents. now := time.Now() - metricsets := make([]*model.Metricset, 0, a.inactive.entries) + batch := make(model.Batch, 0, a.inactive.entries) for hash, entries := range a.inactive.m { for _, entry := range entries { counts, values := entry.transactionMetrics.histogramBuckets() metricset := makeMetricset(entry.transactionAggregationKey, hash, now, counts, values) - metricsets = append(metricsets, &metricset) + batch = append(batch, model.APMEvent{Metricset: &metricset}) } delete(a.inactive.m, hash) } a.inactive.entries = 0 - a.config.Logger.Debugf("publishing %d metricsets", len(metricsets)) - return a.config.BatchProcessor.ProcessBatch(ctx, &model.Batch{Metricsets: metricsets}) + a.config.Logger.Debugf("publishing %d metricsets", len(batch)) + return a.config.BatchProcessor.ProcessBatch(ctx, &batch) } // ProcessBatch aggregates all transactions contained in "b", adding to it any @@ -239,9 +239,12 @@ func (a *Aggregator) publish(ctx context.Context) error { // events, so that the metricsets requiring immediate publication can be // included in the same batch. func (a *Aggregator) ProcessBatch(ctx context.Context, b *model.Batch) error { - for _, tx := range b.Transactions { - if metricset := a.AggregateTransaction(tx); metricset != nil { - b.Metricsets = append(b.Metricsets, metricset) + for _, event := range *b { + if event.Transaction == nil { + continue + } + if metricset := a.AggregateTransaction(event.Transaction); metricset != nil { + *b = append(*b, model.APMEvent{Metricset: metricset}) } } return nil diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go index f6319ad6194..84ad166fd6f 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go @@ -63,7 +63,7 @@ func TestNewAggregatorConfigInvalid(t *testing.T) { } func TestProcessTransformablesOverflow(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) core, observed := observer.New(zapcore.DebugLevel) logger := logp.NewLogger("foo", zap.WrapCore(func(in zapcore.Core) zapcore.Core { @@ -81,34 +81,29 @@ func TestProcessTransformablesOverflow(t *testing.T) { // The first two transaction groups will not require immediate publication, // as we have configured the txmetrics with a maximum of two buckets. - var batch model.Batch - for i := 0; i < 10; i++ { - batch.Transactions = append(batch.Transactions, - &model.Transaction{Name: "foo", RepresentativeCount: 1}, - &model.Transaction{Name: "bar", RepresentativeCount: 1}, - ) + batch := make(model.Batch, 20) + for i := 0; i < len(batch); i += 2 { + batch[i].Transaction = &model.Transaction{Name: "foo", RepresentativeCount: 1} + batch[i+1].Transaction = &model.Transaction{Name: "bar", RepresentativeCount: 1} } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Empty(t, batch.Metricsets) + assert.Empty(t, batchMetricsets(t, batch)) // The third transaction group will return a metricset for immediate publication. for i := 0; i < 2; i++ { - batch.Transactions = append(batch.Transactions, &model.Transaction{ + batch = append(batch, model.APMEvent{Transaction: &model.Transaction{ Name: "baz", Duration: float64(time.Minute / time.Millisecond), RepresentativeCount: 1, - }) + }}) } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Len(t, batch.Metricsets, 2) - - for _, m := range batch.Metricsets { - require.NotNil(t, m) - require.False(t, m.Timestamp.IsZero()) + metricsets := batchMetricsets(t, batch) + assert.Len(t, metricsets, 2) - m.Timestamp = time.Time{} + for _, m := range metricsets { assert.Equal(t, &model.Metricset{ Name: "transaction", Metadata: model.Metadata{}, @@ -142,7 +137,7 @@ func TestProcessTransformablesOverflow(t *testing.T) { } func TestAggregatorRun(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{ BatchProcessor: makeChanBatchProcessor(batches), MaxTransactionGroups: 2, @@ -170,15 +165,16 @@ func TestAggregatorRun(t *testing.T) { defer agg.Stop(context.Background()) batch := expectBatch(t, batches) - require.Len(t, batch.Metricsets, 2) - sort.Slice(batch.Metricsets, func(i, j int) bool { - return batch.Metricsets[i].Transaction.Name < batch.Metricsets[j].Transaction.Name + metricsets := batchMetricsets(t, batch) + require.Len(t, metricsets, 2) + sort.Slice(metricsets, func(i, j int) bool { + return metricsets[i].Transaction.Name < metricsets[j].Transaction.Name }) - assert.Equal(t, "T-1000", batch.Metricsets[0].Transaction.Name) - assert.Equal(t, []int64{1000}, batch.Metricsets[0].Samples[0].Counts) - assert.Equal(t, "T-800", batch.Metricsets[1].Transaction.Name) - assert.Equal(t, []int64{800}, batch.Metricsets[1].Samples[0].Counts) + assert.Equal(t, "T-1000", metricsets[0].Transaction.Name) + assert.Equal(t, []int64{1000}, metricsets[0].Samples[0].Counts) + assert.Equal(t, "T-800", metricsets[1].Transaction.Name) + assert.Equal(t, []int64{800}, metricsets[1].Samples[0].Counts) select { case <-batches: @@ -188,7 +184,7 @@ func TestAggregatorRun(t *testing.T) { } func TestAggregatorRunPublishErrors(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) chanBatchProcessor := makeChanBatchProcessor(batches) processBatchErr := errors.New("report failed") var batchProcessor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error { @@ -237,7 +233,7 @@ func TestAggregatorRunPublishErrors(t *testing.T) { } func TestAggregateRepresentativeCount(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{ BatchProcessor: makeChanBatchProcessor(batches), MaxTransactionGroups: 1, @@ -304,9 +300,10 @@ func TestAggregateRepresentativeCount(t *testing.T) { // receive round(1+1.5)=3; the fractional values should not have been // truncated. batch := expectBatch(t, batches) - require.Len(t, batch.Metricsets, 1) - require.Len(t, batch.Metricsets[0].Samples, 1) - assert.Equal(t, []int64{3 /*round(1+1.5)*/}, batch.Metricsets[0].Samples[0].Counts) + metricsets := batchMetricsets(t, batch) + require.Len(t, metricsets, 1) + require.Len(t, metricsets[0].Samples, 1) + assert.Equal(t, []int64{3 /*round(1+1.5)*/}, metricsets[0].Samples[0].Counts) } func TestHDRHistogramSignificantFigures(t *testing.T) { @@ -319,7 +316,7 @@ func TestHDRHistogramSignificantFigures(t *testing.T) { func testHDRHistogramSignificantFigures(t *testing.T, sigfigs int) { t.Run(fmt.Sprintf("%d_sigfigs", sigfigs), func(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{ BatchProcessor: makeChanBatchProcessor(batches), MaxTransactionGroups: 2, @@ -353,16 +350,17 @@ func testHDRHistogramSignificantFigures(t *testing.T, sigfigs int) { defer agg.Stop(context.Background()) batch := expectBatch(t, batches) - require.Len(t, batch.Metricsets, 1) + metricsets := batchMetricsets(t, batch) + require.Len(t, metricsets, 1) - require.Len(t, batch.Metricsets[0].Samples, 1) - assert.Len(t, batch.Metricsets[0].Samples[0].Counts, len(batch.Metricsets[0].Samples[0].Values)) - assert.Len(t, batch.Metricsets[0].Samples[0].Counts, sigfigs) + require.Len(t, metricsets[0].Samples, 1) + assert.Len(t, metricsets[0].Samples[0].Counts, len(metricsets[0].Samples[0].Values)) + assert.Len(t, metricsets[0].Samples[0].Counts, sigfigs) }) } func TestAggregationFields(t *testing.T) { - batches := make(chan *model.Batch, 1) + batches := make(chan model.Batch, 1) agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{ BatchProcessor: makeChanBatchProcessor(batches), MaxTransactionGroups: 1000, @@ -438,11 +436,11 @@ func TestAggregationFields(t *testing.T) { addExpectedCount(4) batch := expectBatch(t, batches) - for _, ms := range batch.Metricsets { - ms.Timestamp = time.Time{} + metricsets := batchMetricsets(t, batch) + for _, ms := range metricsets { ms.TimeseriesInstanceID = "" } - assert.ElementsMatch(t, expected, batch.Metricsets) + assert.ElementsMatch(t, expected, metricsets) } func BenchmarkAggregateTransaction(b *testing.B) { @@ -471,18 +469,18 @@ func makeErrBatchProcessor(err error) model.ProcessBatchFunc { return func(context.Context, *model.Batch) error { return err } } -func makeChanBatchProcessor(ch chan<- *model.Batch) model.ProcessBatchFunc { +func makeChanBatchProcessor(ch chan<- model.Batch) model.ProcessBatchFunc { return func(ctx context.Context, batch *model.Batch) error { select { case <-ctx.Done(): return ctx.Err() - case ch <- batch: + case ch <- *batch: return nil } } } -func expectBatch(t *testing.T, ch <-chan *model.Batch) *model.Batch { +func expectBatch(t *testing.T, ch <-chan model.Batch) model.Batch { t.Helper() select { case batch := <-ch: @@ -492,3 +490,16 @@ func expectBatch(t *testing.T, ch <-chan *model.Batch) *model.Batch { } panic("unreachable") } + +func batchMetricsets(t testing.TB, batch model.Batch) []*model.Metricset { + var metricsets []*model.Metricset + for _, event := range batch { + if event.Metricset == nil { + continue + } + require.NotZero(t, event.Metricset.Timestamp) + event.Metricset.Timestamp = time.Time{} + metricsets = append(metricsets, event.Metricset) + } + return metricsets +} diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index a9716e34e53..1f7f8159dce 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -222,7 +222,7 @@ func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error { }); err != nil { return err } - out.Transactions = append(out.Transactions, &event) + *out = append(*out, model.APMEvent{Transaction: &event}) case entryMetaSpan: var event model.Span if err := item.Value(func(data []byte) error { @@ -230,7 +230,7 @@ func (rw *ReadWriter) ReadEvents(traceID string, out *model.Batch) error { }); err != nil { return err } - out.Spans = append(out.Spans, &event) + *out = append(*out, model.APMEvent{Span: &event}) default: // Unknown entry meta: ignore. continue diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go index 90a9e60a527..3a8c5834798 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -84,14 +84,14 @@ func BenchmarkReadEvents(b *testing.B) { b.ResetTimer() var batch model.Batch for i := 0; i < b.N; i++ { - batch.Reset() + batch = batch[:0] if err := readWriter.ReadEvents(traceUUID.String(), &batch); err != nil { b.Fatal(err) } - if batch.Len() != count { + if len(batch) != count { panic(fmt.Errorf( "event count mismatch: expected %d, got %d", - count, batch.Len(), + count, len(batch), )) } } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_test.go b/x-pack/apm-server/sampling/eventstorage/storage_test.go index b238a7fc760..88b4a3d923d 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_test.go @@ -48,7 +48,7 @@ func testWriteEvents(t *testing.T, numSpans int) { } assert.NoError(t, readWriter.WriteTransaction(transaction)) - var spans []*model.Span + var spanEvents []model.APMEvent for i := 0; i < numSpans; i++ { spanUUID := uuid.Must(uuid.NewV4()) span := &model.Span{ @@ -56,21 +56,19 @@ func testWriteEvents(t *testing.T, numSpans int) { ID: spanUUID.String(), } assert.NoError(t, readWriter.WriteSpan(span)) - spans = append(spans, span) + spanEvents = append(spanEvents, model.APMEvent{Span: span}) } afterWrite := time.Now() // We can read our writes without flushing. var batch model.Batch assert.NoError(t, readWriter.ReadEvents(traceUUID.String(), &batch)) - assert.ElementsMatch(t, []*model.Transaction{transaction}, batch.Transactions) - assert.ElementsMatch(t, spans, batch.Spans) + assert.ElementsMatch(t, append(spanEvents, model.APMEvent{Transaction: transaction}), batch) // Flush in order for the writes to be visible to other readers. assert.NoError(t, readWriter.Flush()) - var recordedTransactions []*model.Transaction - var recordedSpans []*model.Span + var recorded []model.APMEvent assert.NoError(t, db.View(func(txn *badger.Txn) error { iter := txn.NewIterator(badger.IteratorOptions{ Prefix: []byte(traceUUID.String()), @@ -98,11 +96,11 @@ func testWriteEvents(t *testing.T, numSpans int) { switch meta := item.UserMeta(); meta { case 'T': tx := &model.Transaction{} - recordedTransactions = append(recordedTransactions, tx) + recorded = append(recorded, model.APMEvent{Transaction: tx}) value = tx case 'S': span := &model.Span{} - recordedSpans = append(recordedSpans, span) + recorded = append(recorded, model.APMEvent{Span: span}) value = span default: t.Fatalf("invalid meta %q", meta) @@ -113,8 +111,7 @@ func testWriteEvents(t *testing.T, numSpans int) { } return nil })) - assert.ElementsMatch(t, batch.Spans, recordedSpans) - assert.ElementsMatch(t, batch.Transactions, recordedTransactions) + assert.ElementsMatch(t, batch, recorded) } func TestWriteTraceSampled(t *testing.T) { @@ -208,8 +205,10 @@ func TestReadEvents(t *testing.T) { var events model.Batch assert.NoError(t, reader.ReadEvents(string(traceID[:]), &events)) - assert.Equal(t, []*model.Transaction{{Name: "transaction"}}, events.Transactions) - assert.Equal(t, []*model.Span{{Name: "span"}}, events.Spans) + assert.Equal(t, model.Batch{ + {Transaction: &model.Transaction{Name: "transaction"}}, + {Span: &model.Span{Name: "span"}}, + }, events) } func TestReadEventsDecodeError(t *testing.T) { diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index d4561bbbecf..c1d53ffc706 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -146,42 +146,41 @@ func (p *Processor) CollectMonitoring(_ monitoring.Mode, V monitoring.Visitor) { // // All other trace events will either be dropped (e.g. known to not // be tail-sampled), or stored for possible later publication. -func (p *Processor) ProcessBatch(ctx context.Context, events *model.Batch) error { +func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error { p.storageMu.RLock() defer p.storageMu.RUnlock() if p.storage == nil { return ErrStopped } - for i := 0; i < len(events.Transactions); i++ { - atomic.AddInt64(&p.eventMetrics.processed, 1) - report, stored, err := p.processTransaction(events.Transactions[i]) - if err != nil { - return err - } - if !report { - // We shouldn't report this event, so remove it from the slice. - n := len(events.Transactions) - events.Transactions[i], events.Transactions[n-1] = events.Transactions[n-1], events.Transactions[i] - events.Transactions = events.Transactions[:n-1] - i-- - } - p.updateProcessorMetrics(report, stored) - } - for i := 0; i < len(events.Spans); i++ { - atomic.AddInt64(&p.eventMetrics.processed, 1) - report, stored, err := p.processSpan(events.Spans[i]) - if err != nil { - return err + events := *batch + for i := 0; i < len(events); i++ { + event := events[i] + var report, stored bool + if event.Transaction != nil { + var err error + atomic.AddInt64(&p.eventMetrics.processed, 1) + report, stored, err = p.processTransaction(event.Transaction) + if err != nil { + return err + } + } else if event.Span != nil { + var err error + atomic.AddInt64(&p.eventMetrics.processed, 1) + report, stored, err = p.processSpan(event.Span) + if err != nil { + return err + } } if !report { // We shouldn't report this event, so remove it from the slice. - n := len(events.Spans) - events.Spans[i], events.Spans[n-1] = events.Spans[n-1], events.Spans[i] - events.Spans = events.Spans[:n-1] + n := len(events) + events[i], events[n-1] = events[n-1], events[i] + events = events[:n-1] i-- } p.updateProcessorMetrics(report, stored) } + *batch = events return nil } @@ -449,7 +448,7 @@ func (p *Processor) Run() error { if err := p.storage.ReadEvents(traceID, &events); err != nil { return err } - if n := events.Len(); n > 0 { + if n := len(events); n > 0 { p.logger.Debugf("reporting %d events", n) if remoteDecision { // Remote decisions may be received multiple times, @@ -458,14 +457,15 @@ func (p *Processor) Run() error { // deleted. We delete events from local storage so // we don't publish duplicates; delivery is therefore // at-most-once, not guaranteed. - for _, tx := range events.Transactions { - if err := p.storage.DeleteTransaction(tx); err != nil { - return errors.Wrap(err, "failed to delete transaction from local storage") - } - } - for _, span := range events.Spans { - if err := p.storage.DeleteSpan(span); err != nil { - return errors.Wrap(err, "failed to delete span from local storage") + for _, event := range events { + if event.Transaction != nil { + if err := p.storage.DeleteTransaction(event.Transaction); err != nil { + return errors.Wrap(err, "failed to delete transaction from local storage") + } + } else if event.Span != nil { + if err := p.storage.DeleteSpan(event.Span); err != nil { + return errors.Wrap(err, "failed to delete span from local storage") + } } } } diff --git a/x-pack/apm-server/sampling/processor_bench_test.go b/x-pack/apm-server/sampling/processor_bench_test.go index 39c56507577..2a20fc0034d 100644 --- a/x-pack/apm-server/sampling/processor_bench_test.go +++ b/x-pack/apm-server/sampling/processor_bench_test.go @@ -47,10 +47,13 @@ func BenchmarkProcess(b *testing.B) { ID: hex.EncodeToString(spanID), ParentID: spanParentID, } - if err := processor.ProcessBatch(context.Background(), &model.Batch{ - Transactions: []*model.Transaction{transaction}, - Spans: []*model.Span{span, span, span}, - }); err != nil { + batch := model.Batch{ + {Transaction: transaction}, + {Span: span}, + {Span: span}, + {Span: span}, + } + if err := processor.ProcessBatch(context.Background(), &batch); err != nil { b.Fatal(err) } } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 78ac79bfdcd..8d964b21c2f 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -36,15 +36,15 @@ func TestProcessUnsampled(t *testing.T) { go processor.Run() defer processor.Stop(context.Background()) - in := &model.Batch{ - Transactions: []*model.Transaction{{ + in := model.Batch{{ + Transaction: &model.Transaction{ TraceID: "0102030405060708090a0b0c0d0e0f10", ID: "0102030405060708", Sampled: newBool(false), - }}, - } - out := cloneBatch(in) - err = processor.ProcessBatch(context.Background(), out) + }, + }} + out := in[:] + err = processor.ProcessBatch(context.Background(), &out) require.NoError(t, err) // Unsampled transaction should be reported immediately. @@ -94,19 +94,21 @@ func TestProcessAlreadyTailSampled(t *testing.T) { ID: "0102030405060711", } - batch := &model.Batch{ - Transactions: []*model.Transaction{transaction1, transaction2}, - Spans: []*model.Span{span1, span2}, + batch := model.Batch{ + {Transaction: transaction1}, + {Transaction: transaction2}, + {Span: span1}, + {Span: span2}, } - err = processor.ProcessBatch(context.Background(), batch) + err = processor.ProcessBatch(context.Background(), &batch) require.NoError(t, err) // Tail sampling decision already made. The first transaction and span should be // reported immediately, whereas the second ones should be written storage since // they were received after the trace sampling entry expired. - assert.Equal(t, &model.Batch{ - Transactions: []*model.Transaction{transaction1}, - Spans: []*model.Span{span1}, + assert.Equal(t, model.Batch{ + {Transaction: transaction1}, + {Span: span1}, }, batch) expectedMonitoring := monitoring.MakeFlatSnapshot() @@ -130,8 +132,8 @@ func TestProcessAlreadyTailSampled(t *testing.T) { err = reader.ReadEvents(traceID2, &batch) assert.NoError(t, err) assert.Equal(t, model.Batch{ - Spans: []*model.Span{span2}, - Transactions: []*model.Transaction{transaction2}, + {Transaction: transaction2}, + {Span: span2}, }, batch) }) } @@ -149,37 +151,34 @@ func TestProcessLocalTailSampling(t *testing.T) { traceID1 := "0102030405060708090a0b0c0d0e0f10" traceID2 := "0102030405060708090a0b0c0d0e0f11" trace1Events := model.Batch{ - Transactions: []*model.Transaction{{ + {Transaction: &model.Transaction{ TraceID: traceID1, ID: "0102030405060708", Duration: 123, }}, - Spans: []*model.Span{{ + {Span: &model.Span{ TraceID: traceID1, ID: "0102030405060709", Duration: 123, }}, } trace2Events := model.Batch{ - Transactions: []*model.Transaction{{ + {Transaction: &model.Transaction{ TraceID: traceID2, ID: "0102030405060710", Duration: 456, }}, - Spans: []*model.Span{{ + {Span: &model.Span{ TraceID: traceID2, ID: "0102030405060711", Duration: 456, }}, } - in := &model.Batch{ - Transactions: append(trace1Events.Transactions[:], trace2Events.Transactions...), - Spans: append(trace1Events.Spans[:], trace2Events.Spans...), - } - err = processor.ProcessBatch(context.Background(), in) + in := append(trace1Events[:], trace2Events...) + err = processor.ProcessBatch(context.Background(), &in) require.NoError(t, err) - assert.Equal(t, 0, in.Len()) + assert.Empty(t, in) // Start periodic tail-sampling. We start the processor after processing // events to ensure all events are processed before any local sampling @@ -242,7 +241,7 @@ func TestProcessLocalTailSampling(t *testing.T) { // Even though the trace is unsampled, the events will be // available in storage until the TTL expires, as they're // written there first. - batch.Reset() + batch = batch[:0] err = reader.ReadEvents(unsampledTraceID, &batch) assert.NoError(t, err) assert.Equal(t, unsampledTraceEvents, batch) @@ -262,16 +261,16 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { for i := range traceIDs { traceID := uuid.Must(uuid.NewV4()).String() traceIDs[i] = traceID - batch := model.Batch{ - Transactions: []*model.Transaction{{ + batch := model.Batch{{ + Transaction: &model.Transaction{ TraceID: traceID, ID: traceID, Duration: 1, - }}, - } + }, + }} err := processor.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Equal(t, 0, batch.Len()) + assert.Empty(t, batch) } // Stop the processor so we can access the database. @@ -320,12 +319,12 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) { rng := rand.New(rand.NewSource(0)) metadata := model.Metadata{Service: model.Service{Name: "service_name"}} numTransactions := 100 - events := &model.Batch{Transactions: make([]*model.Transaction, numTransactions)} - for i := 0; i < 100; i++ { + events := make(model.Batch, numTransactions) + for i := range events { var traceIDBytes [16]byte _, err := rng.Read(traceIDBytes[:]) require.NoError(t, err) - events.Transactions[i] = &model.Transaction{ + events[i].Transaction = &model.Transaction{ Metadata: metadata, Name: "trace_name", TraceID: fmt.Sprintf("%x", traceIDBytes[:]), @@ -334,9 +333,9 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) { } } - err = processor.ProcessBatch(context.Background(), events) + err = processor.ProcessBatch(context.Background(), &events) require.NoError(t, err) - assert.Equal(t, 0, events.Len()) + assert.Empty(t, events) // Start periodic tail-sampling. We start the processor after processing // events to ensure all events are processed before any local sampling @@ -374,12 +373,12 @@ func TestProcessRemoteTailSampling(t *testing.T) { subscriber := pubsubtest.SubscriberChan(subscriberChan) config.Elasticsearch = pubsubtest.Client(publisher, subscriber) - reported := make(chan *model.Batch) + reported := make(chan model.Batch) config.BatchProcessor = model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { select { case <-ctx.Done(): return ctx.Err() - case reported <- batch: + case reported <- *batch: return nil } }) @@ -391,18 +390,18 @@ func TestProcessRemoteTailSampling(t *testing.T) { traceID1 := "0102030405060708090a0b0c0d0e0f10" traceID2 := "0102030405060708090a0b0c0d0e0f11" - trace1Events := &model.Batch{ - Spans: []*model.Span{{ + trace1Events := model.Batch{{ + Span: &model.Span{ TraceID: traceID1, ID: "0102030405060709", Duration: 123, - }}, - } + }, + }} - in := cloneBatch(trace1Events) - err = processor.ProcessBatch(context.Background(), in) + in := trace1Events[:] + err = processor.ProcessBatch(context.Background(), &in) require.NoError(t, err) - assert.Equal(t, 0, in.Len()) + assert.Empty(t, in) // Simulate receiving remote sampling decisions multiple times, // to show that we don't report duplicate events. @@ -411,7 +410,7 @@ func TestProcessRemoteTailSampling(t *testing.T) { subscriberChan <- traceID2 subscriberChan <- traceID1 - var events *model.Batch + var events model.Batch select { case events = <-reported: case <-time.After(10 * time.Second): @@ -456,7 +455,7 @@ func TestProcessRemoteTailSampling(t *testing.T) { batch = model.Batch{} err = reader.ReadEvents(traceID2, &batch) assert.NoError(t, err) - assert.Zero(t, batch) + assert.Empty(t, batch) }) } @@ -472,16 +471,16 @@ func TestGroupsMonitoring(t *testing.T) { defer processor.Stop(context.Background()) for i := 0; i < config.MaxDynamicServices+1; i++ { - err := processor.ProcessBatch(context.Background(), &model.Batch{ - Transactions: []*model.Transaction{{ + err := processor.ProcessBatch(context.Background(), &model.Batch{{ + Transaction: &model.Transaction{ Metadata: model.Metadata{ Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, }, TraceID: uuid.Must(uuid.NewV4()).String(), ID: "0102030405060709", Duration: 123, - }}, - }) + }, + }}) require.NoError(t, err) } @@ -502,16 +501,16 @@ func TestStorageMonitoring(t *testing.T) { defer processor.Stop(context.Background()) for i := 0; i < 100; i++ { traceID := uuid.Must(uuid.NewV4()).String() - batch := model.Batch{ - Transactions: []*model.Transaction{{ + batch := model.Batch{{ + Transaction: &model.Transaction{ TraceID: traceID, ID: traceID, Duration: 123, - }}, - } + }, + }} err := processor.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Equal(t, 0, batch.Len()) + assert.Empty(t, batch) } // Stop the processor and create a new one, which will reopen storage @@ -545,16 +544,16 @@ func TestStorageGC(t *testing.T) { defer processor.Stop(context.Background()) for i := 0; i < n; i++ { traceID := uuid.Must(uuid.NewV4()).String() - batch := model.Batch{ - Spans: []*model.Span{{ + batch := model.Batch{{ + Span: &model.Span{ TraceID: traceID, ID: traceID, Duration: 123, - }}, - } + }, + }} err := processor.ProcessBatch(context.Background(), &batch) require.NoError(t, err) - assert.Equal(t, 0, batch.Len()) + assert.Empty(t, batch) } } @@ -721,13 +720,6 @@ func newBool(v bool) *bool { return &v } -func cloneBatch(in *model.Batch) *model.Batch { - return &model.Batch{ - Transactions: in.Transactions[:], - Spans: in.Spans[:], - } -} - // waitFileModified waits up to 10 seconds for filename to exist and for its // modification time to be greater than "after", and returns the file content // and file info (including modification time).