diff --git a/beater/api/intake/handler_test.go b/beater/api/intake/handler_test.go index 875d5d658aa..8a19f4b0d58 100644 --- a/beater/api/intake/handler_test.go +++ b/beater/api/intake/handler_test.go @@ -95,7 +95,12 @@ func TestIntakeHandler(t *testing.T) { code: http.StatusAccepted, id: request.IDResponseValidAccepted, }, "TooLarge": { - path: "errors.ndjson", processor: &stream.Processor{}, + path: "errors.ndjson", + processor: func() *stream.Processor { + p := stream.BackendProcessor(config.DefaultConfig()) + p.MaxEventSize = 10 + return p + }(), code: http.StatusBadRequest, id: request.IDResponseErrorsRequestTooLarge}, "Closing": { path: "errors.ndjson", reporter: beatertest.ErrorReporterFn(publish.ErrChannelClosed), diff --git a/beater/api/intake/test_approved/BodyReader.approved.json b/beater/api/intake/test_approved/BodyReader.approved.json index fc636bfdcb3..116bbf02348 100644 --- a/beater/api/intake/test_approved/BodyReader.approved.json +++ b/beater/api/intake/test_approved/BodyReader.approved.json @@ -2,7 +2,7 @@ "accepted": 0, "errors": [ { - "message": "EOF while reading metadata" + "message": "validation error: 'metadata' required" } ] } diff --git a/beater/api/intake/test_approved/InvalidEvent.approved.json b/beater/api/intake/test_approved/InvalidEvent.approved.json index 703797831cb..b18aa4919f9 100644 --- a/beater/api/intake/test_approved/InvalidEvent.approved.json +++ b/beater/api/intake/test_approved/InvalidEvent.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcdef\", \"parent_id\": \"abcdefabcdef01234567\", \"type\": \"request\", \"duration\": 32.592981, \"span_count\": { \"started\": 21 } } } ", - "message": "failed to validate transaction: error validating JSON: I[#] S[#] doesn't validate with \"transaction#\"\n I[#] S[#/allOf/3] allOf failed\n I[#/id] S[#/allOf/3/properties/id/type] expected string, but got number" + "message": "decode error: data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects \" or n, but found 1, error found in #10 byte of ...| { \"id\": 12345, \"tra|..., bigger context ...|{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcde|..." } ] } diff --git a/beater/api/intake/test_approved/InvalidJSONEvent.approved.json b/beater/api/intake/test_approved/InvalidJSONEvent.approved.json index ac1e798d6f8..ae54db10f8d 100644 --- a/beater/api/intake/test_approved/InvalidJSONEvent.approved.json +++ b/beater/api/intake/test_approved/InvalidJSONEvent.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{ \"invalid-json\" }", - "message": "data read error: invalid character '}' after object key" + "message": "invalid-json: did not recognize object type" } ] } diff --git a/beater/api/intake/test_approved/InvalidJSONMetadata.approved.json b/beater/api/intake/test_approved/InvalidJSONMetadata.approved.json index b1f27fbb794..8481bde2ec4 100644 --- a/beater/api/intake/test_approved/InvalidJSONMetadata.approved.json +++ b/beater/api/intake/test_approved/InvalidJSONMetadata.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"metadata\": {\"invalid-json\"}}", - "message": "data read error: invalid character '}' after object key" + "message": "decode error: data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :, but found }, error found in #10 byte of ...|lid-json\"}}|..., bigger context ...|{\"metadata\": {\"invalid-json\"}}|..." } ] } diff --git a/beater/api/intake/test_approved/InvalidMetadata.approved.json b/beater/api/intake/test_approved/InvalidMetadata.approved.json index da67acd9c67..b36495f24f1 100644 --- a/beater/api/intake/test_approved/InvalidMetadata.approved.json +++ b/beater/api/intake/test_approved/InvalidMetadata.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"metadata\": {\"user\": null}}", - "message": "failed to validate metadata: error validating JSON: I[#] S[#] doesn't validate with \"metadata#\"\n I[#] S[#/required] missing properties: \"service\"" + "message": "validation error: 'metadata' required" } ] } diff --git a/beater/api/intake/test_approved/InvalidMetadata2.approved.json b/beater/api/intake/test_approved/InvalidMetadata2.approved.json index 26640294ec7..3b487d4e70f 100644 --- a/beater/api/intake/test_approved/InvalidMetadata2.approved.json +++ b/beater/api/intake/test_approved/InvalidMetadata2.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"not\": \"metadata\"}", - "message": "did not recognize object type" + "message": "validation error: 'metadata' required" } ] } diff --git a/beater/api/intake/test_approved/TooLarge.approved.json b/beater/api/intake/test_approved/TooLarge.approved.json index bb3e5059a90..c18fbfeef24 100644 --- a/beater/api/intake/test_approved/TooLarge.approved.json +++ b/beater/api/intake/test_approved/TooLarge.approved.json @@ -2,6 +2,7 @@ "accepted": 0, "errors": [ { + "document": "{\"metadata", "message": "event exceeded the permitted size." } ] diff --git a/beater/api/intake/test_approved/UnrecognizedEvent.approved.json b/beater/api/intake/test_approved/UnrecognizedEvent.approved.json index 0dfff3f52d8..a3231874cb8 100644 --- a/beater/api/intake/test_approved/UnrecognizedEvent.approved.json +++ b/beater/api/intake/test_approved/UnrecognizedEvent.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"tennis-court\": {\"name\": \"Centre Court, Wimbledon\"}}", - "message": "did not recognize object type" + "message": "tennis-court: did not recognize object type" } ] } diff --git a/beater/api/profile/handler.go b/beater/api/profile/handler.go index 86c0fd95d39..ebe1335441d 100644 --- a/beater/api/profile/handler.go +++ b/beater/api/profile/handler.go @@ -32,10 +32,9 @@ import ( "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" - "github.com/elastic/apm-server/model/modeldecoder" + v2 "github.com/elastic/apm-server/model/modeldecoder/v2" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/transform" - "github.com/elastic/apm-server/validation" ) var ( @@ -104,26 +103,19 @@ func Handler(report publish.Reporter) request.Handler { } } r := &decoder.LimitedReader{R: part, N: metadataContentLengthLimit} - raw, err := decoder.DecodeJSONData(r) - if err != nil { + dec := decoder.NewJSONDecoder(r) + metadata := model.Metadata{ + UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent}, + Client: model.Client{IP: c.RequestMetadata.ClientIP}, + System: model.System{IP: c.RequestMetadata.SystemIP}} + if err := v2.DecodeMetadata(dec, &metadata); err != nil { if r.N < 0 { return nil, requestError{ id: request.IDResponseErrorsRequestTooLarge, err: err, } } - return nil, requestError{ - id: request.IDResponseErrorsDecode, - err: errors.Wrap(err, "failed to decode metadata JSON"), - } - } - metadata := model.Metadata{ - UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent}, - Client: model.Client{IP: c.RequestMetadata.ClientIP}, - System: model.System{IP: c.RequestMetadata.SystemIP}} - if err := modeldecoder.DecodeMetadata(raw, false, &metadata); err != nil { - var ve *validation.Error - if errors.As(err, &ve) { + if _, ok := err.(v2.ValidationError); ok { return nil, requestError{ id: request.IDResponseErrorsValidate, err: errors.Wrap(err, "invalid metadata"), @@ -131,7 +123,7 @@ func Handler(report publish.Reporter) request.Handler { } return nil, requestError{ id: request.IDResponseErrorsDecode, - err: errors.Wrap(err, "failed to decode metadata"), + err: errors.Wrap(err, "invalid metadata"), } } profileMetadata = metadata diff --git a/beater/api/profile/handler_test.go b/beater/api/profile/handler_test.go index ce5c19518fa..1004028588b 100644 --- a/beater/api/profile/handler_test.go +++ b/beater/api/profile/handler_test.go @@ -128,16 +128,16 @@ func TestHandler(t *testing.T) { id: request.IDResponseValidAccepted, parts: []part{ heapProfilePart(), - part{ + { name: "profile", // No messageType param specified, so pprof is assumed. contentType: "application/x-protobuf", body: heapProfileBody(), }, - part{ + { name: "metadata", contentType: "application/json", - body: strings.NewReader(`{"service":{"name":"foo","agent":{"name":"go","version":"1.0"}}}`), + body: strings.NewReader(`{"service":{"name":"foo","agent":{"name":"java","version":"1.2.0"}}}`), }, }, body: prettyJSON(map[string]interface{}{"accepted": 2}), diff --git a/beater/test_approved_es_documents/TestPublishIntegrationErrors.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationErrors.approved.json index 3e8f82f3a10..5c72cddb4d7 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationErrors.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationErrors.approved.json @@ -259,7 +259,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, diff --git a/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json index 4a64a72f1ac..2e4175201c6 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationEvents.approved.json @@ -62,7 +62,7 @@ "MozillaChromeEdge" ] }, - "method": "post", + "method": "POST", "socket": { "encrypted": true, "remote_address": "12.53.12.1:8080" @@ -294,7 +294,7 @@ "us": 3781 }, "http": { - "method": "get", + "method": "GET", "response": { "decoded_body_size": 401, "encoded_body_size": 356, @@ -666,7 +666,7 @@ "opbeans-java:3000" ] }, - "method": "post", + "method": "POST", "socket": { "encrypted": true, "remote_address": "12.53.12.1" diff --git a/beater/test_approved_es_documents/TestPublishIntegrationSpans.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationSpans.approved.json index 70168c0b4a2..170e9a36b5b 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationSpans.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationSpans.approved.json @@ -646,7 +646,7 @@ "us": 3781 }, "http": { - "method": "get", + "method": "GET", "response": { "decoded_body_size": 401, "encoded_body_size": 356, diff --git a/beater/test_approved_es_documents/TestPublishIntegrationTransactions.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationTransactions.approved.json index 25f5af42bf6..c08ccbeed90 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationTransactions.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationTransactions.approved.json @@ -212,7 +212,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, @@ -395,7 +395,7 @@ }, "http": { "request": { - "method": "post", + "method": "POST", "socket": { "remote_address": "192.0.1" } diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 5a00b5fcd5d..5fa1dfdc8d8 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -15,6 +15,7 @@ https://github.com/elastic/apm-server/compare/7.9\...master[View commits] [float] ==== Intake API Changes +* Changed error messages for invalid events due to internal changes of decoder logic {pull}4261[4261] [float] ==== Added diff --git a/decoder/decoder.go b/decoder/decoder.go index 439bd90c2f6..fbe6d02e537 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -24,22 +24,21 @@ import ( ) //TODO(simitt): look into config options for performance tuning -var jsonit = jsoniter.ConfigCompatibleWithStandardLibrary +var json = jsoniter.ConfigCompatibleWithStandardLibrary type Decoder interface { Decode(v interface{}) error } -// JSONIterDecoder can decode from a given reader, using jsoniter -// TODO(simitt): rename to JSONDecoder when everything is integrated -type JSONIterDecoder struct { +type JSONDecoder struct { *jsoniter.Decoder + reader io.Reader } -// NewJSONIteratorDecoder returns a *json.Decoder where numbers are unmarshaled +// NewJSONDecoder returns a *json.Decoder where numbers are unmarshaled // as a Number instead of a float64 into an interface{} -func NewJSONIteratorDecoder(r io.Reader) JSONIterDecoder { - d := jsonit.NewDecoder(r) +func NewJSONDecoder(r io.Reader) JSONDecoder { + d := json.NewDecoder(r) d.UseNumber() - return JSONIterDecoder{Decoder: d} + return JSONDecoder{Decoder: d, reader: r} } diff --git a/decoder/req_decoder_test.go b/decoder/decoder_test.go similarity index 92% rename from decoder/req_decoder_test.go rename to decoder/decoder_test.go index 905e08f8034..3e81bea2f57 100644 --- a/decoder/req_decoder_test.go +++ b/decoder/decoder_test.go @@ -28,9 +28,11 @@ import ( ) func TestDecodeJSONData(t *testing.T) { - decoded, err := decoder.DecodeJSONData(strings.NewReader( + d := decoder.NewJSONDecoder(strings.NewReader( `{"id":"85925e55b43f4342","system": {"hostname":"prod1.example.com"},"number":123}`, )) + var decoded map[string]interface{} + err := d.Decode(&decoded) assert.Nil(t, err) assert.Equal(t, map[string]interface{}{ "id": "85925e55b43f4342", diff --git a/decoder/req_decoder.go b/decoder/req_decoder.go index 3bc930644e4..b5274d842ac 100644 --- a/decoder/req_decoder.go +++ b/decoder/req_decoder.go @@ -20,7 +20,6 @@ package decoder import ( "compress/gzip" "compress/zlib" - "encoding/json" "io" "net/http" @@ -85,18 +84,3 @@ func CompressedRequestReader(req *http.Request) (io.ReadCloser, error) { readerCounter.Inc() return reader, nil } - -func DecodeJSONData(reader io.Reader) (map[string]interface{}, error) { - v := make(map[string]interface{}) - d := NewJSONDecoder(reader) - if err := d.Decode(&v); err != nil { - return nil, err - } - return v, nil -} - -func NewJSONDecoder(r io.Reader) *json.Decoder { - d := json.NewDecoder(r) - d.UseNumber() - return d -} diff --git a/decoder/stream_decoder.go b/decoder/stream_decoder.go index 7abf66d940c..22bc001cdac 100644 --- a/decoder/stream_decoder.go +++ b/decoder/stream_decoder.go @@ -20,8 +20,9 @@ package decoder import ( "bufio" "bytes" - "encoding/json" "io" + + jsoniter "github.com/json-iterator/go" ) // NewNDJSONStreamDecoder returns a new NDJSONStreamDecoder which decodes @@ -40,9 +41,10 @@ type NDJSONStreamDecoder struct { lineReader *LineReader isEOF bool + latestError error latestLine []byte latestLineReader bytes.Reader - decoder *json.Decoder + decoder *jsoniter.Decoder } // Reset sets sr's underlying io.Reader to r, and resets any reading/decoding state. @@ -51,35 +53,46 @@ func (dec *NDJSONStreamDecoder) Reset(r io.Reader) { dec.lineReader.Reset(dec.bufioReader) dec.isEOF = false dec.latestLine = nil - dec.latestLineReader.Reset(nil) + dec.resetLatestLineReader() } func (dec *NDJSONStreamDecoder) resetDecoder() { - dec.decoder = NewJSONDecoder(&dec.latestLineReader) + dec.decoder = json.NewDecoder(&dec.latestLineReader) + dec.decoder.UseNumber() } // Decode decodes the next line into v. func (dec *NDJSONStreamDecoder) Decode(v interface{}) error { - buf, readErr := dec.readLine() - if len(buf) == 0 || (readErr != nil && !dec.isEOF) { - return readErr + defer dec.resetLatestLineReader() + if dec.latestLineReader.Size() == 0 { + dec.ReadAhead() + } + if len(dec.latestLine) == 0 || (dec.latestError != nil && !dec.isEOF) { + return dec.latestError } if err := dec.decoder.Decode(v); err != nil { dec.resetDecoder() // clear out decoding state return JSONDecodeError("data read error: " + err.Error()) } - return readErr // this might be io.EOF + return dec.latestError // this might be io.EOF } -func (dec *NDJSONStreamDecoder) readLine() ([]byte, error) { +// ReadAhead reads the next NDJSON line, buffering it for a subsequent call to Decode. +func (dec *NDJSONStreamDecoder) ReadAhead() ([]byte, error) { // readLine can return valid data in `buf` _and_ also an io.EOF line, readErr := dec.lineReader.ReadLine() dec.latestLine = line dec.latestLineReader.Reset(dec.latestLine) + dec.latestError = readErr dec.isEOF = readErr == io.EOF return line, readErr } +func (dec *NDJSONStreamDecoder) resetLatestLineReader() { + dec.latestLineReader.Reset(nil) + dec.latestError = nil +} + // IsEOF signals whether the underlying reader reached the end func (dec *NDJSONStreamDecoder) IsEOF() bool { return dec.isEOF } diff --git a/decoder/stream_decoder_test.go b/decoder/stream_decoder_test.go index f17272ce0f9..bba8abce03e 100644 --- a/decoder/stream_decoder_test.go +++ b/decoder/stream_decoder_test.go @@ -19,6 +19,8 @@ package decoder import ( "bytes" + "fmt" + "io" "strings" "testing" @@ -49,8 +51,8 @@ func TestNDStreamReader(t *testing.T) { latestLine: `{"key": "value2", "t`, }, { - out: nil, - errPattern: "invalid character", + out: map[string]interface{}{}, + errPattern: "data read error", latestLine: `{invalid-json}`, }, { @@ -64,20 +66,49 @@ func TestNDStreamReader(t *testing.T) { n := NewNDJSONStreamDecoder(buf, 20) for idx, test := range expected { - var out map[string]interface{} - err := n.Decode(&out) - assert.Equal(t, test.out, out, "Failed at idx %v", idx) - if test.errPattern == "" { - assert.Nil(t, err) - } else { - require.NotNil(t, err, "Failed at idx %v", idx) - assert.Contains(t, err.Error(), test.errPattern, "Failed at idx %v", idx) - } - assert.Equal(t, test.isEOF, n.IsEOF()) - if test.latestLine == "" { - assert.Nil(t, n.LatestLine(), "Failed at idx %v", idx) - } else { - assert.Equal(t, []byte(test.latestLine), n.LatestLine(), "Failed at idx %v", idx) + t.Run(fmt.Sprintf("%v", idx), func(t *testing.T) { + var out map[string]interface{} + err := n.Decode(&out) + assert.Equal(t, test.out, out, "Failed at idx %v", idx) + if test.errPattern == "" { + assert.Nil(t, err) + } else { + require.NotNil(t, err, "Failed at idx %v", idx) + assert.Contains(t, err.Error(), test.errPattern, "Failed at idx %v", idx) + } + assert.Equal(t, test.isEOF, n.IsEOF()) + if test.latestLine == "" { + assert.Nil(t, n.LatestLine(), "Failed at idx %v", idx) + } else { + assert.Equal(t, []byte(test.latestLine), n.LatestLine(), "Failed at idx %v", idx) + } + }) + } +} + +func TestNDStreamReaderReadAhead(t *testing.T) { + lines := []string{ + `{"key":"value1"}`, + `{"a": "b"}`, + } + buf := bytes.NewBufferString(strings.Join(lines, "\n")) + n := NewNDJSONStreamDecoder(buf, 100) + + // Decode reads the next line if it hasn't been buffered already + var out map[string]interface{} + require.NoError(t, n.Decode(&out)) + assert.Equal(t, map[string]interface{}{"key": "value1"}, out) + // ReadAhead buffers the next line, to be consumed by the next call to `Decode` + var readAheadOut, decodeOut map[string]interface{} + b, errAhead := n.ReadAhead() + require.NoError(t, json.Unmarshal(b, &readAheadOut)) + assert.Equal(t, map[string]interface{}{"a": "b"}, readAheadOut) + errDecode := n.Decode(&decodeOut) + assert.Equal(t, readAheadOut, decodeOut) + // ReadAhead and Decode return an error for EOF + for _, err := range []error{errAhead, errDecode} { + if assert.Error(t, err) { + assert.Equal(t, io.EOF, err) } } } diff --git a/docs/data/elasticsearch/generated/errors.json b/docs/data/elasticsearch/generated/errors.json index 26da94671cf..f1d108baaed 100644 --- a/docs/data/elasticsearch/generated/errors.json +++ b/docs/data/elasticsearch/generated/errors.json @@ -310,7 +310,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, diff --git a/docs/data/elasticsearch/generated/spans.json b/docs/data/elasticsearch/generated/spans.json index 69ea7b3d0e6..6c888737a92 100644 --- a/docs/data/elasticsearch/generated/spans.json +++ b/docs/data/elasticsearch/generated/spans.json @@ -47,7 +47,7 @@ "us": 3781 }, "http": { - "method": "get", + "method": "GET", "response": { "status_code": 200 }, diff --git a/docs/data/elasticsearch/generated/transactions.json b/docs/data/elasticsearch/generated/transactions.json index 755282d5319..8d3046c5aea 100644 --- a/docs/data/elasticsearch/generated/transactions.json +++ b/docs/data/elasticsearch/generated/transactions.json @@ -361,7 +361,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, diff --git a/model/modeldecoder/modeldecodertest/stream_decoder.go b/model/modeldecoder/modeldecodertest/stream_decoder.go deleted file mode 100644 index 55468450030..00000000000 --- a/model/modeldecoder/modeldecodertest/stream_decoder.go +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package modeldecodertest - -import ( - "bufio" - "bytes" - "io" - - jsoniter "github.com/json-iterator/go" - - "github.com/elastic/apm-server/decoder" -) - -var json = jsoniter.ConfigFastest - -// TODO(simitt): -// copied decoder from decoder package -// remove this file again when the ReadAhaed functionality is introduced to -// the decoder.StreamDecoder - -func newNDJSONStreamDecoder(r io.Reader, maxLineLength int) *ndjsonStreamDecoder { - var dec ndjsonStreamDecoder - dec.bufioReader = bufio.NewReaderSize(r, maxLineLength) - dec.lineReader = decoder.NewLineReader(dec.bufioReader, maxLineLength) - dec.resetDecoder() - return &dec -} - -type ndjsonStreamDecoder struct { - bufioReader *bufio.Reader - lineReader *decoder.LineReader - - isEOF bool - latestError error - latestLine []byte - latestLineReader bytes.Reader - decoder *jsoniter.Decoder -} - -func (dec *ndjsonStreamDecoder) resetDecoder() { - dec.decoder = json.NewDecoder(&dec.latestLineReader) - dec.decoder.UseNumber() -} - -// Decode decodes the next line into v. -func (dec *ndjsonStreamDecoder) decode(v interface{}) error { - defer dec.resetLatestLineReader() - if dec.latestLineReader.Size() == 0 { - dec.readAhead() - } - if len(dec.latestLine) == 0 || (dec.latestError != nil && !dec.isEOF) { - return dec.latestError - } - if err := dec.decoder.Decode(v); err != nil { - dec.resetDecoder() // clear out decoding state - return jsonDecodeError("data read error: " + err.Error()) - } - return dec.latestError // this might be io.EOF -} - -func (dec *ndjsonStreamDecoder) readAhead() ([]byte, error) { - // readLine can return valid data in `buf` _and_ also an io.EOF - line, readErr := dec.lineReader.ReadLine() - dec.latestLine = line - dec.latestLineReader.Reset(dec.latestLine) - dec.latestError = readErr - dec.isEOF = readErr == io.EOF - return line, readErr -} - -func (dec *ndjsonStreamDecoder) resetLatestLineReader() { - dec.latestLineReader.Reset(nil) - dec.latestError = nil -} - -type jsonDecodeError string - -func (s jsonDecodeError) Error() string { return string(s) } diff --git a/model/modeldecoder/modeldecodertest/testdata.go b/model/modeldecoder/modeldecodertest/testdata.go index 6079ee7854d..a50ee330d50 100644 --- a/model/modeldecoder/modeldecodertest/testdata.go +++ b/model/modeldecoder/modeldecodertest/testdata.go @@ -19,6 +19,7 @@ package modeldecodertest import ( "bytes" + "encoding/json" "errors" "io" "testing" @@ -32,7 +33,7 @@ import ( // it skips events with a different type than the given eventType // and decodes the first matching event type func DecodeData(t *testing.T, r io.Reader, eventType string, out interface{}) { - dec := newNDJSONStreamDecoder(r, 300*1024) + dec := decoder.NewNDJSONStreamDecoder(r, 300*1024) var et string var err error for et != eventType { @@ -40,7 +41,7 @@ func DecodeData(t *testing.T, r io.Reader, eventType string, out interface{}) { require.NoError(t, err) } // decode data - require.NoError(t, dec.decode(&out)) + require.NoError(t, dec.Decode(&out)) } // DecodeDataWithReplacement decodes input from the io.Reader and replaces data for the @@ -64,11 +65,11 @@ func DecodeDataWithReplacement(t *testing.T, r io.Reader, eventType string, newD // unmarshal data into struct b, err := json.Marshal(data[eventType]) require.NoError(t, err) - require.NoError(t, decoder.NewJSONIteratorDecoder(bytes.NewReader(b)).Decode(out)) + require.NoError(t, decoder.NewJSONDecoder(bytes.NewReader(b)).Decode(out)) } -func readEventType(d *ndjsonStreamDecoder) (string, error) { - body, err := d.readAhead() +func readEventType(d *decoder.NDJSONStreamDecoder) (string, error) { + body, err := d.ReadAhead() if err != nil && err != io.EOF { return "", err } diff --git a/model/modeldecoder/nullable/nullable.go b/model/modeldecoder/nullable/nullable.go index 8d53a3a6986..5d5947ff644 100644 --- a/model/modeldecoder/nullable/nullable.go +++ b/model/modeldecoder/nullable/nullable.go @@ -68,7 +68,8 @@ func init() { case jsoniter.NilValue: iter.ReadNil() default: - (*((*Interface)(ptr))).Val = iter.Read() + v := iter.Read() + (*((*Interface)(ptr))).Val = v (*((*Interface)(ptr))).isSet = true } }) diff --git a/model/modeldecoder/rumv3/decoder.go b/model/modeldecoder/rumv3/decoder.go index ad6481e475b..5c413801a8b 100644 --- a/model/modeldecoder/rumv3/decoder.go +++ b/model/modeldecoder/rumv3/decoder.go @@ -271,11 +271,13 @@ func mapToTransactionModel(t *transaction, metadata *model.Metadata, reqTime tim sampled = t.Sampled.Val } out.Sampled = &sampled - - // TODO(simitt): set accordingly, once this is fixed: - // https://github.com/elastic/apm-server/issues/4188 - // if t.SampleRate.IsSet() {} - + if t.SampleRate.IsSet() { + if t.SampleRate.Val > 0 { + out.RepresentativeCount = 1 / t.SampleRate.Val + } + } else { + out.RepresentativeCount = 1 + } if t.SpanCount.Dropped.IsSet() { dropped := t.SpanCount.Dropped.Val out.SpanCount.Dropped = &dropped diff --git a/model/modeldecoder/rumv3/metadata_test.go b/model/modeldecoder/rumv3/metadata_test.go index 7aa0d3c70e7..4a186c8ec59 100644 --- a/model/modeldecoder/rumv3/metadata_test.go +++ b/model/modeldecoder/rumv3/metadata_test.go @@ -34,7 +34,7 @@ import ( func TestMetadataResetModelOnRelease(t *testing.T) { inp := `{"m":{"se":{"n":"service-a"}}}` m := fetchMetadataRoot() - require.NoError(t, decoder.NewJSONIteratorDecoder(strings.NewReader(inp)).Decode(m)) + require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(m)) require.True(t, m.IsSet()) releaseMetadataRoot(m) assert.False(t, m.IsSet()) @@ -44,13 +44,13 @@ func TestDecodeNestedMetadata(t *testing.T) { t.Run("decode", func(t *testing.T) { var out model.Metadata testMinValidMetadata := `{"m":{"se":{"n":"name","a":{"n":"go","ve":"1.0.0"}}}}` - dec := decoder.NewJSONIteratorDecoder(strings.NewReader(testMinValidMetadata)) + dec := decoder.NewJSONDecoder(strings.NewReader(testMinValidMetadata)) require.NoError(t, DecodeNestedMetadata(dec, &out)) assert.Equal(t, model.Metadata{Service: model.Service{ Name: "name", Agent: model.Agent{Name: "go", Version: "1.0.0"}}}, out) - err := DecodeNestedMetadata(decoder.NewJSONIteratorDecoder(strings.NewReader(`malformed`)), &out) + err := DecodeNestedMetadata(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &out) require.Error(t, err) assert.Contains(t, err.Error(), "decode") }) @@ -58,7 +58,7 @@ func TestDecodeNestedMetadata(t *testing.T) { t.Run("validate", func(t *testing.T) { inp := `{}` var out model.Metadata - err := DecodeNestedMetadata(decoder.NewJSONIteratorDecoder(strings.NewReader(inp)), &out) + err := DecodeNestedMetadata(decoder.NewJSONDecoder(strings.NewReader(inp)), &out) require.Error(t, err) assert.Contains(t, err.Error(), "validation") }) diff --git a/model/modeldecoder/rumv3/transaction_test.go b/model/modeldecoder/rumv3/transaction_test.go index 54df33e01b3..3fbfeba6c7a 100644 --- a/model/modeldecoder/rumv3/transaction_test.go +++ b/model/modeldecoder/rumv3/transaction_test.go @@ -36,7 +36,7 @@ import ( func TestResetTransactionOnRelease(t *testing.T) { inp := `{"x":{"n":"tr-a"}}` tr := fetchTransactionRoot() - require.NoError(t, decoder.NewJSONIteratorDecoder(strings.NewReader(inp)).Decode(tr)) + require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(tr)) require.True(t, tr.IsSet()) releaseTransactionRoot(tr) assert.False(t, tr.IsSet()) @@ -47,7 +47,7 @@ func TestDecodeNestedTransaction(t *testing.T) { now := time.Now() input := modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: now, Config: modeldecoder.Config{Experimental: true}} str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"exper":"test"}}` - dec := decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + dec := decoder.NewJSONDecoder(strings.NewReader(str)) var out model.Transaction require.NoError(t, DecodeNestedTransaction(dec, &input, &out)) assert.Equal(t, "request", out.Type) @@ -57,19 +57,19 @@ func TestDecodeNestedTransaction(t *testing.T) { // experimental should only be set if allowed by configuration input = modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: now, Config: modeldecoder.Config{Experimental: false}} - dec = decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + dec = decoder.NewJSONDecoder(strings.NewReader(str)) out = model.Transaction{} require.NoError(t, DecodeNestedTransaction(dec, &input, &out)) assert.Nil(t, out.Experimental) - err := DecodeNestedTransaction(decoder.NewJSONIteratorDecoder(strings.NewReader(`malformed`)), &input, &out) + err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &out) require.Error(t, err) assert.Contains(t, err.Error(), "decode") }) t.Run("validate", func(t *testing.T) { var out model.Transaction - err := DecodeNestedTransaction(decoder.NewJSONIteratorDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) + err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) require.Error(t, err) assert.Contains(t, err.Error(), "validation") }) diff --git a/model/modeldecoder/v2/decoder.go b/model/modeldecoder/v2/decoder.go index 4383d7c14b2..b865bc16262 100644 --- a/model/modeldecoder/v2/decoder.go +++ b/model/modeldecoder/v2/decoder.go @@ -27,6 +27,8 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/modeldecoder" @@ -34,6 +36,32 @@ import ( "github.com/elastic/apm-server/utility" ) +// DecodeError represents an error due to JSON decoding. +type DecodeError struct { + err error +} + +func (e DecodeError) Error() string { + return errors.Wrap(e.err, "decode error").Error() +} + +func (e *DecodeError) Unwrap() error { + return e.err +} + +// ValidationError represents an error due to JSON validation. +type ValidationError struct { + err error +} + +func (e ValidationError) Error() string { + return errors.Wrap(e.err, "validation error").Error() +} + +func (e *ValidationError) Unwrap() error { + return e.err +} + var ( errorRootPool = sync.Pool{ New: func() interface{} { @@ -122,12 +150,12 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, out *model. defer releaseErrorRoot(root) var err error if err = d.Decode(&root); err != nil && err != io.EOF { - return fmt.Errorf("decode error %w", err) + return DecodeError{err} } if err := root.validate(); err != nil { - return fmt.Errorf("validation error %w", err) + return ValidationError{err} } - mapToErrorModel(&root.Error, &input.Metadata, input.RequestTime, input.Config.Experimental, out) + mapToErrorModel(&root.Error, &input.Metadata, input.RequestTime, input.Config, out) return err } @@ -141,12 +169,12 @@ func DecodeNestedSpan(d decoder.Decoder, input *modeldecoder.Input, out *model.S defer releaseSpanRoot(root) var err error if err = d.Decode(&root); err != nil && err != io.EOF { - return fmt.Errorf("decode error %w", err) + return DecodeError{err} } if err := root.validate(); err != nil { - return fmt.Errorf("validation error %w", err) + return ValidationError{err} } - mapToSpanModel(&root.Span, &input.Metadata, input.RequestTime, input.Config.Experimental, out) + mapToSpanModel(&root.Span, &input.Metadata, input.RequestTime, input.Config, out) return err } @@ -160,12 +188,12 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, out * defer releaseTransactionRoot(root) var err error if err = d.Decode(&root); err != nil && err != io.EOF { - return fmt.Errorf("decode error %w", err) + return DecodeError{err} } if err := root.validate(); err != nil { - return fmt.Errorf("validation error %w", err) + return ValidationError{err} } - mapToTransactionModel(&root.Transaction, &input.Metadata, input.RequestTime, input.Config.Experimental, out) + mapToTransactionModel(&root.Transaction, &input.Metadata, input.RequestTime, input.Config, out) return err } @@ -174,10 +202,10 @@ func decodeMetadata(decFn func(d decoder.Decoder, m *metadataRoot) error, d deco defer releaseMetadataRoot(m) var err error if err = decFn(d, m); err != nil && err != io.EOF { - return fmt.Errorf("decode error %w", err) + return DecodeError{err} } if err := m.validate(); err != nil { - return fmt.Errorf("validation error %w", err) + return ValidationError{err} } mapToMetadataModel(&m.Metadata, out) return err @@ -207,7 +235,7 @@ func mapToClientModel(from contextRequest, out *model.Metadata) { } } -func mapToErrorModel(from *errorEvent, metadata *model.Metadata, reqTime time.Time, experimental bool, out *model.Error) { +func mapToErrorModel(from *errorEvent, metadata *model.Metadata, reqTime time.Time, config modeldecoder.Config, out *model.Error) { // set metadata information out.Metadata = *metadata if from == nil { @@ -222,6 +250,9 @@ func mapToErrorModel(from *errorEvent, metadata *model.Metadata, reqTime time.Ti // map errorEvent specific data if from.Context.IsSet() { + if config.Experimental && from.Context.Experimental.IsSet() { + out.Experimental = from.Context.Experimental.Val + } // metadata labels and context labels are merged only in the output model if len(from.Context.Tags) > 0 { labels := model.Labels(from.Context.Tags.Clone()) @@ -312,10 +343,6 @@ func mapToErrorModel(from *errorEvent, metadata *model.Metadata, reqTime time.Ti if from.TransactionID.IsSet() { out.TransactionID = from.TransactionID.Val } - if experimental { - out.Experimental = from.Experimental.Val - } - out.RUM = false } func mapToExceptionModel(from errorException, out *model.Exception) { @@ -397,7 +424,8 @@ func mapToMetadataModel(from *metadata, out *model.Metadata) { // Process if len(from.Process.Argv) > 0 { - out.Process.Argv = from.Process.Argv + out.Process.Argv = make([]string, len(from.Process.Argv)) + copy(out.Process.Argv, from.Process.Argv) } if from.Process.Pid.IsSet() { out.Process.Pid = from.Process.Pid.Val @@ -644,7 +672,7 @@ func mapToServiceModel(from contextService, out *model.Service) { } } -func mapToSpanModel(from *span, metadata *model.Metadata, reqTime time.Time, experimental bool, out *model.Span) { +func mapToSpanModel(from *span, metadata *model.Metadata, reqTime time.Time, config modeldecoder.Config, out *model.Span) { // set metadata information for span out.Metadata = *metadata if from == nil { @@ -676,7 +704,8 @@ func mapToSpanModel(from *span, metadata *model.Metadata, reqTime time.Time, exp } } if len(from.ChildIDs) > 0 { - out.ChildIDs = from.ChildIDs + out.ChildIDs = make([]string, len(from.ChildIDs)) + copy(out.ChildIDs, from.ChildIDs) } if from.Context.Database.IsSet() { db := model.DB{} @@ -734,6 +763,9 @@ func mapToSpanModel(from *span, metadata *model.Metadata, reqTime time.Time, exp } out.DestinationService = &service } + if config.Experimental && from.Context.Experimental.IsSet() { + out.Experimental = from.Context.Experimental.Val + } if from.Context.HTTP.IsSet() { http := model.HTTP{} if from.Context.HTTP.Method.IsSet() { @@ -802,9 +834,6 @@ func mapToSpanModel(from *span, metadata *model.Metadata, reqTime time.Time, exp if from.Duration.IsSet() { out.Duration = from.Duration.Val } - if experimental { - out.Experimental = from.Experimental.Val - } if from.ID.IsSet() { out.ID = from.ID.Val } @@ -859,7 +888,6 @@ func mapToSpanModel(from *span, metadata *model.Metadata, reqTime time.Time, exp if from.TransactionID.IsSet() { out.TransactionID = from.TransactionID.Val } - out.RUM = false } func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { @@ -914,7 +942,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { } } -func mapToTransactionModel(from *transaction, metadata *model.Metadata, reqTime time.Time, experimental bool, out *model.Transaction) { +func mapToTransactionModel(from *transaction, metadata *model.Metadata, reqTime time.Time, config modeldecoder.Config, out *model.Transaction) { // set metadata information out.Metadata = *metadata if from == nil { @@ -933,6 +961,9 @@ func mapToTransactionModel(from *transaction, metadata *model.Metadata, reqTime custom := model.Custom(from.Context.Custom.Clone()) out.Custom = &custom } + if config.Experimental && from.Context.Experimental.IsSet() { + out.Experimental = from.Context.Experimental.Val + } // metadata labels and context labels are merged only in the output model if len(from.Context.Tags) > 0 { labels := model.Labels(from.Context.Tags.Clone()) @@ -1017,17 +1048,18 @@ func mapToTransactionModel(from *transaction, metadata *model.Metadata, reqTime if from.Result.IsSet() { out.Result = from.Result.Val } - sampled := true if from.Sampled.IsSet() { sampled = from.Sampled.Val } out.Sampled = &sampled - - // TODO(simitt): set accordingly, once this is fixed: - // https://github.com/elastic/apm-server/issues/4188 - // if t.SampleRate.IsSet() {} - + if from.SampleRate.IsSet() { + if from.SampleRate.Val > 0 { + out.RepresentativeCount = 1 / from.SampleRate.Val + } + } else { + out.RepresentativeCount = 1 + } if from.SpanCount.Dropped.IsSet() { dropped := from.SpanCount.Dropped.Val out.SpanCount.Dropped = &dropped @@ -1072,9 +1104,6 @@ func mapToTransactionModel(from *transaction, metadata *model.Metadata, reqTime } } } - if experimental { - out.Experimental = from.Experimental.Val - } } func mapToUserAgentModel(from nullable.HTTPHeader, out *model.Metadata) { diff --git a/model/modeldecoder/v2/error_test.go b/model/modeldecoder/v2/error_test.go index 57dfae86fe0..12e1c26854d 100644 --- a/model/modeldecoder/v2/error_test.go +++ b/model/modeldecoder/v2/error_test.go @@ -18,7 +18,6 @@ package v2 import ( - "fmt" "net" "net/http" "strings" @@ -38,7 +37,7 @@ import ( func TestResetErrorOnRelease(t *testing.T) { inp := `{"error":{"id":"tr-a"}}` root := fetchErrorRoot() - require.NoError(t, decoder.NewJSONIteratorDecoder(strings.NewReader(inp)).Decode(root)) + require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) require.True(t, root.IsSet()) releaseErrorRoot(root) assert.False(t, root.IsSet()) @@ -48,16 +47,16 @@ func TestDecodeNestedError(t *testing.T) { t.Run("decode", func(t *testing.T) { now := time.Now() input := modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: now, Config: modeldecoder.Config{Experimental: true}} - str := `{"error":{"id":"a-b-c","timestamp":1599996822281000,"log":{"message":"abc"},"experimental":"exp"}}` - dec := decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + str := `{"error":{"id":"a-b-c","timestamp":1599996822281000,"log":{"message":"abc"},"context":{"experimental":"exp"}}}` + dec := decoder.NewJSONDecoder(strings.NewReader(str)) var out model.Error require.NoError(t, DecodeNestedError(dec, &input, &out)) assert.Equal(t, "exp", out.Experimental) assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", out.Timestamp.String()) input = modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: now, Config: modeldecoder.Config{Experimental: false}} - str = `{"error":{"id":"a-b-c","log":{"message":"abc"},"experimental":"exp"}}` - dec = decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + str = `{"error":{"id":"a-b-c","log":{"message":"abc"},"context":{"experimental":"exp"}}}` + dec = decoder.NewJSONDecoder(strings.NewReader(str)) out = model.Error{} require.NoError(t, DecodeNestedError(dec, &input, &out)) // experimental should only be set if allowed by configuration @@ -65,14 +64,14 @@ func TestDecodeNestedError(t *testing.T) { // if no timestamp is provided, fall back to request time assert.Equal(t, now, out.Timestamp) - err := DecodeNestedError(decoder.NewJSONIteratorDecoder(strings.NewReader(`malformed`)), &input, &out) + err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &out) require.Error(t, err) assert.Contains(t, err.Error(), "decode") }) t.Run("validate", func(t *testing.T) { var out model.Error - err := DecodeNestedError(decoder.NewJSONIteratorDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) + err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) require.Error(t, err) assert.Contains(t, err.Error(), "validation") }) @@ -100,7 +99,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { // do not overwrite metadata with zero transaction values var input errorEvent var out model.Error - mapToErrorModel(&input, initializedMeta(), time.Now(), true, &out) + mapToErrorModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) // iterate through metadata model and assert values are set modeldecodertest.AssertStructValues(t, &out.Metadata, exceptions, "meta", 1, false, localhostIP, time.Now()) }) @@ -113,7 +112,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { input.Context.Request.Headers.Val.Add("user-agent", "first") input.Context.Request.Headers.Val.Add("user-agent", "second") input.Context.Request.Headers.Val.Add("x-real-ip", gatewayIP.String()) - mapToErrorModel(&input, initializedMeta(), time.Now(), true, &out) + mapToErrorModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) // user-agent should be set to context request header values assert.Equal(t, "first, second", out.Metadata.UserAgent.Original) @@ -134,7 +133,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{}) input.Context.Request.Headers.Val.Add("x-real-ip", gatewayIP.String()) input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) - mapToErrorModel(&input, &model.Metadata{}, time.Now(), false, &out) + mapToErrorModel(&input, &model.Metadata{}, time.Now(), modeldecoder.Config{}, &out) assert.Equal(t, gatewayIP, out.Metadata.Client.IP, out.Metadata.Client.IP.String()) }) @@ -142,7 +141,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent var out model.Error input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) - mapToErrorModel(&input, &model.Metadata{}, time.Now(), false, &out) + mapToErrorModel(&input, &model.Metadata{}, time.Now(), modeldecoder.Config{}, &out) assert.Equal(t, randomIP, out.Metadata.Client.IP, out.Metadata.Client.IP.String()) }) @@ -167,12 +166,8 @@ func TestDecodeMapToErrorModel(t *testing.T) { var out model.Error eventTime, reqTime := time.Now(), time.Now().Add(time.Second) modeldecodertest.SetStructValues(&input, "overwritten", 5000, true, eventTime) - mapToErrorModel(&input, initializedMeta(), reqTime, true, &out) - fmt.Println(len(out.Log.Stacktrace)) - fmt.Println(out.Log.Stacktrace[0].Vars) + mapToErrorModel(&input, initializedMeta(), reqTime, modeldecoder.Config{Experimental: true}, &out) input.Reset() - fmt.Println(len(out.Log.Stacktrace)) - fmt.Println(out.Log.Stacktrace[0].Vars) modeldecodertest.AssertStructValues(t, &out, exceptions, "overwritten", 5000, true, localhostIP, eventTime) assert.False(t, out.RUM) }) @@ -181,10 +176,9 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent input.Context.Page.URL.Set("https://my.site.test:9201") var out model.Error - mapToErrorModel(&input, initializedMeta(), time.Now(), false, &out) + mapToErrorModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) assert.Equal(t, "https://my.site.test:9201", *out.Page.URL.Full) assert.Equal(t, 9201, *out.Page.URL.Port) assert.Equal(t, "https", *out.Page.URL.Scheme) }) - } diff --git a/model/modeldecoder/v2/metadata_test.go b/model/modeldecoder/v2/metadata_test.go index e62f14aa1bc..999db60e2e4 100644 --- a/model/modeldecoder/v2/metadata_test.go +++ b/model/modeldecoder/v2/metadata_test.go @@ -34,7 +34,7 @@ import ( func TestResetMetadataOnRelease(t *testing.T) { inp := `{"metadata":{"service":{"name":"service-a"}}}` m := fetchMetadataRoot() - require.NoError(t, decoder.NewJSONIteratorDecoder(strings.NewReader(inp)).Decode(m)) + require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(m)) require.True(t, m.IsSet()) releaseMetadataRoot(m) assert.False(t, m.IsSet()) @@ -53,13 +53,13 @@ func TestDecodeMetadata(t *testing.T) { } { t.Run("decode", func(t *testing.T) { var out model.Metadata - dec := decoder.NewJSONIteratorDecoder(strings.NewReader(tc.input)) + dec := decoder.NewJSONDecoder(strings.NewReader(tc.input)) require.NoError(t, tc.decodeFn(dec, &out)) assert.Equal(t, model.Metadata{Service: model.Service{ Name: "user-service", Agent: model.Agent{Name: "go", Version: "1.0.0"}}}, out) - err := tc.decodeFn(decoder.NewJSONIteratorDecoder(strings.NewReader(`malformed`)), &out) + err := tc.decodeFn(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &out) require.Error(t, err) assert.Contains(t, err.Error(), "decode") }) @@ -67,7 +67,7 @@ func TestDecodeMetadata(t *testing.T) { t.Run("validate", func(t *testing.T) { inp := `{}` var out model.Metadata - err := tc.decodeFn(decoder.NewJSONIteratorDecoder(strings.NewReader(inp)), &out) + err := tc.decodeFn(decoder.NewJSONDecoder(strings.NewReader(inp)), &out) require.Error(t, err) assert.Contains(t, err.Error(), "validation") }) diff --git a/model/modeldecoder/v2/model.go b/model/modeldecoder/v2/model.go index 6ad0bd7d09a..b43952b2389 100644 --- a/model/modeldecoder/v2/model.go +++ b/model/modeldecoder/v2/model.go @@ -54,14 +54,15 @@ type transactionRoot struct { // other structs type context struct { - Custom common.MapStr `json:"custom" validate:"patternKeys=regexpNoDotAsteriskQuote"` - Message contextMessage `json:"message"` - Page contextPage `json:"page"` - Response contextResponse `json:"response"` - Request contextRequest `json:"request"` - Service contextService `json:"service"` - Tags common.MapStr `json:"tags" validate:"patternKeys=regexpNoDotAsteriskQuote,typesVals=string;bool;number,maxVals=1024"` - User user `json:"user"` + Custom common.MapStr `json:"custom" validate:"patternKeys=regexpNoDotAsteriskQuote"` + Experimental nullable.Interface `json:"experimental"` + Message contextMessage `json:"message"` + Page contextPage `json:"page"` + Response contextResponse `json:"response"` + Request contextRequest `json:"request"` + Service contextService `json:"service"` + Tags common.MapStr `json:"tags" validate:"patternKeys=regexpNoDotAsteriskQuote,typesVals=string;bool;number,maxVals=1024"` + User user `json:"user"` } type contextMessage struct { @@ -92,9 +93,9 @@ type contextRequest struct { HTTPVersion nullable.String `json:"http_version" validate:"max=1024"` Method nullable.String `json:"method" validate:"required,max=1024"` Socket contextRequestSocket `json:"socket"` - //TODO(simitt): context.request.url is currently required, - // but none of its attributes is required, which could lead to - // an empty URL struct - no difference to making it optional + // context.request.url was required in json schema, + // but none of its attributes is required, which could lead to + // an empty URL struct - no difference to making it optional URL contextRequestURL `json:"url"` } @@ -164,7 +165,6 @@ type errorEvent struct { Context context `json:"context"` Culprit nullable.String `json:"culprit" validate:"max=1024"` Exception errorException `json:"exception"` - Experimental nullable.Interface `json:"experimental"` ID nullable.String `json:"id" validate:"required,max=1024"` Log errorLog `json:"log"` ParentID nullable.String `json:"parent_id" validate:"requiredIfAny=transaction_id;trace_id,max=1024"` @@ -318,7 +318,6 @@ type span struct { ChildIDs []string `json:"child_ids" validate:"max=1024"` Context spanContext `json:"context"` Duration nullable.Float64 `json:"duration" validate:"required,min=0"` - Experimental nullable.Interface `json:"experimental"` ID nullable.String `json:"id" validate:"required,max=1024"` Name nullable.String `json:"name" validate:"required,max=1024"` Outcome nullable.String `json:"outcome" validate:"enum=enumOutcome"` @@ -336,12 +335,13 @@ type span struct { } type spanContext struct { - Database spanContextDatabase `json:"db"` - Destination spanContextDestination `json:"destination"` - HTTP spanContextHTTP `json:"http"` - Message contextMessage `json:"message"` - Service contextService `json:"service"` - Tags common.MapStr `json:"tags" validate:"patternKeys=regexpNoDotAsteriskQuote,typesVals=string;bool;number,maxVals=1024"` + Database spanContextDatabase `json:"db"` + Destination spanContextDestination `json:"destination"` + Experimental nullable.Interface `json:"experimental"` + HTTP spanContextHTTP `json:"http"` + Message contextMessage `json:"message"` + Service contextService `json:"service"` + Tags common.MapStr `json:"tags" validate:"patternKeys=regexpNoDotAsteriskQuote,typesVals=string;bool;number,maxVals=1024"` } type spanContextDatabase struct { @@ -399,7 +399,6 @@ type stacktraceFrame struct { type transaction struct { Context context `json:"context"` Duration nullable.Float64 `json:"duration" validate:"required,min=0"` - Experimental nullable.Interface `json:"experimental"` ID nullable.String `json:"id" validate:"required,max=1024"` Marks transactionMarks `json:"marks"` Name nullable.String `json:"name" validate:"max=1024"` diff --git a/model/modeldecoder/v2/model_generated.go b/model/modeldecoder/v2/model_generated.go index 1ef2b24b014..1df1fbedbf9 100644 --- a/model/modeldecoder/v2/model_generated.go +++ b/model/modeldecoder/v2/model_generated.go @@ -619,13 +619,12 @@ func (val *transactionRoot) validate() error { } func (val *transaction) IsSet() bool { - return val.Context.IsSet() || val.Duration.IsSet() || val.Experimental.IsSet() || val.ID.IsSet() || val.Marks.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.Result.IsSet() || val.Sampled.IsSet() || val.SampleRate.IsSet() || val.SpanCount.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.Type.IsSet() || val.UserExperience.IsSet() + return val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Marks.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.Result.IsSet() || val.Sampled.IsSet() || val.SampleRate.IsSet() || val.SpanCount.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.Type.IsSet() || val.UserExperience.IsSet() } func (val *transaction) Reset() { val.Context.Reset() val.Duration.Reset() - val.Experimental.Reset() val.ID.Reset() val.Marks.Reset() val.Name.Reset() @@ -709,13 +708,14 @@ func (val *transaction) validate() error { } func (val *context) IsSet() bool { - return len(val.Custom) > 0 || val.Message.IsSet() || val.Page.IsSet() || val.Response.IsSet() || val.Request.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 || val.User.IsSet() + return len(val.Custom) > 0 || val.Experimental.IsSet() || val.Message.IsSet() || val.Page.IsSet() || val.Response.IsSet() || val.Request.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 || val.User.IsSet() } func (val *context) Reset() { for k := range val.Custom { delete(val.Custom, k) } + val.Experimental.Reset() val.Message.Reset() val.Page.Reset() val.Response.Reset() @@ -1297,14 +1297,13 @@ func (val *errorRoot) validate() error { } func (val *errorEvent) IsSet() bool { - return val.Context.IsSet() || val.Culprit.IsSet() || val.Exception.IsSet() || val.Experimental.IsSet() || val.ID.IsSet() || val.Log.IsSet() || val.ParentID.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.Transaction.IsSet() || val.TransactionID.IsSet() + return val.Context.IsSet() || val.Culprit.IsSet() || val.Exception.IsSet() || val.ID.IsSet() || val.Log.IsSet() || val.ParentID.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.Transaction.IsSet() || val.TransactionID.IsSet() } func (val *errorEvent) Reset() { val.Context.Reset() val.Culprit.Reset() val.Exception.Reset() - val.Experimental.Reset() val.ID.Reset() val.Log.Reset() val.ParentID.Reset() @@ -1541,7 +1540,7 @@ func (val *spanRoot) validate() error { } func (val *span) IsSet() bool { - return val.Action.IsSet() || len(val.ChildIDs) > 0 || val.Context.IsSet() || val.Duration.IsSet() || val.Experimental.IsSet() || val.ID.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.SampleRate.IsSet() || len(val.Stacktrace) > 0 || val.Start.IsSet() || val.Subtype.IsSet() || val.Sync.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.TransactionID.IsSet() || val.Type.IsSet() + return val.Action.IsSet() || len(val.ChildIDs) > 0 || val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.SampleRate.IsSet() || len(val.Stacktrace) > 0 || val.Start.IsSet() || val.Subtype.IsSet() || val.Sync.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.TransactionID.IsSet() || val.Type.IsSet() } func (val *span) Reset() { @@ -1549,7 +1548,6 @@ func (val *span) Reset() { val.ChildIDs = val.ChildIDs[:0] val.Context.Reset() val.Duration.Reset() - val.Experimental.Reset() val.ID.Reset() val.Name.Reset() val.Outcome.Reset() @@ -1649,12 +1647,13 @@ func (val *span) validate() error { } func (val *spanContext) IsSet() bool { - return val.Database.IsSet() || val.Destination.IsSet() || val.HTTP.IsSet() || val.Message.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 + return val.Database.IsSet() || val.Destination.IsSet() || val.Experimental.IsSet() || val.HTTP.IsSet() || val.Message.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 } func (val *spanContext) Reset() { val.Database.Reset() val.Destination.Reset() + val.Experimental.Reset() val.HTTP.Reset() val.Message.Reset() val.Service.Reset() diff --git a/model/modeldecoder/v2/model_test.go b/model/modeldecoder/v2/model_test.go index c9273fb5d0e..992f5db32d2 100644 --- a/model/modeldecoder/v2/model_test.go +++ b/model/modeldecoder/v2/model_test.go @@ -182,7 +182,7 @@ func TestReset(t *testing.T) { return ns } decode := func(t *testing.T, inp string, out interface{}) { - require.NoError(t, decoder.NewJSONIteratorDecoder(strings.NewReader(inp)).Decode(&out)) + require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(&out)) } t.Run("struct", func(t *testing.T) { var out metadataServiceNode diff --git a/model/modeldecoder/v2/span_test.go b/model/modeldecoder/v2/span_test.go index d679870c3ad..6783a831018 100644 --- a/model/modeldecoder/v2/span_test.go +++ b/model/modeldecoder/v2/span_test.go @@ -35,7 +35,7 @@ import ( func TestResetSpanOnRelease(t *testing.T) { inp := `{"span":{"name":"tr-a"}}` root := fetchSpanRoot() - require.NoError(t, decoder.NewJSONIteratorDecoder(strings.NewReader(inp)).Decode(root)) + require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) require.True(t, root.IsSet()) releaseSpanRoot(root) assert.False(t, root.IsSet()) @@ -45,18 +45,18 @@ func TestDecodeNestedSpan(t *testing.T) { t.Run("decode", func(t *testing.T) { input := modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: time.Now(), Config: modeldecoder.Config{}} str := `{"span":{"duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` - dec := decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + dec := decoder.NewJSONDecoder(strings.NewReader(str)) var out model.Span require.NoError(t, DecodeNestedSpan(dec, &input, &out)) - err := DecodeNestedSpan(decoder.NewJSONIteratorDecoder(strings.NewReader(`malformed`)), &input, &out) + err := DecodeNestedSpan(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &out) require.Error(t, err) assert.Contains(t, err.Error(), "decode") }) t.Run("validate", func(t *testing.T) { var out model.Span - err := DecodeNestedSpan(decoder.NewJSONIteratorDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) + err := DecodeNestedSpan(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) require.Error(t, err) assert.Contains(t, err.Error(), "validation") }) @@ -83,7 +83,7 @@ func TestDecodeMapToSpanModel(t *testing.T) { var input span var out model.Span modeldecodertest.SetStructValues(&input, "overwritten", 5000, true, time.Now().Add(time.Hour)) - mapToSpanModel(&input, initializedMeta(), time.Now(), true, &out) + mapToSpanModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) // iterate through metadata model and assert values are set modeldecodertest.AssertStructValues(t, &out.Metadata, exceptions, "meta", 1, false, ip, time.Now()) }) @@ -91,16 +91,16 @@ func TestDecodeMapToSpanModel(t *testing.T) { t.Run("experimental", func(t *testing.T) { // experimental enabled input := modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: time.Now(), Config: modeldecoder.Config{Experimental: true}} - str := `{"span":{"experimental":"exp","duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` - dec := decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + str := `{"span":{"context":{"experimental":"exp"},"duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` + dec := decoder.NewJSONDecoder(strings.NewReader(str)) var out model.Span require.NoError(t, DecodeNestedSpan(dec, &input, &out)) assert.Equal(t, "exp", out.Experimental) // experimental disabled input = modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: time.Now(), Config: modeldecoder.Config{Experimental: false}} - str = `{"span":{"experimental":"exp","duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` - dec = decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + str = `{"span":{"context":{"experimental":"exp"},"duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` + dec = decoder.NewJSONDecoder(strings.NewReader(str)) out = model.Span{} require.NoError(t, DecodeNestedSpan(dec, &input, &out)) // experimental should only be set if allowed by configuration @@ -130,7 +130,7 @@ func TestDecodeMapToSpanModel(t *testing.T) { var out model.Span eventTime, reqTime := time.Now(), time.Now().Add(time.Second) modeldecodertest.SetStructValues(&input, "overwritten", 5000, true, eventTime) - mapToSpanModel(&input, initializedMeta(), reqTime, true, &out) + mapToSpanModel(&input, initializedMeta(), reqTime, modeldecoder.Config{Experimental: true}, &out) input.Reset() modeldecodertest.AssertStructValues(t, &out, exceptions, "overwritten", 5000, true, ip, eventTime) assert.False(t, out.RUM) @@ -143,14 +143,14 @@ func TestDecodeMapToSpanModel(t *testing.T) { reqTime := time.Now().Add(time.Hour) // add start to requestTime if eventTime is zero and start is given modeldecodertest.SetStructValues(&input, "init", i, true, time.Time{}) - mapToSpanModel(&input, initializedMeta(), reqTime, true, &out) + mapToSpanModel(&input, initializedMeta(), reqTime, modeldecoder.Config{}, &out) timestamp := reqTime.Add(time.Duration((float64(i) + 0.5) * float64(time.Millisecond))) assert.Equal(t, timestamp, out.Timestamp) // set requestTime if eventTime is zero and start is not set out = model.Span{} modeldecodertest.SetStructValues(&input, "init", i, true, time.Time{}) input.Start.Reset() - mapToSpanModel(&input, initializedMeta(), reqTime, true, &out) + mapToSpanModel(&input, initializedMeta(), reqTime, modeldecoder.Config{}, &out) require.Nil(t, out.Start) assert.Equal(t, reqTime, out.Timestamp) }) @@ -161,16 +161,16 @@ func TestDecodeMapToSpanModel(t *testing.T) { modeldecodertest.SetStructValues(&input, "init", 5000, true, time.Now()) // sample rate is set to > 0 input.SampleRate.Set(0.25) - mapToSpanModel(&input, initializedMeta(), time.Now(), true, &out) + mapToSpanModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) assert.Equal(t, 4.0, out.RepresentativeCount) // sample rate is not set out.RepresentativeCount = 0.0 input.SampleRate.Reset() - mapToSpanModel(&input, initializedMeta(), time.Now(), true, &out) + mapToSpanModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) assert.Equal(t, 0.0, out.RepresentativeCount) // sample rate is set to 0 input.SampleRate.Set(0) - mapToSpanModel(&input, initializedMeta(), time.Now(), true, &out) + mapToSpanModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) assert.Equal(t, 0.0, out.RepresentativeCount) }) @@ -208,7 +208,7 @@ func TestDecodeMapToSpanModel(t *testing.T) { input.Action.Reset() } var out model.Span - mapToSpanModel(&input, initializedMeta(), time.Now(), true, &out) + mapToSpanModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) assert.Equal(t, tc.typ, out.Type) if tc.subtype == "" { assert.Nil(t, out.Subtype) @@ -225,5 +225,4 @@ func TestDecodeMapToSpanModel(t *testing.T) { }) } }) - } diff --git a/model/modeldecoder/v2/transaction_test.go b/model/modeldecoder/v2/transaction_test.go index 574a051ee72..4d9127da39c 100644 --- a/model/modeldecoder/v2/transaction_test.go +++ b/model/modeldecoder/v2/transaction_test.go @@ -37,7 +37,7 @@ import ( func TestResetTransactionOnRelease(t *testing.T) { inp := `{"transaction":{"name":"tr-a"}}` root := fetchTransactionRoot() - require.NoError(t, decoder.NewJSONIteratorDecoder(strings.NewReader(inp)).Decode(root)) + require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) require.True(t, root.IsSet()) releaseTransactionRoot(root) assert.False(t, root.IsSet()) @@ -47,8 +47,8 @@ func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode", func(t *testing.T) { now := time.Now() input := modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: now, Config: modeldecoder.Config{Experimental: true}} - str := `{"transaction":{"duration":100,"timestamp":1599996822281000,"id":"100","trace_id":"1","type":"request","span_count":{"started":2},"experimental":"exp"}}` - dec := decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + str := `{"transaction":{"duration":100,"timestamp":1599996822281000,"id":"100","trace_id":"1","type":"request","span_count":{"started":2},"context":{"experimental":"exp"}}}` + dec := decoder.NewJSONDecoder(strings.NewReader(str)) var out model.Transaction require.NoError(t, DecodeNestedTransaction(dec, &input, &out)) assert.Equal(t, "request", out.Type) @@ -56,8 +56,8 @@ func TestDecodeNestedTransaction(t *testing.T) { assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", out.Timestamp.String()) input = modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: now, Config: modeldecoder.Config{Experimental: false}} - str = `{"transaction":{"duration":100,"id":"100","trace_id":"1","type":"request","span_count":{"started":2},"experimental":"exp"}}` - dec = decoder.NewJSONIteratorDecoder(strings.NewReader(str)) + str = `{"transaction":{"duration":100,"id":"100","trace_id":"1","type":"request","span_count":{"started":2},"context":{"experimental":"exp"}}}` + dec = decoder.NewJSONDecoder(strings.NewReader(str)) out = model.Transaction{} require.NoError(t, DecodeNestedTransaction(dec, &input, &out)) // experimental should only be set if allowed by configuration @@ -65,14 +65,14 @@ func TestDecodeNestedTransaction(t *testing.T) { // if no timestamp is provided, fall back to request time assert.Equal(t, now, out.Timestamp) - err := DecodeNestedTransaction(decoder.NewJSONIteratorDecoder(strings.NewReader(`malformed`)), &input, &out) + err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &out) require.Error(t, err) assert.Contains(t, err.Error(), "decode") }) t.Run("validate", func(t *testing.T) { var out model.Transaction - err := DecodeNestedTransaction(decoder.NewJSONIteratorDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) + err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out) require.Error(t, err) assert.Contains(t, err.Error(), "validation") }) @@ -102,7 +102,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { // do not overwrite metadata with zero transaction values var input transaction var out model.Transaction - mapToTransactionModel(&input, initializedMeta(), time.Now(), true, &out) + mapToTransactionModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{Experimental: true}, &out) // iterate through metadata model and assert values are set modeldecodertest.AssertStructValues(t, &out.Metadata, exceptions, "meta", 1, false, localhostIP, time.Now()) }) @@ -115,7 +115,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { input.Context.Request.Headers.Val.Add("user-agent", "first") input.Context.Request.Headers.Val.Add("user-agent", "second") input.Context.Request.Headers.Val.Add("x-real-ip", gatewayIP.String()) - mapToTransactionModel(&input, initializedMeta(), time.Now(), true, &out) + mapToTransactionModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{Experimental: true}, &out) // user-agent should be set to context request header values assert.Equal(t, "first, second", out.Metadata.UserAgent.Original) @@ -136,7 +136,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{}) input.Context.Request.Headers.Val.Add("x-real-ip", gatewayIP.String()) input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) - mapToTransactionModel(&input, &model.Metadata{}, time.Now(), false, &out) + mapToTransactionModel(&input, &model.Metadata{}, time.Now(), modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, gatewayIP, out.Metadata.Client.IP, out.Metadata.Client.IP.String()) }) @@ -144,7 +144,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out model.Transaction input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) - mapToTransactionModel(&input, &model.Metadata{}, time.Now(), false, &out) + mapToTransactionModel(&input, &model.Metadata{}, time.Now(), modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, randomIP, out.Metadata.Client.IP, out.Metadata.Client.IP.String()) }) @@ -153,7 +153,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out model.Transaction input.Context.User.Email.Set("test@user.com") - mapToTransactionModel(&input, initializedMeta(), time.Now(), false, &out) + mapToTransactionModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, "test@user.com", out.Metadata.User.Email) assert.Zero(t, out.Metadata.User.ID) assert.Zero(t, out.Metadata.User.Name) @@ -177,13 +177,13 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var out model.Transaction eventTime, reqTime := time.Now(), time.Now().Add(time.Second) modeldecodertest.SetStructValues(&input, "overwritten", 5000, true, eventTime) - mapToTransactionModel(&input, initializedMeta(), reqTime, true, &out) + mapToTransactionModel(&input, initializedMeta(), reqTime, modeldecoder.Config{Experimental: true}, &out) modeldecodertest.AssertStructValues(t, &out, exceptions, "overwritten", 5000, true, localhostIP, eventTime) // set requestTime if eventTime is zero modeldecodertest.SetStructValues(&input, "overwritten", 5000, true, time.Time{}) out = model.Transaction{} - mapToTransactionModel(&input, initializedMeta(), reqTime, true, &out) + mapToTransactionModel(&input, initializedMeta(), reqTime, modeldecoder.Config{Experimental: true}, &out) input.Reset() modeldecodertest.AssertStructValues(t, &out, exceptions, "overwritten", 5000, true, localhostIP, reqTime) @@ -193,10 +193,30 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction input.Context.Page.URL.Set("https://my.site.test:9201") var out model.Transaction - mapToTransactionModel(&input, initializedMeta(), time.Now(), false, &out) + mapToTransactionModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, "https://my.site.test:9201", *out.Page.URL.Full) assert.Equal(t, 9201, *out.Page.URL.Port) assert.Equal(t, "https", *out.Page.URL.Scheme) }) + t.Run("sample-rate", func(t *testing.T) { + var input transaction + var out model.Transaction + modeldecodertest.SetStructValues(&input, "init", 5000, true, time.Now()) + // sample rate is set to > 0 + input.SampleRate.Set(0.25) + mapToTransactionModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) + assert.Equal(t, 4.0, out.RepresentativeCount) + // sample rate is not set -> Representative Count should be 1 by default + out.RepresentativeCount = 0.0 //reset to zero value + input.SampleRate.Reset() + mapToTransactionModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) + assert.Equal(t, 1.0, out.RepresentativeCount) + // sample rate is set to 0 + out.RepresentativeCount = 0.0 //reset to zero value + input.SampleRate.Set(0) + mapToTransactionModel(&input, initializedMeta(), time.Now(), modeldecoder.Config{}, &out) + assert.Equal(t, 0.0, out.RepresentativeCount) + }) + } diff --git a/processor/stream/package_tests/error_attrs_test.go b/processor/stream/package_tests/error_attrs_test.go index 52459203497..431a118d954 100644 --- a/processor/stream/package_tests/error_attrs_test.go +++ b/processor/stream/package_tests/error_attrs_test.go @@ -101,11 +101,7 @@ func errorRequiredKeys() *tests.Set { "error.log.message", "error.exception.stacktrace.filename", "error.log.stacktrace.filename", - "error.exception.stacktrace.classname", - "error.log.stacktrace.classname", "error.context.request.method", - "error.context.request.url", - "error.trace_id", "error.parent_id", ) @@ -199,66 +195,69 @@ func TestPayloadDataForError(t *testing.T) { errorProcSetup().DataValidation(t, []tests.SchemaTestData{ {Key: "error", - Invalid: []tests.Invalid{{Msg: `invalid input type`, Values: val{false}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{false}}}}, {Key: "error.exception.code", Valid: val{"success", ""}, - Invalid: []tests.Invalid{{Msg: `exception/properties/code/type`, Values: val{false}}}}, + Invalid: []tests.Invalid{{Msg: `validation error`, Values: val{false}}}}, {Key: "error.exception.attributes", Valid: val{map[string]interface{}{}}, - Invalid: []tests.Invalid{{Msg: `exception/properties/attributes/type`, Values: val{123}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{123}}}}, {Key: "error.timestamp", Valid: val{json.Number("1496170422281000")}, Invalid: []tests.Invalid{ - {Msg: `timestamp/type`, Values: val{"1496170422281000"}}}}, + {Msg: `decode error`, Values: val{"1496170422281000"}}}}, {Key: "error.log.stacktrace.post_context", Valid: val{[]interface{}{}, []interface{}{"context"}}, Invalid: []tests.Invalid{ - {Msg: `log/properties/stacktrace/items/properties/post_context/items/type`, Values: val{[]interface{}{123}}}, - {Msg: `log/properties/stacktrace/items/properties/post_context/type`, Values: val{"test"}}}}, + {Msg: `decode error`, Values: val{[]interface{}{123}}}, + {Msg: `decode error`, Values: val{"test"}}}}, {Key: "error.log.stacktrace.pre_context", Valid: val{[]interface{}{}, []interface{}{"context"}}, Invalid: []tests.Invalid{ - {Msg: `log/properties/stacktrace/items/properties/pre_context/items/type`, Values: val{[]interface{}{123}}}, - {Msg: `log/properties/stacktrace/items/properties/pre_context/type`, Values: val{"test"}}}}, + {Msg: `decode error`, Values: val{[]interface{}{123}}}, + {Msg: `decode error`, Values: val{"test"}}}}, {Key: "error.exception.stacktrace.post_context", Valid: val{[]interface{}{}, []interface{}{"context"}}, Invalid: []tests.Invalid{ - {Msg: `exception/properties/stacktrace/items/properties/post_context/items/type`, Values: val{[]interface{}{123}}}, - {Msg: `exception/properties/stacktrace/items/properties/post_context/type`, Values: val{"test"}}}}, + {Msg: `decode error`, Values: val{[]interface{}{123}}}, + {Msg: `decode error`, Values: val{"test"}}}}, {Key: "error.exception.stacktrace.pre_context", Valid: val{[]interface{}{}, []interface{}{"context"}}, Invalid: []tests.Invalid{ - {Msg: `exception/properties/stacktrace/items/properties/pre_context/items/type`, Values: val{[]interface{}{123}}}, - {Msg: `exception/properties/stacktrace/items/properties/pre_context/type`, Values: val{"test"}}}}, + {Msg: `decode error`, Values: val{[]interface{}{123}}}, + {Msg: `decode error`, Values: val{"test"}}}}, {Key: "error.context.custom", Valid: val{obj{"whatever": obj{"comes": obj{"end": -45}}}, obj{"whatever": 123}}, Invalid: []tests.Invalid{ - {Msg: `context/properties/custom/additionalproperties`, Values: val{ + {Msg: `validation error`, Values: val{ obj{"what.ever": 123}, obj{"what*ever": 123}, obj{"what\"ever": 123}}}, - {Msg: `context/properties/custom/type`, Values: val{"context"}}}}, + {Msg: `decode error`, Values: val{"context"}}}}, {Key: "error.context.request.body", Valid: val{tests.Str1025, obj{}}, - Invalid: []tests.Invalid{{Msg: `/context/properties/request/properties/body/type`, Values: val{102}}}}, + Invalid: []tests.Invalid{{Msg: `validation error`, Values: val{102}}}}, {Key: "error.context.request.headers", Valid: val{ obj{"User-Agent": "go-1.1"}, obj{"foo-bar": "a,b"}, obj{"foo": []interface{}{"a", "b"}}}, - Invalid: []tests.Invalid{{Msg: `properties/headers`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, {Key: "error.context.response.headers", Valid: val{ obj{"User-Agent": "go-1.1"}, obj{"foo-bar": "a,b"}, obj{"foo": []interface{}{"a", "b"}}}, - Invalid: []tests.Invalid{{Msg: `properties/headers`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, {Key: "error.context.request.env", Valid: val{obj{}}, - Invalid: []tests.Invalid{{Msg: `/context/properties/request/properties/env/type`, Values: val{102, "a"}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{102, "a"}}}}, {Key: "error.context.request.cookies", Valid: val{obj{}}, - Invalid: []tests.Invalid{{Msg: `/context/properties/request/properties/cookies/type`, Values: val{102, "a"}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{102, "a"}}}}, {Key: "error.context.tags", Valid: val{obj{tests.Str1024Special: tests.Str1024Special}, obj{tests.Str1024: 123.45}, obj{tests.Str1024: true}}, Invalid: []tests.Invalid{ - {Msg: `context/properties/tags/type`, Values: val{"tags"}}, - {Msg: `context/properties/tags/patternproperties`, Values: val{obj{"invalid": tests.Str1025}, obj{tests.Str1024: obj{}}}}, - {Msg: `context/properties/tags/additionalproperties`, Values: val{obj{"invali*d": "hello"}, obj{"invali\"d": "hello"}, obj{"invali.d": "hello"}}}}}, + {Msg: `decode error`, Values: val{"tags"}}, + {Msg: `validation error`, Values: val{ + obj{"invalid": tests.Str1025}, + obj{tests.Str1024: obj{}}, + obj{"invali*d": "hello"}, + obj{"invali\"d": "hello"}, + obj{"invali.d": "hello"}}}}}, {Key: "error.context.user.id", Valid: val{123, tests.Str1024Special}, Invalid: []tests.Invalid{ - {Msg: `context/properties/user/properties/id/type`, Values: val{obj{}}}, - {Msg: `context/properties/user/properties/id/maxlength`, Values: val{tests.Str1025}}}}, + {Msg: `validation error`, Values: val{obj{}, tests.Str1025}}}}, }) } diff --git a/processor/stream/package_tests/intake_test_processor.go b/processor/stream/package_tests/intake_test_processor.go index 66e90cdb494..3661a2d9fa6 100644 --- a/processor/stream/package_tests/intake_test_processor.go +++ b/processor/stream/package_tests/intake_test_processor.go @@ -20,6 +20,7 @@ package package_tests import ( "bytes" "context" + "encoding/json" "errors" "io" "time" @@ -28,6 +29,8 @@ import ( "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modeldecoder" + v2 "github.com/elastic/apm-server/model/modeldecoder/v2" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/tests" @@ -83,12 +86,45 @@ func (p *intakeTestProcessor) LoadPayload(path string) (interface{}, error) { func (p *intakeTestProcessor) Decode(data interface{}) error { events := data.([]interface{}) for _, e := range events { - err := p.Processor.HandleRawModel(e.(map[string]interface{}), &model.Batch{}, time.Now(), model.Metadata{}) + b, err := json.Marshal(e) if err != nil { return err } + d := decoder.NewNDJSONStreamDecoder(bytes.NewReader(b), 300*1024) + body, err := d.ReadAhead() + if err != nil && err != io.EOF { + return err + } + eventType := p.IdentifyEventType(body, &stream.Result{}) + input := modeldecoder.Input{ + RequestTime: time.Now(), + Metadata: model.Metadata{}, + Config: p.Mconfig, + } + switch eventType { + case "error": + var event model.Error + err = v2.DecodeNestedError(d, &input, &event) + case "span": + var event model.Span + err = v2.DecodeNestedSpan(d, &input, &event) + case "transaction": + var event model.Transaction + err = v2.DecodeNestedTransaction(d, &input, &event) + case "metricset": + var m map[string]interface{} + if err = d.Decode(&m); err != nil && err != io.EOF { + return err + } + input.Raw = m[eventType] + err = modeldecoder.DecodeMetricset(input, &model.Batch{}) + default: + return errors.New("root key required") + } + if err != nil && err != io.EOF { + return err + } } - return nil } diff --git a/processor/stream/package_tests/metadata_attrs_test.go b/processor/stream/package_tests/metadata_attrs_test.go index 8ce99524688..df107b798f6 100644 --- a/processor/stream/package_tests/metadata_attrs_test.go +++ b/processor/stream/package_tests/metadata_attrs_test.go @@ -18,6 +18,8 @@ package package_tests import ( + "bytes" + "encoding/json" "testing" "github.com/stretchr/testify/require" @@ -25,7 +27,7 @@ import ( "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/metadata/generated/schema" - "github.com/elastic/apm-server/model/modeldecoder" + v2 "github.com/elastic/apm-server/model/modeldecoder/v2" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/tests" "github.com/elastic/apm-server/tests/loader" @@ -47,14 +49,14 @@ func (p *MetadataProcessor) LoadPayload(path string) (interface{}, error) { func (p *MetadataProcessor) Validate(data interface{}) error { events := data.([]interface{}) for _, e := range events { - rawEvent := e.(map[string]interface{}) - rawMetadata, ok := rawEvent["metadata"].(map[string]interface{}) - if !ok { - return stream.ErrUnrecognizedObject + //TODO(simitt): combine loading the data and validating them once the new json decoding is finished + b, err := json.Marshal(e) + if err != nil { + return err } - - // validate the metadata object against our jsonschema - if err := modeldecoder.DecodeMetadata(rawMetadata, false, &model.Metadata{}); err != nil { + dec := decoder.NewJSONDecoder(bytes.NewReader(b)) + var m model.Metadata + if err := v2.DecodeNestedMetadata(dec, &m); err != nil { return err } } @@ -171,7 +173,7 @@ func metadataRequiredKeys() *tests.Set { "metadata.service.runtime.name", "metadata.service.runtime.version", "metadata.service.language.name", - "metadata.system.container.id", + // "metadata.system.container.id", //does not throw an error since it is the only attribute "metadata.process.pid", ) } @@ -179,7 +181,6 @@ func metadataRequiredKeys() *tests.Set { func TestAttrsPresenceInMetadata(t *testing.T) { metadataProcSetup().AttrsPresence(t, metadataRequiredKeys(), nil) } - func TestInvalidPayloadsForMetadata(t *testing.T) { type val []interface{} @@ -187,8 +188,9 @@ func TestInvalidPayloadsForMetadata(t *testing.T) { {Key: "metadata.service.name", Valid: val{"m"}, Invalid: []tests.Invalid{ - {Msg: "service/properties/name", Values: val{tests.Str1024Special}}, - {Msg: "service/properties/name", Values: val{""}}}, + {Msg: "validation error", Values: val{tests.Str1024Special}}, + {Msg: "validation error", Values: val{""}}, + }, }} metadataProcSetup().DataValidation(t, payloadData) } diff --git a/processor/stream/package_tests/span_attrs_test.go b/processor/stream/package_tests/span_attrs_test.go index 1863d1294b2..b479104a1ac 100644 --- a/processor/stream/package_tests/span_attrs_test.go +++ b/processor/stream/package_tests/span_attrs_test.go @@ -115,6 +115,9 @@ func spanRequiredKeys() *tests.Set { "span.start", "span.timestamp", "span.stacktrace.filename", + "span.context.destination.service.resource", + "span.context.destination.service.name", + "span.context.destination.service.type", ) } @@ -219,23 +222,19 @@ func TestPayloadDataForSpans(t *testing.T) { {Key: "span.context.tags", Valid: val{obj{tests.Str1024Special: tests.Str1024Special}, obj{tests.Str1024: 123.45}, obj{tests.Str1024: true}}, Invalid: []tests.Invalid{ - {Msg: `tags/type`, Values: val{"tags"}}, - {Msg: `tags/patternproperties`, Values: val{obj{"invalid": tests.Str1025}, obj{tests.Str1024: obj{}}}}, - {Msg: `tags/additionalproperties`, Values: val{obj{"invali*d": "hello"}, obj{"invali\"d": "hello"}, obj{"invali.d": "hello"}}}}, - }, + {Msg: `decode error`, Values: val{"tags"}}, + {Msg: `validation error`, Values: val{ + obj{"invalid": tests.Str1025}, obj{tests.Str1024: obj{}}, + obj{"invali*d": "hello"}, obj{"invali\"d": "hello"}, obj{"invali.d": "hello"}}}}}, {Key: "span.timestamp", Valid: val{json.Number("1496170422281000")}, Invalid: []tests.Invalid{ - {Msg: `timestamp/type`, Values: val{"1496170422281000"}}}}, + {Msg: `decode error`, Values: val{"1496170422281000"}}}}, {Key: "span.stacktrace.pre_context", - Valid: val{[]interface{}{}, []interface{}{"context"}}, - Invalid: []tests.Invalid{ - {Msg: `/stacktrace/items/properties/pre_context/items/type`, Values: val{[]interface{}{123}}}, - {Msg: `stacktrace/items/properties/pre_context/type`, Values: val{"test"}}}}, + Valid: val{[]interface{}{}, []interface{}{"context"}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{[]interface{}{123}, "test"}}}}, {Key: "span.stacktrace.post_context", - Valid: val{[]interface{}{}, []interface{}{"context"}}, - Invalid: []tests.Invalid{ - {Msg: `/stacktrace/items/properties/post_context/items/type`, Values: val{[]interface{}{123}}}, - {Msg: `stacktrace/items/properties/post_context/type`, Values: val{"test"}}}}, + Valid: val{[]interface{}{}, []interface{}{"context"}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{[]interface{}{123, "test"}}}}}, }) } diff --git a/processor/stream/package_tests/transaction_attrs_test.go b/processor/stream/package_tests/transaction_attrs_test.go index 66a3d867db0..1d41ce1003c 100644 --- a/processor/stream/package_tests/transaction_attrs_test.go +++ b/processor/stream/package_tests/transaction_attrs_test.go @@ -101,7 +101,6 @@ func transactionRequiredKeys() *tests.Set { "transaction.duration", "transaction.type", "transaction.context.request.method", - "transaction.context.request.url", "transaction.experience.longtask.count", "transaction.experience.longtask.sum", "transaction.experience.longtask.max", @@ -172,22 +171,23 @@ func TestPayloadDataForTransaction(t *testing.T) { []tests.SchemaTestData{ {Key: "transaction.duration", Valid: []interface{}{12.4}, - Invalid: []tests.Invalid{{Msg: `duration/type`, Values: val{"123"}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{"123"}}}}, {Key: "transaction.timestamp", Valid: val{json.Number("1496170422281000")}, Invalid: []tests.Invalid{ - {Msg: `timestamp/type`, Values: val{"1496170422281000"}}}}, + {Msg: `decode error`, Values: val{"1496170422281000"}}}}, {Key: "transaction.marks", Valid: []interface{}{obj{}, obj{tests.Str1024: obj{tests.Str1024: 21.0, "end": -45}}}, Invalid: []tests.Invalid{ - {Msg: `marks/type`, Values: val{"marks"}}, - {Msg: `marks/patternproperties`, Values: val{ + {Msg: `decode error`, Values: val{ + "marks", obj{"timing": obj{"start": "start"}}, obj{"timing": obj{"start": obj{}}}, + }}, + {Msg: `validation error`, Values: val{ obj{"timing": obj{"m*e": -45}}, obj{"timing": obj{"m\"": -45}}, - obj{"timing": obj{"m.": -45}}}}, - {Msg: `marks/additionalproperties`, Values: val{ + obj{"timing": obj{"m.": -45}}, obj{"tim*ing": obj{"start": -45}}, obj{"tim\"ing": obj{"start": -45}}, obj{"tim.ing": obj{"start": -45}}}}}}, @@ -195,37 +195,40 @@ func TestPayloadDataForTransaction(t *testing.T) { Valid: val{obj{"whatever": obj{"comes": obj{"end": -45}}}, obj{"whatever": 123}}, Invalid: []tests.Invalid{ - {Msg: `context/properties/custom/additionalproperties`, Values: val{obj{"what.ever": 123}, obj{"what*ever": 123}, obj{"what\"ever": 123}}}, - {Msg: `context/properties/custom/type`, Values: val{"context"}}}}, + {Msg: `validation error`, Values: val{obj{"what.ever": 123}, obj{"what*ever": 123}, obj{"what\"ever": 123}}}, + {Msg: `decode error`, Values: val{"context"}}}}, {Key: "transaction.context.request.body", Valid: []interface{}{obj{}, tests.Str1025}, - Invalid: []tests.Invalid{{Msg: `context/properties/request/properties/body/type`, Values: val{102}}}}, + Invalid: []tests.Invalid{{Msg: `validation error`, Values: val{102}}}}, {Key: "transaction.context.request.headers", Valid: val{ obj{"User-Agent": "go-1.1"}, obj{"foo-bar": "a,b"}, obj{"foo": []interface{}{"a", "b"}}}, - Invalid: []tests.Invalid{{Msg: `properties/headers`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, + Invalid: []tests.Invalid{ + {Msg: `decode error`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, {Key: "transaction.context.request.env", Valid: []interface{}{obj{}}, - Invalid: []tests.Invalid{{Msg: `context/properties/request/properties/env/type`, Values: val{102, "a"}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{102, "a"}}}}, {Key: "transaction.context.request.cookies", Valid: []interface{}{obj{}}, - Invalid: []tests.Invalid{{Msg: `context/properties/request/properties/cookies/type`, Values: val{123, ""}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{123, ""}}}}, {Key: "transaction.context.response.headers", Valid: val{ obj{"User-Agent": "go-1.1"}, obj{"foo-bar": "a,b"}, obj{"foo": []interface{}{"a", "b"}}}, - Invalid: []tests.Invalid{{Msg: `properties/headers`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, + Invalid: []tests.Invalid{{Msg: `decode error`, Values: val{102, obj{"foo": obj{"bar": "a"}}}}}}, {Key: "transaction.context.tags", Valid: val{obj{tests.Str1024Special: tests.Str1024Special}, obj{tests.Str1024: 123.45}, obj{tests.Str1024: true}}, Invalid: []tests.Invalid{ - {Msg: `tags/type`, Values: val{"tags"}}, - {Msg: `tags/patternproperties`, Values: val{obj{"invalid": tests.Str1025}, obj{tests.Str1024: obj{}}}}, - {Msg: `tags/additionalproperties`, Values: val{obj{"invali*d": "hello"}, obj{"invali\"d": "hello"}, obj{"invali.d": "hello"}}}}}, + {Msg: `decode error`, Values: val{"tags"}}, + {Msg: `validation error`, Values: val{ + obj{"invalid": tests.Str1025}, + obj{tests.Str1024: obj{}}, + obj{"invali*d": "hello"}, + obj{"invali\"d": "hello"}, + obj{"invali.d": "hello"}}}}}, {Key: "transaction.context.user.id", - Valid: val{123, tests.Str1024Special}, - Invalid: []tests.Invalid{ - {Msg: `context/properties/user/properties/id/type`, Values: val{obj{}}}, - {Msg: `context/properties/user/properties/id/maxlength`, Values: val{tests.Str1025}}}}, + Valid: val{123, tests.Str1024Special}, + Invalid: []tests.Invalid{{Msg: `validation error`, Values: val{obj{}, tests.Str1025}}}}, }) } diff --git a/processor/stream/processor.go b/processor/stream/processor.go index 61249d11460..30eb542bb85 100644 --- a/processor/stream/processor.go +++ b/processor/stream/processor.go @@ -18,8 +18,8 @@ package stream import ( + "bytes" "context" - "errors" "io" "sync" "time" @@ -28,14 +28,16 @@ import ( "go.elastic.co/apm" + "github.com/pkg/errors" + "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/modeldecoder" - "github.com/elastic/apm-server/model/modeldecoder/field" + "github.com/elastic/apm-server/model/modeldecoder/rumv3" + v2 "github.com/elastic/apm-server/model/modeldecoder/v2" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/utility" - "github.com/elastic/apm-server/validation" ) var ( @@ -46,44 +48,53 @@ const ( batchSize = 10 ) -type decodeMetadataFunc func(interface{}, bool, *model.Metadata) error +type decodeMetadataFunc func(decoder.Decoder, *model.Metadata) error +type decodeTransactionFunc func(decoder.Decoder, *modeldecoder.Input, *model.Transaction) error +type decodeErrorFunc func(decoder.Decoder, *modeldecoder.Input, *model.Error) error +type decodeSpanFunc func(decoder.Decoder, *modeldecoder.Input, *model.Span) error // functions with the decodeEventFunc signature decode their input argument into their batch argument (output) type decodeEventFunc func(modeldecoder.Input, *model.Batch) error type Processor struct { - Mconfig modeldecoder.Config - MaxEventSize int - streamReaderPool sync.Pool - decodeMetadata decodeMetadataFunc - models map[string]decodeEventFunc + Mconfig modeldecoder.Config + MaxEventSize int + streamReaderPool sync.Pool + decodeMetadata decodeMetadataFunc + decodeError decodeErrorFunc + decodeSpan decodeSpanFunc + decodeTransaction decodeTransactionFunc + models map[string]decodeEventFunc + isRUM bool } func BackendProcessor(cfg *config.Config) *Processor { return &Processor{ - Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, - MaxEventSize: cfg.MaxEventSize, - decodeMetadata: modeldecoder.DecodeMetadata, + Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, + MaxEventSize: cfg.MaxEventSize, + decodeMetadata: v2.DecodeNestedMetadata, + decodeError: v2.DecodeNestedError, + decodeSpan: v2.DecodeNestedSpan, + decodeTransaction: v2.DecodeNestedTransaction, models: map[string]decodeEventFunc{ - "transaction": modeldecoder.DecodeTransaction, - "span": modeldecoder.DecodeSpan, - "metricset": modeldecoder.DecodeMetricset, - "error": modeldecoder.DecodeError, + "metricset": modeldecoder.DecodeMetricset, }, + isRUM: false, } } func RUMV2Processor(cfg *config.Config) *Processor { return &Processor{ - Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, - MaxEventSize: cfg.MaxEventSize, - decodeMetadata: modeldecoder.DecodeMetadata, + Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, + MaxEventSize: cfg.MaxEventSize, + decodeMetadata: v2.DecodeNestedMetadata, + decodeError: v2.DecodeNestedError, + decodeSpan: v2.DecodeNestedSpan, + decodeTransaction: v2.DecodeNestedTransaction, models: map[string]decodeEventFunc{ - "transaction": modeldecoder.DecodeRUMV2Transaction, - "span": modeldecoder.DecodeRUMV2Span, - "metricset": modeldecoder.DecodeRUMV2Metricset, - "error": modeldecoder.DecodeRUMV2Error, + "metricset": modeldecoder.DecodeRUMV2Metricset, }, + isRUM: true, } } @@ -91,19 +102,19 @@ func RUMV3Processor(cfg *config.Config) *Processor { return &Processor{ Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental, HasShortFieldNames: true}, MaxEventSize: cfg.MaxEventSize, - decodeMetadata: modeldecoder.DecodeRUMV3Metadata, + decodeMetadata: rumv3.DecodeNestedMetadata, models: map[string]decodeEventFunc{ "x": modeldecoder.DecodeRUMV3Transaction, "e": modeldecoder.DecodeRUMV3Error, "me": modeldecoder.DecodeRUMV3Metricset, }, + isRUM: true, } } -func (p *Processor) readMetadata(metadata *model.Metadata, reader *streamReader) error { - var rawModel map[string]interface{} - err := reader.Read(&rawModel) - if err != nil { +func (p *Processor) readMetadata(reader *streamReader, metadata *model.Metadata) error { + if err := p.decodeMetadata(reader, metadata); err != nil { + err = reader.wrapError(err) if err == io.EOF { return &Error{ Type: InvalidInputErrType, @@ -111,52 +122,34 @@ func (p *Processor) readMetadata(metadata *model.Metadata, reader *streamReader) Document: string(reader.LatestLine()), } } - return err - } - - fieldName := field.Mapper(p.Mconfig.HasShortFieldNames) - rawMetadata, ok := rawModel[fieldName("metadata")].(map[string]interface{}) - if !ok { + if _, ok := err.(*Error); ok { + return err + } return &Error{ Type: InvalidInputErrType, - Message: ErrUnrecognizedObject.Error(), + Message: err.Error(), Document: string(reader.LatestLine()), } } - - if err := p.decodeMetadata(rawMetadata, p.Mconfig.HasShortFieldNames, metadata); err != nil { - var ve *validation.Error - if errors.As(err, &ve) { - return &Error{ - Type: InvalidInputErrType, - Message: err.Error(), - Document: string(reader.LatestLine()), - } - } - return err - } return nil } -// HandleRawModel validates and decodes a single json object into its struct form -func (p *Processor) HandleRawModel(rawModel map[string]interface{}, batch *model.Batch, requestTime time.Time, streamMetadata model.Metadata) error { - for key, decodeEvent := range p.models { - entry, ok := rawModel[key] - if !ok { - continue - } - err := decodeEvent(modeldecoder.Input{ - Raw: entry, - RequestTime: requestTime, - Metadata: streamMetadata, - Config: p.Mconfig, - }, batch) - if err != nil { - return err - } - return nil +// IdentifyEventType takes a reader and reads ahead the first key of the +// underlying json input. This method makes some assumptions met by the +// input format: +// - the input is in json format +// - every valid ndjson line only has one root key +func (p *Processor) IdentifyEventType(body []byte, result *Result) string { + // find event type, trim spaces and account for single and double quotes + body = bytes.TrimLeft(body, `{ "'`) + end := bytes.Index(body, []byte(`"`)) + if end == -1 { + end = bytes.Index(body, []byte(`'`)) + } + if end == -1 { + return "" } - return ErrUnrecognizedObject + return string(body[0:end]) } // readBatch will read up to `batchSize` objects from the ndjson stream, @@ -189,24 +182,78 @@ func (p *Processor) readBatch( // input events are decoded and appended to the batch for i := 0; i < batchSize && !reader.IsEOF(); i++ { - var rawModel map[string]interface{} - err := reader.Read(&rawModel) + body, err := reader.ReadAhead() if err != nil && err != io.EOF { + err = reader.wrapError(err) if e, ok := err.(*Error); ok && (e.Type == InvalidInputErrType || e.Type == InputTooLargeErrType) { response.LimitedAdd(e) continue + } else { + // return early, we assume we can only recover from a input error types + response.Add(err) + return true } - // return early, we assume we can only recover from a input error types - response.Add(err) - return true } - if len(rawModel) > 0 { - - err := p.HandleRawModel(rawModel, batch, requestTime, *streamMetadata) - if err != nil { + if len(body) == 0 { + // required for backwards compatibility - sending empty lines was permitted in previous versions + continue + } + eventType := p.IdentifyEventType(body, response) + input := modeldecoder.Input{ + RequestTime: requestTime, + Metadata: *streamMetadata, + Config: p.Mconfig, + } + if decodeFn, ok := p.models[eventType]; ok { + var rawModel map[string]interface{} + err = reader.Decode(&rawModel) + err = reader.wrapError(err) + if err != nil && err != io.EOF { + if e, ok := err.(*Error); ok && (e.Type == InvalidInputErrType || e.Type == InputTooLargeErrType) { + response.LimitedAdd(e) + continue + } + // return early, we assume we can only recover from a input error types + response.Add(err) + return true + } + if len(rawModel) > 0 { + input.Raw = rawModel[eventType] + if err := decodeFn(input, batch); err != nil { + response.LimitedAdd(&Error{ + Type: InvalidInputErrType, + Message: err.Error(), + Document: string(reader.LatestLine()), + }) + continue + } + } + } else { + switch eventType { + case "error": + var event model.Error + if handleDecodeErr(p.decodeError(reader, &input, &event), reader, response) { + continue + } + event.RUM = p.isRUM + batch.Errors = append(batch.Errors, &event) + case "span": + var event model.Span + if handleDecodeErr(p.decodeSpan(reader, &input, &event), reader, response) { + continue + } + event.RUM = p.isRUM + batch.Spans = append(batch.Spans, &event) + case "transaction": + var event model.Transaction + if handleDecodeErr(p.decodeTransaction(reader, &input, &event), reader, response) { + continue + } + batch.Transactions = append(batch.Transactions, &event) + default: response.LimitedAdd(&Error{ Type: InvalidInputErrType, - Message: err.Error(), + Message: errors.Wrap(ErrUnrecognizedObject, eventType).Error(), Document: string(reader.LatestLine()), }) continue @@ -216,6 +263,22 @@ func (p *Processor) readBatch( return reader.IsEOF() } +func handleDecodeErr(err error, r *streamReader, result *Result) bool { + if err == nil || err == io.EOF { + return false + } + e, ok := err.(*Error) + if !ok || (e.Type != InvalidInputErrType && e.Type != InputTooLargeErrType) { + e = &Error{ + Type: InvalidInputErrType, + Message: err.Error(), + Document: string(r.LatestLine()), + } + } + result.LimitedAdd(e) + return true +} + // HandleStream processes a stream of events func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limiter, meta *model.Metadata, reader io.Reader, report publish.Reporter) *Result { res := &Result{} @@ -224,7 +287,7 @@ func (p *Processor) HandleStream(ctx context.Context, ipRateLimiter *rate.Limite defer sr.release() // first item is the metadata object - err := p.readMetadata(meta, sr) + err := p.readMetadata(sr, meta) if err != nil { // no point in continuing if we couldn't read the metadata res.Add(err) @@ -298,26 +361,27 @@ func (sr *streamReader) release() { sr.processor.streamReaderPool.Put(sr) } -func (sr *streamReader) Read(v *map[string]interface{}) error { - // TODO(axw) decode into a reused map, clearing out the - // map between reads. We would require that decoders copy - // any contents of rawModel that they wish to retain after - // the call, in order to safely reuse the map. - err := sr.NDJSONStreamDecoder.Decode(v) - if err != nil { - if _, ok := err.(decoder.JSONDecodeError); ok { - return &Error{ - Type: InvalidInputErrType, - Message: err.Error(), - Document: string(sr.LatestLine()), - } +func (sr *streamReader) wrapError(err error) error { + if err == nil { + return nil + } + if _, ok := err.(decoder.JSONDecodeError); ok { + return &Error{ + Type: InvalidInputErrType, + Message: err.Error(), + Document: string(sr.LatestLine()), } - if err == decoder.ErrLineTooLong { - return &Error{ - Type: InputTooLargeErrType, - Message: "event exceeded the permitted size.", - Document: string(sr.LatestLine()), - } + } + + var e = err + if err, ok := err.(v2.DecodeError); ok { + e = err.Unwrap() + } + if errors.Is(e, decoder.ErrLineTooLong) { + return &Error{ + Type: InputTooLargeErrType, + Message: "event exceeded the permitted size.", + Document: string(sr.LatestLine()), } } return err diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json index f8036f7ce98..54f576a7064 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json @@ -256,7 +256,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json index b2c41c84ff1..599c731aad3 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json @@ -59,7 +59,7 @@ "MozillaChromeEdge" ] }, - "method": "post", + "method": "POST", "socket": { "encrypted": true, "remote_address": "12.53.12.1:8080" @@ -271,7 +271,7 @@ "us": 3781 }, "http": { - "method": "get", + "method": "GET", "response": { "decoded_body_size": 401, "encoded_body_size": 356, @@ -629,7 +629,7 @@ "opbeans-java:3000" ] }, - "method": "post", + "method": "POST", "socket": { "encrypted": true, "remote_address": "12.53.12.1" diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json index a644de5a3ea..0d46024b789 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json @@ -591,7 +591,7 @@ "us": 3781 }, "http": { - "method": "get", + "method": "GET", "response": { "decoded_body_size": 401, "encoded_body_size": 356, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json index ce452ca053b..7ff6a5f5ca1 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json @@ -197,7 +197,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, @@ -368,7 +368,7 @@ }, "http": { "request": { - "method": "post", + "method": "POST", "socket": { "remote_address": "192.0.1" } diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidEvent.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidEvent.approved.json index 703797831cb..b18aa4919f9 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidEvent.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidEvent.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcdef\", \"parent_id\": \"abcdefabcdef01234567\", \"type\": \"request\", \"duration\": 32.592981, \"span_count\": { \"started\": 21 } } } ", - "message": "failed to validate transaction: error validating JSON: I[#] S[#] doesn't validate with \"transaction#\"\n I[#] S[#/allOf/3] allOf failed\n I[#/id] S[#/allOf/3/properties/id/type] expected string, but got number" + "message": "decode error: data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects \" or n, but found 1, error found in #10 byte of ...| { \"id\": 12345, \"tra|..., bigger context ...|{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcde|..." } ] } diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONEvent.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONEvent.approved.json index ac1e798d6f8..ae54db10f8d 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONEvent.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONEvent.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{ \"invalid-json\" }", - "message": "data read error: invalid character '}' after object key" + "message": "invalid-json: did not recognize object type" } ] } diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONMetadata.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONMetadata.approved.json index b1f27fbb794..8481bde2ec4 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONMetadata.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidJSONMetadata.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"metadata\": {\"invalid-json\"}}", - "message": "data read error: invalid character '}' after object key" + "message": "decode error: data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :, but found }, error found in #10 byte of ...|lid-json\"}}|..., bigger context ...|{\"metadata\": {\"invalid-json\"}}|..." } ] } diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata.approved.json index da67acd9c67..b36495f24f1 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"metadata\": {\"user\": null}}", - "message": "failed to validate metadata: error validating JSON: I[#] S[#] doesn't validate with \"metadata#\"\n I[#] S[#/required] missing properties: \"service\"" + "message": "validation error: 'metadata' required" } ] } diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata2.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata2.approved.json index 26640294ec7..3b487d4e70f 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata2.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultInvalidMetadata2.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"not\": \"metadata\"}", - "message": "did not recognize object type" + "message": "validation error: 'metadata' required" } ] } diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultReadError.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultReadError.approved.json index 0e4a36a0497..7bfc1f07165 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultReadError.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultReadError.approved.json @@ -1,5 +1,5 @@ { - "accepted": 4, + "accepted": 3, "errors": [ { "message": "timeout" diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultUnrecognizedEvent.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultUnrecognizedEvent.approved.json index 0dfff3f52d8..a3231874cb8 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultUnrecognizedEvent.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultUnrecognizedEvent.approved.json @@ -3,7 +3,7 @@ "errors": [ { "document": "{\"tennis-court\": {\"name\": \"Centre Court, Wimbledon\"}}", - "message": "did not recognize object type" + "message": "tennis-court: did not recognize object type" } ] } diff --git a/systemtest/logging_test.go b/systemtest/logging_test.go index 454ec906325..355dd1e9de5 100644 --- a/systemtest/logging_test.go +++ b/systemtest/logging_test.go @@ -95,7 +95,7 @@ func TestAPMServerRequestLoggingValid(t *testing.T) { }}, requestEntries) assert.NotContains(t, logEntries[0].Fields, "error") - assert.Regexp(t, "failed to validate transaction: .*", logEntries[1].Fields["error"]) + assert.Regexp(t, "validation error: 'transaction' required", logEntries[1].Fields["error"]) assert.Equal(t, "event exceeded the permitted size.", logEntries[2].Fields["error"]) } diff --git a/testdata/intake-v2/transactions.ndjson b/testdata/intake-v2/transactions.ndjson index c363958f516..853304a1bff 100644 --- a/testdata/intake-v2/transactions.ndjson +++ b/testdata/intake-v2/transactions.ndjson @@ -2,4 +2,4 @@ {"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}} {"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}} {"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}} -{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created"}}}} +{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created"}}}} \ No newline at end of file diff --git a/tests/json_schema.go b/tests/json_schema.go index dd88b373401..1061ed95eec 100644 --- a/tests/json_schema.go +++ b/tests/json_schema.go @@ -122,20 +122,7 @@ func (ps *ProcessorSetup) AttrsMatchJsonSchema(t *testing.T, payloadAttrs, paylo // - `required`: ensure required keys must not be missing or nil // - `conditionally required`: prepare payload according to conditions, then // ensure required keys must not be missing -func (ps *ProcessorSetup) AttrsPresence(t *testing.T, requiredKeys *Set, condRequiredKeys map[string]Condition) { - - required := Union(requiredKeys, NewSet( - "service", - "service.name", - "service.agent", - "service.agent.name", - "service.agent.version", - "service.language.name", - "service.runtime.name", - "service.runtime.version", - "process.pid", - )) - +func (ps *ProcessorSetup) AttrsPresence(t *testing.T, required *Set, condRequiredKeys map[string]Condition) { payload, err := ps.Proc.LoadPayload(ps.FullPayloadPath) require.NoError(t, err) @@ -149,7 +136,7 @@ func (ps *ProcessorSetup) AttrsPresence(t *testing.T, requiredKeys *Set, condReq //test sending nil value for key ps.changePayload(t, key, nil, Condition{}, upsertFn, func(k string) (bool, []string) { - errMsgs := []string{keyLast, "did not recognize object type"} + errMsgs := []string{keyLast, "did not recognize object type", "requires at least one of the fields", "required"} return !required.ContainsStrPattern(k), errMsgs }, ) @@ -158,14 +145,30 @@ func (ps *ProcessorSetup) AttrsPresence(t *testing.T, requiredKeys *Set, condReq cond := condRequiredKeys[key] ps.changePayload(t, key, nil, cond, deleteFn, func(k string) (bool, []string) { + validationErr := "validation error:" + keyParts := strings.Split(key, ".") + prefix := " " + for i := 0; i < len(keyParts); i++ { + if i == len(keyParts)-1 { + validationErr = fmt.Sprintf("%s%s'%s'", validationErr, prefix, keyParts[i]) + continue + } + validationErr = fmt.Sprintf("%s%s%s", validationErr, prefix, keyParts[i]) + prefix = ": " + } errMsgs := []string{ fmt.Sprintf("missing properties: \"%s\"", keyLast), + fmt.Sprintf("'%s'", key), "did not recognize object type", + validationErr, + "requires at least one of the fields", + "required", } if required.ContainsStrPattern(k) { return false, errMsgs - } else if _, ok := condRequiredKeys[k]; ok { + } + if _, ok := condRequiredKeys[k]; ok { return false, errMsgs } return true, []string{} @@ -261,7 +264,7 @@ func (ps *ProcessorSetup) changePayload( require.NoError(t, err) err = ps.Proc.Validate(payload) - assert.NoError(t, err, "vanilla payload did not validate") + require.NoError(t, err, "vanilla payload did not validate, error: %v", err) // prepare payload according to conditions: @@ -298,12 +301,12 @@ func (ps *ProcessorSetup) changePayload( } else { if assert.Error(t, err, fmt.Sprintf(`Expected error for key <%v>, but received no error.`, key)) { for _, errMsg := range errMsgs { - if strings.Contains(strings.ToLower(err.Error()), errMsg) { + if strings.Contains(strings.ToLower(err.Error()), strings.ToLower(errMsg)) { return } } wantLog = true - assert.Fail(t, fmt.Sprintf("Expected error to be one of %v, but was %v", errMsgs, err.Error())) + assert.Fail(t, fmt.Sprintf("Expected error to be one of <%v>, but was <%v>", errMsgs, err.Error())) } else { wantLog = true } diff --git a/tests/loader/loader.go b/tests/loader/loader.go index 1cb7343a18f..df75cc92f8b 100644 --- a/tests/loader/loader.go +++ b/tests/loader/loader.go @@ -71,5 +71,7 @@ func unmarshalData(filePath string, err error) (map[string]interface{}, error) { return nil, err } defer r.Close() - return decoder.DecodeJSONData(r) + var m map[string]interface{} + err = decoder.NewJSONDecoder(r).Decode(&m) + return m, err } diff --git a/tests/system/drop_unsampled_transactions.approved.json b/tests/system/drop_unsampled_transactions.approved.json index d93fd09b0dc..6212d88ceec 100644 --- a/tests/system/drop_unsampled_transactions.approved.json +++ b/tests/system/drop_unsampled_transactions.approved.json @@ -265,7 +265,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, diff --git a/tests/system/error.approved.json b/tests/system/error.approved.json index 26da94671cf..f1d108baaed 100644 --- a/tests/system/error.approved.json +++ b/tests/system/error.approved.json @@ -310,7 +310,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, diff --git a/tests/system/keep_unsampled_transactions.approved.json b/tests/system/keep_unsampled_transactions.approved.json index 5c45cad3ff4..377b90cbd06 100644 --- a/tests/system/keep_unsampled_transactions.approved.json +++ b/tests/system/keep_unsampled_transactions.approved.json @@ -361,7 +361,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true, diff --git a/tests/system/spans.approved.json b/tests/system/spans.approved.json index 69ea7b3d0e6..6c888737a92 100644 --- a/tests/system/spans.approved.json +++ b/tests/system/spans.approved.json @@ -47,7 +47,7 @@ "us": 3781 }, "http": { - "method": "get", + "method": "GET", "response": { "status_code": 200 }, diff --git a/tests/system/test_integration.py b/tests/system/test_integration.py index 0c6c895d3b9..78e292cfca2 100644 --- a/tests/system/test_integration.py +++ b/tests/system/test_integration.py @@ -333,7 +333,7 @@ def check_experimental_key_indexed(self, experimental): # check whether or not top level key `experimental` has been indexed rs = self.es.search(index=idx, body={"query": {"exists": {"field": 'experimental'}}}) ct = 1 if experimental else 0 - assert rs['hits']['total']['value'] == ct + assert rs['hits']['total']['value'] == ct, idx @integration_test diff --git a/tests/system/test_requests.py b/tests/system/test_requests.py index 27bf0b8ccb3..515ff4c6777 100644 --- a/tests/system/test_requests.py +++ b/tests/system/test_requests.py @@ -41,7 +41,7 @@ def test_validation_fail(self): data = self.get_event_payload(name="invalid-event.ndjson") r = self.request_intake(data=data) assert r.status_code == 400, r.status_code - assert "failed to validate transaction: error validating JSON" in r.text, r.text + assert "decode error: data read error" in r.text, r.text def test_rum_default_disabled(self): r = self.request_intake(url='http://localhost:8200/intake/v2/rum/events') diff --git a/tests/system/transaction.approved.json b/tests/system/transaction.approved.json index 755282d5319..8d3046c5aea 100644 --- a/tests/system/transaction.approved.json +++ b/tests/system/transaction.approved.json @@ -361,7 +361,7 @@ "Mozilla Chrome Edge" ] }, - "method": "post", + "method": "POST", "referrer": "http://localhost:8000/test/e2e/", "socket": { "encrypted": true,