Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elastic-agent] apm instrumentation fix #30377

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,180 changes: 1,102 additions & 78 deletions NOTICE.txt

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ require (
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v1.0.3
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
go.elastic.co/apm v1.11.0
go.elastic.co/apm v1.15.0
go.elastic.co/apm/module/apmelasticsearch v1.7.2
go.elastic.co/apm/module/apmhttp v1.7.2
go.elastic.co/apm/module/apmgorilla v1.15.0
go.elastic.co/apm/module/apmhttp v1.15.0
go.elastic.co/ecszap v0.3.0
go.elastic.co/go-licence-detector v0.4.0
go.etcd.io/bbolt v1.3.6
Expand Down Expand Up @@ -240,6 +241,7 @@ require (
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jcchavezs/porto v0.1.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
Expand Down
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cucumber/godog v0.8.1 h1:lVb+X41I4YDreE+ibZ50bdXmySxgRviYFgKY6Aw4XE8=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
Expand Down Expand Up @@ -521,6 +520,7 @@ github.com/elastic/go-concert v0.2.0 h1:GAQrhRVXprnNjtvTP9pWJ1d4ToEA4cU5ci7TwTa2
github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38=
github.com/elastic/go-libaudit/v2 v2.2.0 h1:TY3FDpG4Zr9Qnv6KYW6olYr/U+nfu0rD2QAbv75VxMQ=
github.com/elastic/go-libaudit/v2 v2.2.0/go.mod h1:MM/l/4xV7ilcl+cIblL8Zn448J7RZaDwgNLE4gNKYPg=
github.com/elastic/go-licenser v0.3.1/go.mod h1:D8eNQk70FOCVBl3smCGQt/lv7meBeQno2eI1S5apiHQ=
github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRsDqUI=
github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-lookslike v0.3.0 h1:HDI/DQ65V85ZqM7D/sbxcK2wFFnh3+7iFvBk2v2FTHs=
Expand Down Expand Up @@ -1019,6 +1019,8 @@ github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jarcoal/httpmock v1.0.4 h1:jp+dy/+nonJE4g4xbVtl9QdrUNbn6/3hDT5R4nDIZnA=
github.com/jarcoal/httpmock v1.0.4/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik=
github.com/jcchavezs/porto v0.1.0 h1:Xmxxn25zQMmgE7/yHYmh19KcItG81hIwfbEEFnd6w/Q=
github.com/jcchavezs/porto v0.1.0/go.mod h1:fESH0gzDHiutHRdX2hv27ojnOVFco37hg1W6E9EZF4A=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
Expand Down Expand Up @@ -1591,12 +1593,15 @@ github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX
github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA=
github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg=
go.elastic.co/apm v1.7.2/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0=
go.elastic.co/apm v1.11.0 h1:uJyt6nCW9880sZhfl1tB//Jy/5TadNoAd8edRUtgb3w=
go.elastic.co/apm v1.11.0/go.mod h1:qoOSi09pnzJDh5fKnfY7bPmQgl8yl2tULdOu03xhui0=
go.elastic.co/apm v1.15.0 h1:uPk2g/whK7c7XiZyz/YCUnAUBNPiyNeE3ARX3G6Gx7Q=
go.elastic.co/apm v1.15.0/go.mod h1:dylGv2HKR0tiCV+wliJz1KHtDyuD8SPe69oV7VyK6WY=
go.elastic.co/apm/module/apmelasticsearch v1.7.2 h1:5STGHLZLSeAzxordMc+dFVKiyVtMmxADOV+TgRaXXJg=
go.elastic.co/apm/module/apmelasticsearch v1.7.2/go.mod h1:ZyNFuyWdt42GBZkz0SogoLzDBrBGj4orxpiUuxYeYq8=
go.elastic.co/apm/module/apmhttp v1.7.2 h1:2mRh7SwBuEVLmJlX+hsMdcSg9xaielCLElaPn/+i34w=
go.elastic.co/apm/module/apmgorilla v1.15.0 h1:1yTAksffgaFXYEIwlLRiQnxLfy3p3RtpDw8HDupIJfY=
go.elastic.co/apm/module/apmgorilla v1.15.0/go.mod h1:+23mZudYvZ9VgxCQjseLo9EF5gkKEr0KSQBupw+rzP8=
go.elastic.co/apm/module/apmhttp v1.7.2/go.mod h1:sTFWiWejnhSdZv6+dMgxGec2Nxe/ZKfHfz/xtRM+cRY=
go.elastic.co/apm/module/apmhttp v1.15.0 h1:Le/DhI0Cqpr9wG/NIGOkbz7+rOMqJrfE4MRG6q/+leU=
go.elastic.co/apm/module/apmhttp v1.15.0/go.mod h1:NruY6Jq8ALLzWUVUQ7t4wIzn+onKoiP5woJJdTV7GMg=
go.elastic.co/ecszap v0.3.0 h1:Zo/Y4sJLqbWDlqCHI4F4Lzeg0Fs4+n5ldVis4h9xV8w=
go.elastic.co/ecszap v0.3.0/go.mod h1:HTUi+QRmr3EuZMqxPX+5fyOdMNfUu5iPebgfhgsTJYQ=
go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs=
Expand Down
19 changes: 9 additions & 10 deletions metricbeat/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ setup.template.settings:
# This requires a Kibana endpoint configuration.
setup.kibana:

# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
#host: "localhost:5601"
# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
#host: "localhost:5601"

# Kibana Space ID
# ID of the Kibana Space into which the dashboards should be loaded. By default,
# the Default Space will be used.
#space.id:
# Kibana Space ID
# ID of the Kibana Space into which the dashboards should be loaded. By default,
# the Default Space will be used.
#space.id:

# =============================== Elastic Cloud ================================

Expand Down Expand Up @@ -186,4 +186,3 @@ processors:

# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true

8 changes: 7 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ type upgraderControl interface {
}

// New creates a new Agent and bootstrap the required subsystem.
func New(log *logger.Logger, reexec reexecManager, statusCtrl status.Controller, uc upgraderControl, agentInfo *info.AgentInfo) (Application, error) {
func New(
log *logger.Logger,
reexec reexecManager,
statusCtrl status.Controller,
uc upgraderControl,
agentInfo *info.AgentInfo,
) (Application, error) {
// Load configuration from disk to understand in which mode of operation
// we must start the elastic-agent, the mode of operation cannot be changed without restarting the
// elastic-agent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,22 @@ func bootstrapEmitter(ctx context.Context, log *logger.Logger, agentInfo transpi
case c = <-ch:
}

err := emit(log, agentInfo, router, modifiers, c)
err := emit(ctx, log, agentInfo, router, modifiers, c)
if err != nil {
log.Error(err)
}
}
}()

return func(c *config.Config) error {
return func(ctx context.Context, c *config.Config) error {
// span, _ := apm.StartSpan(ctx, "emit", "app.internal")
// defer span.End()
ch <- c
return nil
}, nil
}

func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
func emit(ctx context.Context, log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
if err := info.InjectAgentConfig(c); err != nil {
return err
}
Expand Down Expand Up @@ -219,7 +221,7 @@ func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Ro
return errors.New("bootstrap configuration is incorrect causing fleet-server to not be started")
}

return router.Route(ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
return router.Route(ctx, ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
pipeline.DefaultRK: {
{
Spec: spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (f *fleetGateway) worker() {
}

var errMsg string
if err := f.dispatcher.Dispatch(f.acker, actions...); err != nil {
if err := f.dispatcher.Dispatch(context.Background(), f.acker, actions...); err != nil {
errMsg = fmt.Sprintf("failed to dispatch actions, error: %s", err)
f.log.Error(errMsg)
f.statusReporter.Update(state.Failed, errMsg, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type testingDispatcher struct {
received chan struct{}
}

func (t *testingDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi.Action) error {
func (t *testingDispatcher) Dispatch(_ context.Context, acker store.FleetAcker, actions ...fleetapi.Action) error {
t.Lock()
defer t.Unlock()
defer func() { t.received <- struct{}{} }()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func New(

// Start starts the gateway.
func (w *fleetServerWrapper) Start() error {
err := w.emitter(w.injectedCfg)
err := w.emitter(context.Background(), w.injectedCfg)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func newManaged(
if err != nil {
return nil, err
}
// Client has been instrumented with apm
acker, err := fleet.NewAcker(log, agentInfo, client)
if err != nil {
return nil, err
Expand Down Expand Up @@ -244,7 +245,7 @@ func newManaged(
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
if err := store.ReplayActions(log, actionDispatcher, actionAcker, actions...); err != nil {
if err := store.ReplayActions(ctx, log, actionDispatcher, actionAcker, actions...); err != nil {
log.Errorf("could not recover state, error %+v, skipping...", err)
}
stateRestored = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestManagedModeRouting(t *testing.T) {
actions, err := testActions()
require.NoError(t, err)

err = actionDispatcher.Dispatch(noopacker.NewAcker(), actions...)
err = actionDispatcher.Dispatch(context.Background(), noopacker.NewAcker(), actions...)
require.NoError(t, err)

// has 1 config request for fb, mb and monitoring?
Expand Down Expand Up @@ -101,7 +101,7 @@ func newMockStreamStore() *mockStreamStore {
}
}

func (m *mockStreamStore) Execute(cr configrequest.Request) error {
func (m *mockStreamStore) Execute(_ context.Context, cr configrequest.Request) error {
m.store = append(m.store, cr)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ func readfiles(ctx context.Context, files []string, loader *config.Loader, emitt
return errors.New(err, "could not load or merge configuration", errors.TypeConfig)
}

return emitter(c)
return emitter(ctx, c)
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (h *PolicyChange) Handle(ctx context.Context, a fleetapi.Action, acker stor
if err != nil {
return err
}
if err := h.emitter(c); err != nil {
if err := h.emitter(ctx, c); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type mockEmitter struct {
policy *config.Config
}

func (m *mockEmitter) Emitter(policy *config.Config) error {
func (m *mockEmitter) Emitter(_ context.Context, policy *config.Config) error {
m.policy = policy
return m.err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *Unenroll) Handle(ctx context.Context, a fleetapi.Action, acker store.Fl

// Providing empty map will close all pipelines
noPrograms := make(map[pipeline.RoutingKey][]program.Program)
h.dispatcher.Route(a.ID(), noPrograms)
h.dispatcher.Route(ctx, a.ID(), noPrograms)

if !action.IsDetected {
// ACK only events comming from fleet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ func (ad *ActionDispatcher) key(a fleetapi.Action) string {
}

// Dispatch dispatches an action using pre-registered set of handlers.
func (ad *ActionDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi.Action) error {
func (ad *ActionDispatcher) Dispatch(ctx context.Context, acker store.FleetAcker, actions ...fleetapi.Action) (err error) {
// span, ctx := apm.StartSpan(ctx, "dispatch", "app.internal")
// defer func() {
// if err != nil {
// apm.CaptureError(ctx, err).Send()
// }
// span.End()
// }()

if len(actions) == 0 {
ad.log.Debug("No action to dispatch")
return nil
Expand All @@ -87,18 +95,19 @@ func (ad *ActionDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi
)

for _, action := range actions {
if err := ad.ctx.Err(); err != nil {
if err = ad.ctx.Err(); err != nil {
return err
}

if err := ad.dispatchAction(action, acker); err != nil {
if err = ad.dispatchAction(action, acker); err != nil {
ad.log.Debugf("Failed to dispatch action '%+v', error: %+v", action, err)
return err
}
ad.log.Debugf("Successfully dispatched action: '%+v'", action)
}

return acker.Commit(ad.ctx)
err = acker.Commit(ctx)
return err
}

func (ad *ActionDispatcher) dispatchAction(a fleetapi.Action, acker store.FleetAcker) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func TestActionDispatcher(t *testing.T) {
ack := noopacker.NewAcker()

t.Run("Success to dispatch multiples events", func(t *testing.T) {
ctx := context.Background()
def := &mockHandler{}
d, err := New(context.Background(), nil, def)
d, err := New(ctx, nil, def)
require.NoError(t, err)

success1 := &mockHandler{}
Expand All @@ -62,7 +63,7 @@ func TestActionDispatcher(t *testing.T) {
action1 := &mockAction{}
action2 := &mockActionOther{}

err = d.Dispatch(ack, action1, action2)
err = d.Dispatch(ctx, ack, action1, action2)

require.NoError(t, err)

Expand All @@ -78,11 +79,12 @@ func TestActionDispatcher(t *testing.T) {

t.Run("Unknown action are caught by the unknown handler", func(t *testing.T) {
def := &mockHandler{}
d, err := New(context.Background(), nil, def)
ctx := context.Background()
d, err := New(ctx, nil, def)
require.NoError(t, err)

action := &mockActionUnknown{}
err = d.Dispatch(ack, action)
err = d.Dispatch(ctx, ack, action)

require.NoError(t, err)
require.True(t, def.called)
Expand Down
Loading