Skip to content

Commit

Permalink
model decode: Integrate new v2 decoder with backend stream processor
Browse files Browse the repository at this point in the history
* change stream processor logic

* adapt package tests

* adapt decoder to be able to read ahaed for idenitying event type
  • Loading branch information
simitt committed Oct 2, 2020
1 parent 12d7904 commit 4e94a87
Show file tree
Hide file tree
Showing 48 changed files with 455 additions and 413 deletions.
7 changes: 6 additions & 1 deletion beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func TestIntakeHandler(t *testing.T) {
code: http.StatusAccepted, id: request.IDResponseValidAccepted,
},
"TooLarge": {
path: "errors.ndjson", processor: &stream.Processor{},
path: "errors.ndjson",
processor: func() *stream.Processor {
p := stream.BackendProcessor(config.DefaultConfig())
p.MaxEventSize = 10
return p
}(),
code: http.StatusBadRequest, id: request.IDResponseErrorsRequestTooLarge},
"Closing": {
path: "errors.ndjson", reporter: beatertest.ErrorReporterFn(publish.ErrChannelClosed),
Expand Down
2 changes: 1 addition & 1 deletion beater/api/intake/test_approved/BodyReader.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 0,
"errors": [
{
"message": "EOF while reading metadata"
"message": "validation error 'metadata' required"
}
]
}
2 changes: 1 addition & 1 deletion beater/api/intake/test_approved/InvalidEvent.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcdef\", \"parent_id\": \"abcdefabcdef01234567\", \"type\": \"request\", \"duration\": 32.592981, \"span_count\": { \"started\": 21 } } } ",
"message": "failed to validate transaction: error validating JSON: I[#] S[#] doesn't validate with \"transaction#\"\n I[#] S[#/allOf/3] allOf failed\n I[#/id] S[#/allOf/3/properties/id/type] expected string, but got number"
"message": "decode error data read error: v2.transactionRoot.Transaction: v2.transaction.ID: ReadString: expects \" or n, but found 1, error found in #10 byte of ...| { \"id\": 12345, \"tra|..., bigger context ...|{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcde|..."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{ \"invalid-json\" }",
"message": "data read error: invalid character '}' after object key"
"message": "did not recognize object type"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{\"metadata\": {\"invalid-json\"}}",
"message": "data read error: invalid character '}' after object key"
"message": "decode error data read error: v2.metadataRoot.Metadata: v2.metadata.readFieldHash: expect :, but found }, error found in #10 byte of ...|lid-json\"}}|..., bigger context ...|{\"metadata\": {\"invalid-json\"}}|..."
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{\"metadata\": {\"user\": null}}",
"message": "failed to validate metadata: error validating JSON: I[#] S[#] doesn't validate with \"metadata#\"\n I[#] S[#/required] missing properties: \"service\""
"message": "validation error 'metadata' required"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"errors": [
{
"document": "{\"not\": \"metadata\"}",
"message": "did not recognize object type"
"message": "validation error 'metadata' required"
}
]
}
1 change: 1 addition & 0 deletions beater/api/intake/test_approved/TooLarge.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"accepted": 0,
"errors": [
{
"document": "{\"metadata",
"message": "event exceeded the permitted size."
}
]
Expand Down
20 changes: 7 additions & 13 deletions beater/api/profile/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modeldecoder"
v2 "github.com/elastic/apm-server/model/modeldecoder/v2"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/validation"
Expand Down Expand Up @@ -104,24 +104,18 @@ func Handler(report publish.Reporter) request.Handler {
}
}
r := &decoder.LimitedReader{R: part, N: metadataContentLengthLimit}
raw, err := decoder.DecodeJSONData(r)
if err != nil {
dec := decoder.NewJSONDecoder(r)
metadata := model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}
if err := v2.DecodeMetadata(dec, &metadata); err != nil {
if r.N < 0 {
return nil, requestError{
id: request.IDResponseErrorsRequestTooLarge,
err: err,
}
}
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode metadata JSON"),
}
}
metadata := model.Metadata{
UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent},
Client: model.Client{IP: c.RequestMetadata.ClientIP},
System: model.System{IP: c.RequestMetadata.SystemIP}}
if err := modeldecoder.DecodeMetadata(raw, false, &metadata); err != nil {
var ve *validation.Error
if errors.As(err, &ve) {
return nil, requestError{
Expand Down
8 changes: 4 additions & 4 deletions beater/api/profile/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestHandler(t *testing.T) {
}},
},
"MetadataInvalid": {
id: request.IDResponseErrorsValidate,
id: request.IDResponseErrorsDecode,
parts: []part{{
name: "metadata",
contentType: "application/json",
Expand All @@ -128,16 +128,16 @@ func TestHandler(t *testing.T) {
id: request.IDResponseValidAccepted,
parts: []part{
heapProfilePart(),
part{
{
name: "profile",
// No messageType param specified, so pprof is assumed.
contentType: "application/x-protobuf",
body: heapProfileBody(),
},
part{
{
name: "metadata",
contentType: "application/json",
body: strings.NewReader(`{"service":{"name":"foo","agent":{"name":"go","version":"1.0"}}}`),
body: strings.NewReader(`{"service":{"name":"foo","agent":{"name":"java","version":"1.2.0"}}}`),
},
},
body: prettyJSON(map[string]interface{}{"accepted": 2}),
Expand Down
1 change: 1 addition & 0 deletions beater/request/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (c *Context) acceptJSON() bool {
return false
}

//TODO(simitt): use json-iter instead of stdlib json
func (c *Context) writeJSON(body interface{}, pretty bool) error {
enc := json.NewEncoder(c.w)
if pretty {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@
"Mozilla Chrome Edge"
]
},
"method": "post",
"method": "POST",
"referrer": "http://localhost:8000/test/e2e/",
"socket": {
"encrypted": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"MozillaChromeEdge"
]
},
"method": "post",
"method": "POST",
"socket": {
"encrypted": true,
"remote_address": "12.53.12.1:8080"
Expand Down Expand Up @@ -294,7 +294,7 @@
"us": 3781
},
"http": {
"method": "get",
"method": "GET",
"response": {
"decoded_body_size": 401,
"encoded_body_size": 356,
Expand Down Expand Up @@ -666,7 +666,7 @@
"opbeans-java:3000"
]
},
"method": "post",
"method": "POST",
"socket": {
"encrypted": true,
"remote_address": "12.53.12.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@
"us": 3781
},
"http": {
"method": "get",
"method": "GET",
"response": {
"decoded_body_size": 401,
"encoded_body_size": 356,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
"Mozilla Chrome Edge"
]
},
"method": "post",
"method": "POST",
"referrer": "http://localhost:8000/test/e2e/",
"socket": {
"encrypted": true,
Expand Down Expand Up @@ -395,7 +395,7 @@
},
"http": {
"request": {
"method": "post",
"method": "POST",
"socket": {
"remote_address": "192.0.1"
}
Expand Down
23 changes: 15 additions & 8 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,34 @@ package decoder

import (
"io"
"io/ioutil"

jsoniter "github.com/json-iterator/go"
)

//TODO(simitt): look into config options for performance tuning
var jsonit = jsoniter.ConfigCompatibleWithStandardLibrary
var json = jsoniter.ConfigCompatibleWithStandardLibrary

// var json = jsoniter.ConfigFastest

type Decoder interface {
Decode(v interface{}) error
Read() ([]byte, error)
}

// JSONIterDecoder can decode from a given reader, using jsoniter
// TODO(simitt): rename to JSONDecoder when everything is integrated
type JSONIterDecoder struct {
type JSONDecoder struct {
*jsoniter.Decoder
reader io.Reader
}

// NewJSONIteratorDecoder returns a *json.Decoder where numbers are unmarshaled
// NewJSONDecoder returns a *json.Decoder where numbers are unmarshaled
// as a Number instead of a float64 into an interface{}
func NewJSONIteratorDecoder(r io.Reader) JSONIterDecoder {
d := jsonit.NewDecoder(r)
func NewJSONDecoder(r io.Reader) JSONDecoder {
d := json.NewDecoder(r)
d.UseNumber()
return JSONIterDecoder{Decoder: d}
return JSONDecoder{Decoder: d, reader: r}
}

func (d JSONDecoder) Read() ([]byte, error) {
return ioutil.ReadAll(d.reader)
}
14 changes: 13 additions & 1 deletion decoder/req_decoder_test.go → decoder/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,30 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/decoder"
)

func TestDecodeJSONData(t *testing.T) {
decoded, err := decoder.DecodeJSONData(strings.NewReader(
d := decoder.NewJSONDecoder(strings.NewReader(
`{"id":"85925e55b43f4342","system": {"hostname":"prod1.example.com"},"number":123}`,
))
var decoded map[string]interface{}
err := d.Decode(&decoded)
assert.Nil(t, err)
assert.Equal(t, map[string]interface{}{
"id": "85925e55b43f4342",
"system": map[string]interface{}{"hostname": "prod1.example.com"},
"number": json.Number("123"),
}, decoded)
}

func TestReadJSONData(t *testing.T) {
str := `{"id":"85925e55b43f4342","system": {"hostname":"prod1.example.com"},"number":123}`
d := decoder.NewJSONDecoder(strings.NewReader(str))
b, err := d.Read()
require.Nil(t, err)
require.NotEmpty(t, b)
require.Equal(t, str, string(b))
}
16 changes: 0 additions & 16 deletions decoder/req_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package decoder
import (
"compress/gzip"
"compress/zlib"
"encoding/json"
"io"
"net/http"

Expand Down Expand Up @@ -85,18 +84,3 @@ func CompressedRequestReader(req *http.Request) (io.ReadCloser, error) {
readerCounter.Inc()
return reader, nil
}

func DecodeJSONData(reader io.Reader) (map[string]interface{}, error) {
v := make(map[string]interface{})
d := NewJSONDecoder(reader)
if err := d.Decode(&v); err != nil {
return nil, err
}
return v, nil
}

func NewJSONDecoder(r io.Reader) *json.Decoder {
d := json.NewDecoder(r)
d.UseNumber()
return d
}
Loading

0 comments on commit 4e94a87

Please sign in to comment.