Skip to content

Commit

Permalink
Move source mapping to model processing
Browse files Browse the repository at this point in the history
Source mapping transformation is rewritten as
a model.BatchProcessor, moved out of the model
package and into the sourcemap package.

One side-effect of the move to a process, worth
debating, is that source mapping now occurs in
the HTTP handler goroutine rather than in the
publisher, which could increase request latency
slightly. On the other hand this means that source
mapping for more clients than there are CPUs can
now happen concurrently (the publisher is limited
to the number of CPUs in this regard); and the
handlers will block for up to one second anyway
if the publisher is busy/queue is full.

If a stack frame cannot be mapped, we no longer
set `sourcemap.error` or log anything. Just because
RUM and source mapping is enabled, does not mean
that all stacks _must_ be source mapped; therefore
these "errors" are mostly just noise. Likewise we
now only set `sourcemap.updated` when it is true.
  • Loading branch information
axw committed Jul 7, 2021
1 parent 0661ad0 commit 7b67f55
Show file tree
Hide file tree
Showing 27 changed files with 918 additions and 1,226 deletions.
11 changes: 10 additions & 1 deletion beater/api/asset/sourcemap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package sourcemap

import (
"context"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -46,8 +47,14 @@ var (
validateError = monitoring.NewInt(registry, "validation.errors")
)

// AddedNotifier is an interface for notifying of sourcemap additions.
// This is implemented by sourcemap.Store.
type AddedNotifier interface {
NotifyAdded(ctx context.Context, serviceName, serviceVersion, bundleFilepath string)
}

// Handler returns a request.Handler for managing asset requests.
func Handler(report publish.Reporter) request.Handler {
func Handler(report publish.Reporter, notifier AddedNotifier) request.Handler {
return func(c *request.Context) {
if c.Request.Method != "POST" {
c.Result.SetDefault(request.IDResponseErrorsMethodNotAllowed)
Expand Down Expand Up @@ -99,7 +106,9 @@ func Handler(report publish.Reporter) request.Handler {
c.Result.SetWithError(request.IDResponseErrorsFullQueue, err)
}
c.Write()
return
}
notifier.NotifyAdded(c.Request.Context(), smap.ServiceName, smap.ServiceVersion, smap.BundleFilepath)
c.Result.SetDefault(request.IDResponseValidAccepted)
c.Write()
}
Expand Down
12 changes: 11 additions & 1 deletion beater/api/asset/sourcemap/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ import (
"github.com/elastic/apm-server/transform"
)

type notifier struct {
notified bool
}

func (n *notifier) NotifyAdded(ctx context.Context, serviceName, serviceVersion, bundleFilepath string) {
n.notified = true
}

func TestAssetHandler(t *testing.T) {
testcases := map[string]testcaseT{
"method": {
Expand Down Expand Up @@ -111,6 +119,7 @@ func TestAssetHandler(t *testing.T) {
// test assertion
assert.Equal(t, tc.code, tc.w.Code)
assert.Equal(t, tc.body, tc.w.Body.String())
assert.Equal(t, tc.code == http.StatusAccepted, tc.notifier.notified)
})
}
}
Expand All @@ -122,6 +131,7 @@ type testcaseT struct {
contentType string
reporter func(ctx context.Context, p publish.PendingReq) error
authorizer authorizerFunc
notifier notifier

missingSourcemap, missingServiceName, missingServiceVersion, missingBundleFilepath bool

Expand Down Expand Up @@ -178,7 +188,7 @@ func (tc *testcaseT) setup() error {
}
c := request.NewContext()
c.Reset(tc.w, tc.r)
h := Handler(tc.reporter)
h := Handler(tc.reporter, &tc.notifier)
h(c)
return nil
}
Expand Down
16 changes: 13 additions & 3 deletions beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"

"github.com/elastic/apm-server/agentcfg"
"github.com/elastic/apm-server/beater/api/asset/sourcemap"
apisourcemap "github.com/elastic/apm-server/beater/api/asset/sourcemap"
"github.com/elastic/apm-server/beater/api/config/agent"
"github.com/elastic/apm-server/beater/api/intake"
"github.com/elastic/apm-server/beater/api/profile"
Expand All @@ -42,6 +42,7 @@ import (
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/processor/stream"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sourcemap"
)

const (
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewMux(
batchProcessor model.BatchProcessor,
fetcher agentcfg.Fetcher,
ratelimitStore *ratelimit.Store,
sourcemapStore *sourcemap.Store,
) (*http.ServeMux, error) {
pool := request.NewContextPool()
mux := http.NewServeMux()
Expand All @@ -101,6 +103,7 @@ func NewMux(
reporter: report,
batchProcessor: batchProcessor,
ratelimitStore: ratelimitStore,
sourcemapStore: sourcemapStore,
}

type route struct {
Expand Down Expand Up @@ -151,6 +154,7 @@ type routeBuilder struct {
reporter publish.Reporter
batchProcessor model.BatchProcessor
ratelimitStore *ratelimit.Store
sourcemapStore *sourcemap.Store
}

func (r *routeBuilder) profileHandler() (request.Handler, error) {
Expand Down Expand Up @@ -178,14 +182,20 @@ func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *strea
}
return func() (request.Handler, error) {
batchProcessor := r.batchProcessor
if r.sourcemapStore != nil {
batchProcessor = modelprocessor.Chained{
sourcemap.BatchProcessor{Store: r.sourcemapStore},
batchProcessor,
}
}
batchProcessor = batchProcessorWithAllowedServiceNames(batchProcessor, r.cfg.RumConfig.AllowServiceNames)
h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessor)
return middleware.Wrap(h, rumMiddleware(r.cfg, r.authenticator, r.ratelimitStore, intake.MonitoringMap)...)
}
}

func (r *routeBuilder) sourcemapHandler() (request.Handler, error) {
h := sourcemap.Handler(r.reporter)
h := apisourcemap.Handler(r.reporter, r.sourcemapStore)
return middleware.Wrap(h, sourcemapMiddleware(r.cfg, r.authenticator, r.ratelimitStore)...)
}

Expand Down Expand Up @@ -270,7 +280,7 @@ func sourcemapMiddleware(cfg *config.Config, auth *auth.Authenticator, ratelimit
msg = "When APM Server is managed by Fleet, Sourcemaps must be uploaded directly to Elasticsearch."
}
enabled := cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && !cfg.DataStreams.Enabled
backendMiddleware := backendMiddleware(cfg, auth, ratelimitStore, sourcemap.MonitoringMap)
backendMiddleware := backendMiddleware(cfg, auth, ratelimitStore, apisourcemap.MonitoringMap)
return append(backendMiddleware, middleware.KillSwitchMiddleware(enabled, msg))
}

Expand Down
13 changes: 11 additions & 2 deletions beater/api/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sourcemap"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/monitoring"
)
Expand Down Expand Up @@ -78,7 +79,11 @@ func requestToMuxer(cfg *config.Config, r *http.Request) (*httptest.ResponseReco
nopReporter := func(context.Context, publish.PendingReq) error { return nil }
nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil })
ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000)
mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore)
var sourcemapStore *sourcemap.Store
mux, err := NewMux(
beat.Info{Version: "1.2.3"}, cfg,
nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore, sourcemapStore,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +119,11 @@ func newTestMux(t *testing.T, cfg *config.Config) http.Handler {
nopReporter := func(context.Context, publish.PendingReq) error { return nil }
nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil })
ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000)
mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore)
var sourcemapStore *sourcemap.Store
mux, err := NewMux(
beat.Info{Version: "1.2.3"}, cfg,
nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore, sourcemapStore,
)
require.NoError(t, err)
return mux
}
29 changes: 13 additions & 16 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,7 @@ func (s *serverRunner) Start() {
func (s *serverRunner) run() error {
// Send config to telemetry.
recordAPMServerConfig(s.config)
transformConfig, err := newTransformConfig(s.beat.Info, s.config, s.fleetConfig)
if err != nil {
return err
}
transformConfig := newTransformConfig(s.beat.Info, s.config, s.fleetConfig)
publisherConfig := &publish.PublisherConfig{
Info: s.beat.Info,
Pipeline: s.config.Pipeline,
Expand All @@ -387,6 +384,15 @@ func (s *serverRunner) run() error {
}()
}

var sourcemapStore *sourcemap.Store
if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(s.beat.Info, s.config.RumConfig.SourceMapping, s.fleetConfig)
if err != nil {
return err
}
sourcemapStore = store
}

// When the publisher stops cleanly it will close its pipeline client,
// calling the acker's Close method. We need to call Open for each new
// publisher to ensure we wait for all clients and enqueued events to
Expand Down Expand Up @@ -432,6 +438,7 @@ func (s *serverRunner) run() error {
Logger: s.logger,
Tracer: s.tracer,
BatchProcessor: batchProcessor,
SourcemapStore: sourcemapStore,
}); err != nil {
return err
}
Expand Down Expand Up @@ -618,24 +625,14 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) (*transform.Config, error) {
transformConfig := &transform.Config{
func newTransformConfig(beatInfo beat.Info, cfg *config.Config, fleetCfg *config.Fleet) *transform.Config {
return &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping),
},
}

if cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(beatInfo, cfg.RumConfig.SourceMapping, fleetCfg)
if err != nil {
return nil, err
}
transformConfig.RUM.SourcemapStore = store
}

return transformConfig, nil
}

func newSourcemapStore(beatInfo beat.Info, cfg config.SourceMapping, fleetCfg *config.Fleet) (*sourcemap.Store, error) {
Expand Down
35 changes: 6 additions & 29 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,9 @@ func TestTransformConfigIndex(t *testing.T) {
cfg.RumConfig.SourceMapping.IndexPattern = indexPattern
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, nil)
require.NoError(t, err)
require.NotNil(t, transformConfig.RUM.SourcemapStore)
transformConfig.RUM.SourcemapStore.Added(context.Background(), "name", "version", "path")
store.NotifyAdded(context.Background(), "name", "version", "path")
require.Len(t, requestPaths, 1)

path := requestPaths[0]
Expand All @@ -267,26 +266,6 @@ func TestTransformConfigIndex(t *testing.T) {
t.Run("with-observer-version", func(t *testing.T) { test(t, "blah-%{[observer.version]}-blah", "blah-1.2.3-blah") })
}

func TestTransformConfig(t *testing.T) {
test := func(rumEnabled, sourcemapEnabled bool, expectSourcemapStore bool) {
cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = rumEnabled
cfg.RumConfig.SourceMapping.Enabled = sourcemapEnabled
transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
require.NoError(t, err)
if expectSourcemapStore {
assert.NotNil(t, transformConfig.RUM.SourcemapStore)
} else {
assert.Nil(t, transformConfig.RUM.SourcemapStore)
}
}

test(false, false, false)
test(false, true, false)
test(true, false, false)
test(true, true, true)
}

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -301,11 +280,11 @@ func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
cfg.RumConfig.SourceMapping.ESConfig = elasticsearch.DefaultConfig()
cfg.RumConfig.SourceMapping.ESConfig.Hosts = []string{ts.URL}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, nil)
store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, nil)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
_, err = store.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
Expand Down Expand Up @@ -338,11 +317,9 @@ func TestFleetStoreUsed(t *testing.T) {
TLS: nil,
}

transformConfig, err := newTransformConfig(beat.Info{Version: "1.2.3"}, cfg, fleetCfg)
store, err := newSourcemapStore(beat.Info{Version: "1.2.3"}, cfg.RumConfig.SourceMapping, fleetCfg)
require.NoError(t, err)
// Check that the provided rum elasticsearch config was used and
// Fetch() goes to the test server.
_, err = transformConfig.RUM.SourcemapStore.Fetch(context.Background(), "app", "1.0", "/bundle/path")
_, err = store.Fetch(context.Background(), "app", "1.0", "/bundle/path")
require.NoError(t, err)

assert.True(t, called)
Expand Down
4 changes: 3 additions & 1 deletion beater/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sourcemap"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
Expand All @@ -57,6 +58,7 @@ func newHTTPServer(
batchProcessor model.BatchProcessor,
agentcfgFetcher agentcfg.Fetcher,
ratelimitStore *ratelimit.Store,
sourcemapStore *sourcemap.Store,
) (*httpServer, error) {

// Add a model processor that rate limits, and checks authorization for the agent and service for each event.
Expand All @@ -66,7 +68,7 @@ func newHTTPServer(
batchProcessor,
}

mux, err := api.NewMux(info, cfg, reporter, batchProcessor, agentcfgFetcher, ratelimitStore)
mux, err := api.NewMux(info, cfg, reporter, batchProcessor, agentcfgFetcher, ratelimitStore, sourcemapStore)
if err != nil {
return nil, err
}
Expand Down
20 changes: 18 additions & 2 deletions beater/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sourcemap"
)

// RunServerFunc is a function which runs the APM Server until a
Expand Down Expand Up @@ -69,6 +70,10 @@ type ServerParams struct {
// for self-instrumentation.
Tracer *apm.Tracer

// SourcemapStore holds a sourcemap.Store, or nil if source
// mapping is disabled.
SourcemapStore *sourcemap.Store

// BatchProcessor is the model.BatchProcessor that is used
// for publishing events to the output, such as Elasticsearch.
BatchProcessor model.BatchProcessor
Expand All @@ -84,7 +89,15 @@ type ServerParams struct {
// should remove the reporter parameter.
func newBaseRunServer(reporter publish.Reporter) RunServerFunc {
return func(ctx context.Context, args ServerParams) error {
srv, err := newServer(args.Logger, args.Info, args.Config, args.Tracer, reporter, args.BatchProcessor)
srv, err := newServer(
args.Logger,
args.Info,
args.Config,
args.Tracer,
reporter,
args.SourcemapStore,
args.BatchProcessor,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -118,6 +131,7 @@ func newServer(
cfg *config.Config,
tracer *apm.Tracer,
reporter publish.Reporter,
sourcemapStore *sourcemap.Store,
batchProcessor model.BatchProcessor,
) (server, error) {
agentcfgFetchReporter := agentcfg.NewReporter(agentcfg.NewFetcher(cfg), batchProcessor, 30*time.Second)
Expand All @@ -129,7 +143,9 @@ func newServer(
if err != nil {
return server{}, err
}
httpServer, err := newHTTPServer(logger, info, cfg, tracer, reporter, batchProcessor, agentcfgFetchReporter, ratelimitStore)
httpServer, err := newHTTPServer(
logger, info, cfg, tracer, reporter, batchProcessor, agentcfgFetchReporter, ratelimitStore, sourcemapStore,
)
if err != nil {
return server{}, err
}
Expand Down
1 change: 1 addition & 0 deletions beater/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer,
processBatch,
agentcfg.NewFetcher(cfg),
ratelimitStore,
nil, // no sourcemap store
)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 7b67f55

Please sign in to comment.