Skip to content

Commit

Permalink
model decode: Integrate new v2 decoder with backend stream processor (#…
Browse files Browse the repository at this point in the history
…4261)

* change stream processor logic

* adapt package tests

* adapt decoder to be able to read ahead for identifying event type

* add RUM information in stream processor for accurate sourcemapping

* Allow empty lines for backwards compatibility

* Add changelog for error messages on Intake API
  • Loading branch information
simitt authored Oct 6, 2020
1 parent 224a375 commit 2c36ad7
Show file tree
Hide file tree
Showing 66 changed files with 619 additions and 533 deletions.
7 changes: 6 additions & 1 deletion beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func TestIntakeHandler(t *testing.T) {
code: http.StatusAccepted, id: request.IDResponseValidAccepted,
},
"TooLarge": {
path: "errors.ndjson", processor: &stream.Processor{},
path: "errors.ndjson",
processor: func() *stream.Processor {
p := stream.BackendProcessor(config.DefaultConfig())
p.MaxEventSize = 10
return p
}(),
code: http.StatusBadRequest, id: request.IDResponseErrorsRequestTooLarge},
"Closing": {
path: "errors.ndjson", reporter: beatertest.ErrorReporterFn(publish.ErrChannelClosed),
Expand Down
2 changes: 1 addition & 1 deletion beater/api/intake/test_approved/BodyReader.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 0,
"errors": [
{
"message": "EOF while reading metadata"
"message": "validation error: 'metadata' required"
}
]
}
2 changes: 1 addition & 1 deletion beater/api/intake/test_approved/InvalidEvent.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcdef\", \"parent_id\": \"abcdefabcdef01234567\", \"type\": \"request\", \"duration\": 32.592981, \"span_count\": { \"started\": 21 } } } ",
"message": "failed to validate transaction: error validating JSON: I[#] S[#] doesn't validate with \"transaction#\"\n I[#] S[#/allOf/3] allOf failed\n I[#/id] S[#/allOf/3/properties/id/type] expected string, but got number"
"message": "decode error: data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects \" or n, but found 1, error found in #10 byte of ...| { \"id\": 12345, \"tra|..., bigger context ...|{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcde|..."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{ \"invalid-json\" }",
"message": "data read error: invalid character '}' after object key"
"message": "invalid-json: did not recognize object type"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{\"metadata\": {\"invalid-json\"}}",
"message": "data read error: invalid character '}' after object key"
"message": "decode error: data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :, but found }, error found in #10 byte of ...|lid-json\"}}|..., bigger context ...|{\"metadata\": {\"invalid-json\"}}|..."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{\"metadata\": {\"user\": null}}",
"message": "failed to validate metadata: error validating JSON: I[#] S[#] doesn't validate with \"metadata#\"\n I[#] S[#/required] missing properties: \"service\""
"message": "validation error: 'metadata' required"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{\"not\": \"metadata\"}",
"message": "did not recognize object type"
"message": "validation error: 'metadata' required"
}
]
}
1 change: 1 addition & 0 deletions beater/api/intake/test_approved/TooLarge.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"accepted": 0,
"errors": [
{
"document": "{\"metadata",
"message": "event exceeded the permitted size."
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{\"tennis-court\": {\"name\": \"Centre Court, Wimbledon\"}}",
"message": "did not recognize object type"
"message": "tennis-court: did not recognize object type"
}
]
}
26 changes: 9 additions & 17 deletions beater/api/profile/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ import (
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modeldecoder"
v2 "github.com/elastic/apm-server/model/modeldecoder/v2"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/validation"
)

var (
Expand Down Expand Up @@ -104,34 +103,27 @@ func Handler(report publish.Reporter) request.Handler {
}
}
r := &decoder.LimitedReader{R: part, N: metadataContentLengthLimit}
raw, err := decoder.DecodeJSONData(r)
if err != nil {
dec := decoder.NewJSONDecoder(r)
metadata := model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}
if err := v2.DecodeMetadata(dec, &metadata); err != nil {
if r.N < 0 {
return nil, requestError{
id: request.IDResponseErrorsRequestTooLarge,
err: err,
}
}
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode metadata JSON"),
}
}
metadata := model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}
if err := modeldecoder.DecodeMetadata(raw, false, &metadata); err != nil {
var ve *validation.Error
if errors.As(err, &ve) {
if _, ok := err.(v2.ValidationError); ok {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: errors.Wrap(err, "invalid metadata"),
}
}
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode metadata"),
err: errors.Wrap(err, "invalid metadata"),
}
}
profileMetadata = metadata
Expand Down
6 changes: 3 additions & 3 deletions beater/api/profile/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,16 @@ func TestHandler(t *testing.T) {
id: request.IDResponseValidAccepted,
parts: []part{
heapProfilePart(),
part{
{
name: "profile",
// No messageType param specified, so pprof is assumed.
contentType: "application/x-protobuf",
body: heapProfileBody(),
},
part{
{
name: "metadata",
contentType: "application/json",
body: strings.NewReader(`{"service":{"name":"foo","agent":{"name":"go","version":"1.0"}}}`),
body: strings.NewReader(`{"service":{"name":"foo","agent":{"name":"java","version":"1.2.0"}}}`),
},
},
body: prettyJSON(map[string]interface{}{"accepted": 2}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@
"Mozilla Chrome Edge"
]
},
"method": "post",
"method": "POST",
"referrer": "http://localhost:8000/test/e2e/",
"socket": {
"encrypted": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"MozillaChromeEdge"
]
},
"method": "post",
"method": "POST",
"socket": {
"encrypted": true,
"remote_address": "12.53.12.1:8080"
Expand Down Expand Up @@ -294,7 +294,7 @@
"us": 3781
},
"http": {
"method": "get",
"method": "GET",
"response": {
"decoded_body_size": 401,
"encoded_body_size": 356,
Expand Down Expand Up @@ -666,7 +666,7 @@
"opbeans-java:3000"
]
},
"method": "post",
"method": "POST",
"socket": {
"encrypted": true,
"remote_address": "12.53.12.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@
"us": 3781
},
"http": {
"method": "get",
"method": "GET",
"response": {
"decoded_body_size": 401,
"encoded_body_size": 356,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
"Mozilla Chrome Edge"
]
},
"method": "post",
"method": "POST",
"referrer": "http://localhost:8000/test/e2e/",
"socket": {
"encrypted": true,
Expand Down Expand Up @@ -395,7 +395,7 @@
},
"http": {
"request": {
"method": "post",
"method": "POST",
"socket": {
"remote_address": "192.0.1"
}
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ https://github.com/elastic/apm-server/compare/7.9\...master[View commits]

[float]
==== Intake API Changes
* Changed error messages for invalid events due to internal changes of decoder logic {pull}4261[4261]

[float]
==== Added
Expand Down
15 changes: 7 additions & 8 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,21 @@ import (
)

//TODO(simitt): look into config options for performance tuning
var jsonit = jsoniter.ConfigCompatibleWithStandardLibrary
var json = jsoniter.ConfigCompatibleWithStandardLibrary

type Decoder interface {
Decode(v interface{}) error
}

// JSONIterDecoder can decode from a given reader, using jsoniter
// TODO(simitt): rename to JSONDecoder when everything is integrated
type JSONIterDecoder struct {
type JSONDecoder struct {
*jsoniter.Decoder
reader io.Reader
}

// NewJSONIteratorDecoder returns a *json.Decoder where numbers are unmarshaled
// NewJSONDecoder returns a *json.Decoder where numbers are unmarshaled
// as a Number instead of a float64 into an interface{}
func NewJSONIteratorDecoder(r io.Reader) JSONIterDecoder {
d := jsonit.NewDecoder(r)
func NewJSONDecoder(r io.Reader) JSONDecoder {
d := json.NewDecoder(r)
d.UseNumber()
return JSONIterDecoder{Decoder: d}
return JSONDecoder{Decoder: d, reader: r}
}
4 changes: 3 additions & 1 deletion decoder/req_decoder_test.go → decoder/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
)

func TestDecodeJSONData(t *testing.T) {
decoded, err := decoder.DecodeJSONData(strings.NewReader(
d := decoder.NewJSONDecoder(strings.NewReader(
`{"id":"85925e55b43f4342","system": {"hostname":"prod1.example.com"},"number":123}`,
))
var decoded map[string]interface{}
err := d.Decode(&decoded)
assert.Nil(t, err)
assert.Equal(t, map[string]interface{}{
"id": "85925e55b43f4342",
Expand Down
16 changes: 0 additions & 16 deletions decoder/req_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package decoder
import (
"compress/gzip"
"compress/zlib"
"encoding/json"
"io"
"net/http"

Expand Down Expand Up @@ -85,18 +84,3 @@ func CompressedRequestReader(req *http.Request) (io.ReadCloser, error) {
readerCounter.Inc()
return reader, nil
}

func DecodeJSONData(reader io.Reader) (map[string]interface{}, error) {
v := make(map[string]interface{})
d := NewJSONDecoder(reader)
if err := d.Decode(&v); err != nil {
return nil, err
}
return v, nil
}

func NewJSONDecoder(r io.Reader) *json.Decoder {
d := json.NewDecoder(r)
d.UseNumber()
return d
}
31 changes: 22 additions & 9 deletions decoder/stream_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package decoder
import (
"bufio"
"bytes"
"encoding/json"
"io"

jsoniter "github.com/json-iterator/go"
)

// NewNDJSONStreamDecoder returns a new NDJSONStreamDecoder which decodes
Expand All @@ -40,9 +41,10 @@ type NDJSONStreamDecoder struct {
lineReader *LineReader

isEOF bool
latestError error
latestLine []byte
latestLineReader bytes.Reader
decoder *json.Decoder
decoder *jsoniter.Decoder
}

// Reset sets sr's underlying io.Reader to r, and resets any reading/decoding state.
Expand All @@ -51,35 +53,46 @@ func (dec *NDJSONStreamDecoder) Reset(r io.Reader) {
dec.lineReader.Reset(dec.bufioReader)
dec.isEOF = false
dec.latestLine = nil
dec.latestLineReader.Reset(nil)
dec.resetLatestLineReader()
}

func (dec *NDJSONStreamDecoder) resetDecoder() {
dec.decoder = NewJSONDecoder(&dec.latestLineReader)
dec.decoder = json.NewDecoder(&dec.latestLineReader)
dec.decoder.UseNumber()
}

// Decode decodes the next line into v.
func (dec *NDJSONStreamDecoder) Decode(v interface{}) error {
buf, readErr := dec.readLine()
if len(buf) == 0 || (readErr != nil && !dec.isEOF) {
return readErr
defer dec.resetLatestLineReader()
if dec.latestLineReader.Size() == 0 {
dec.ReadAhead()
}
if len(dec.latestLine) == 0 || (dec.latestError != nil && !dec.isEOF) {
return dec.latestError
}
if err := dec.decoder.Decode(v); err != nil {
dec.resetDecoder() // clear out decoding state
return JSONDecodeError("data read error: " + err.Error())
}
return readErr // this might be io.EOF
return dec.latestError // this might be io.EOF
}

func (dec *NDJSONStreamDecoder) readLine() ([]byte, error) {
// ReadAhead reads the next NDJSON line, buffering it for a subsequent call to Decode.
func (dec *NDJSONStreamDecoder) ReadAhead() ([]byte, error) {
// readLine can return valid data in `buf` _and_ also an io.EOF
line, readErr := dec.lineReader.ReadLine()
dec.latestLine = line
dec.latestLineReader.Reset(dec.latestLine)
dec.latestError = readErr
dec.isEOF = readErr == io.EOF
return line, readErr
}

func (dec *NDJSONStreamDecoder) resetLatestLineReader() {
dec.latestLineReader.Reset(nil)
dec.latestError = nil
}

// IsEOF signals whether the underlying reader reached the end
func (dec *NDJSONStreamDecoder) IsEOF() bool { return dec.isEOF }

Expand Down
Loading

0 comments on commit 2c36ad7

Please sign in to comment.