Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elastic-agent] initial instrumentation #29031

Merged
merged 38 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
53cbb1b
add some initial instrumentation
stuartnelson3 Nov 16, 2021
9f960a2
update tests
stuartnelson3 Nov 17, 2021
97f5658
update NOTICE.txt
stuartnelson3 Nov 18, 2021
234ab4e
make update
stuartnelson3 Nov 18, 2021
8bf85e9
fix order capturing the error/ending the span
stuartnelson3 Nov 19, 2021
1303efc
close the default tracer
stuartnelson3 Nov 30, 2021
be720ae
use higher-level span name
stuartnelson3 Nov 30, 2021
e81239a
add agent instrumentation config
stuartnelson3 Dec 2, 2021
c3fc48e
pass config values to tracer
stuartnelson3 Dec 2, 2021
0e4f1f7
add explicit tls config
stuartnelson3 Dec 3, 2021
69c327b
add span for emitter
stuartnelson3 Dec 3, 2021
696d0ea
instrument dispatcher
stuartnelson3 Dec 4, 2021
8207973
instrument Update fn
stuartnelson3 Dec 4, 2021
55bd680
instrument acker
stuartnelson3 Dec 4, 2021
ba1b101
update tests
stuartnelson3 Dec 4, 2021
10d67d7
mage check
stuartnelson3 Dec 6, 2021
b9b5e5b
remove libbeat tls config
stuartnelson3 Dec 6, 2021
4dc7c89
tracing for enroll command
stuartnelson3 Dec 6, 2021
49a08da
client is already wrapped
stuartnelson3 Dec 6, 2021
3956d45
wrap base round tripper with apm
stuartnelson3 Dec 6, 2021
c6f4aa2
remove some comments
stuartnelson3 Dec 6, 2021
a9620bc
nest apm config under agent.monitoring.apm
stuartnelson3 Dec 6, 2021
eb315d9
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Dec 6, 2021
b2560ae
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Dec 7, 2021
4cf02d0
re-add mistakenly deleted log value
stuartnelson3 Dec 7, 2021
83c1979
remove TODO
stuartnelson3 Dec 7, 2021
e6ead42
Apply suggestions from code review
stuartnelson3 Dec 8, 2021
e2b75fc
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 10, 2022
4b7fd3d
remove old instrumentation config
stuartnelson3 Jan 10, 2022
c89a52d
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 14, 2022
6f05e1b
comments
stuartnelson3 Jan 14, 2022
5c2fddb
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 18, 2022
7657748
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 19, 2022
010bb60
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 24, 2022
8d9603a
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 25, 2022
d303e3d
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 27, 2022
92a87e2
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 31, 2022
c0b4b9e
Merge branch 'master' into instrument-elastic-agent
stuartnelson3 Jan 31, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,180 changes: 1,102 additions & 78 deletions NOTICE.txt

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ require (
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v1.0.3
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
go.elastic.co/apm v1.11.0
go.elastic.co/apm v1.14.0
go.elastic.co/apm/module/apmelasticsearch v1.7.2
go.elastic.co/apm/module/apmhttp v1.7.2
go.elastic.co/apm/module/apmgorilla v1.14.0
go.elastic.co/apm/module/apmgrpc v1.14.0
go.elastic.co/apm/module/apmhttp v1.14.0
go.elastic.co/ecszap v0.3.0
go.elastic.co/go-licence-detector v0.4.0
go.etcd.io/bbolt v1.3.6
Expand Down Expand Up @@ -241,6 +243,7 @@ require (
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jcchavezs/porto v0.1.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
Expand Down Expand Up @@ -276,6 +279,7 @@ require (
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/grpc/examples v0.0.0-20211115174500-b2317c762757 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/klog/v2 v2.9.0 // indirect
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect
Expand Down
19 changes: 15 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cucumber/godog v0.8.1 h1:lVb+X41I4YDreE+ibZ50bdXmySxgRviYFgKY6Aw4XE8=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
Expand Down Expand Up @@ -512,6 +511,7 @@ github.com/elastic/go-concert v0.2.0 h1:GAQrhRVXprnNjtvTP9pWJ1d4ToEA4cU5ci7TwTa2
github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38=
github.com/elastic/go-libaudit/v2 v2.2.0 h1:TY3FDpG4Zr9Qnv6KYW6olYr/U+nfu0rD2QAbv75VxMQ=
github.com/elastic/go-libaudit/v2 v2.2.0/go.mod h1:MM/l/4xV7ilcl+cIblL8Zn448J7RZaDwgNLE4gNKYPg=
github.com/elastic/go-licenser v0.3.1/go.mod h1:D8eNQk70FOCVBl3smCGQt/lv7meBeQno2eI1S5apiHQ=
github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRsDqUI=
github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU=
github.com/elastic/go-lookslike v0.3.0 h1:HDI/DQ65V85ZqM7D/sbxcK2wFFnh3+7iFvBk2v2FTHs=
Expand Down Expand Up @@ -895,6 +895,7 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
Expand Down Expand Up @@ -1000,6 +1001,8 @@ github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jarcoal/httpmock v1.0.4 h1:jp+dy/+nonJE4g4xbVtl9QdrUNbn6/3hDT5R4nDIZnA=
github.com/jarcoal/httpmock v1.0.4/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik=
github.com/jcchavezs/porto v0.1.0 h1:Xmxxn25zQMmgE7/yHYmh19KcItG81hIwfbEEFnd6w/Q=
github.com/jcchavezs/porto v0.1.0/go.mod h1:fESH0gzDHiutHRdX2hv27ojnOVFco37hg1W6E9EZF4A=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
Expand Down Expand Up @@ -1574,12 +1577,17 @@ github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX
github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA=
github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg=
go.elastic.co/apm v1.7.2/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0=
go.elastic.co/apm v1.11.0 h1:uJyt6nCW9880sZhfl1tB//Jy/5TadNoAd8edRUtgb3w=
go.elastic.co/apm v1.11.0/go.mod h1:qoOSi09pnzJDh5fKnfY7bPmQgl8yl2tULdOu03xhui0=
go.elastic.co/apm v1.14.0 h1:9yilcTbWpqhfyunUj6/SDpZbR4FOVB50xQgODe0TW/0=
go.elastic.co/apm v1.14.0/go.mod h1:dylGv2HKR0tiCV+wliJz1KHtDyuD8SPe69oV7VyK6WY=
go.elastic.co/apm/module/apmelasticsearch v1.7.2 h1:5STGHLZLSeAzxordMc+dFVKiyVtMmxADOV+TgRaXXJg=
go.elastic.co/apm/module/apmelasticsearch v1.7.2/go.mod h1:ZyNFuyWdt42GBZkz0SogoLzDBrBGj4orxpiUuxYeYq8=
go.elastic.co/apm/module/apmhttp v1.7.2 h1:2mRh7SwBuEVLmJlX+hsMdcSg9xaielCLElaPn/+i34w=
go.elastic.co/apm/module/apmgorilla v1.14.0 h1:espCHSZ3ibkrffR6KLua+0jMeBSgO/087U9BZ46Cyv8=
go.elastic.co/apm/module/apmgorilla v1.14.0/go.mod h1:+cDGiyPXN3EvTxoh7zcWOL1/Yw6zKhNkUU0a0OGyXsg=
go.elastic.co/apm/module/apmgrpc v1.14.0 h1:sQA5XnxdKpjzKlMOJD1AKuMEsQh1CSdzJ21iJhZVNgw=
go.elastic.co/apm/module/apmgrpc v1.14.0/go.mod h1:rfw0Kw9yhui/O+rap4gDme0Pj2jGpTdN4iSWK2HM6+0=
go.elastic.co/apm/module/apmhttp v1.7.2/go.mod h1:sTFWiWejnhSdZv6+dMgxGec2Nxe/ZKfHfz/xtRM+cRY=
go.elastic.co/apm/module/apmhttp v1.14.0 h1:uDSIPr1BJOt1A/T5J9Beq9VtMtQHqOdqQUXCPRQF4C4=
go.elastic.co/apm/module/apmhttp v1.14.0/go.mod h1:PY8hyV0X3eKqXYYoN0pyu1pWcvFCwGmh5eUFuS39Zmo=
go.elastic.co/ecszap v0.3.0 h1:Zo/Y4sJLqbWDlqCHI4F4Lzeg0Fs4+n5ldVis4h9xV8w=
go.elastic.co/ecszap v0.3.0/go.mod h1:HTUi+QRmr3EuZMqxPX+5fyOdMNfUu5iPebgfhgsTJYQ=
go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs=
Expand Down Expand Up @@ -2120,6 +2128,7 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down Expand Up @@ -2171,6 +2180,8 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K
google.golang.org/grpc v1.41.0 h1:f+PlOh7QV4iIJkPrx5NQ7qaNGFQ3OTse67yaDHfju4E=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20211115174500-b2317c762757 h1:d0UA5meuQRj/XhiC4CagMU1V3oZxYV+V9Nw2855gOqA=
google.golang.org/grpc/examples v0.0.0-20211115174500-b2317c762757/go.mod h1:gID3PKrg7pWKntu9Ss6zTLJ0ttC0X9IHgREOCZwbCVU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
21 changes: 16 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"fmt"

"go.elastic.co/apm"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status"
Expand Down Expand Up @@ -39,7 +41,15 @@ type upgraderControl interface {
}

// New creates a new Agent and bootstrap the required subsystem.
func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, statusCtrl status.Controller, uc upgraderControl, agentInfo *info.AgentInfo) (Application, error) {
func New(
log *logger.Logger,
pathConfigFile string,
reexec reexecManager,
statusCtrl status.Controller,
uc upgraderControl,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (Application, error) {
// Load configuration from disk to understand in which mode of operation
// we must start the elastic-agent, the mode of operation cannot be changed without restarting the
// elastic-agent.
Expand All @@ -52,7 +62,7 @@ func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, status
return nil, err
}

return createApplication(log, pathConfigFile, rawConfig, reexec, statusCtrl, uc, agentInfo)
return createApplication(log, pathConfigFile, rawConfig, reexec, statusCtrl, uc, agentInfo, tracer)
}

func createApplication(
Expand All @@ -63,6 +73,7 @@ func createApplication(
statusCtrl status.Controller,
uc upgraderControl,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (Application, error) {
log.Info("Detecting execution mode")
ctx := context.Background()
Expand All @@ -74,7 +85,7 @@ func createApplication(

if configuration.IsStandalone(cfg.Fleet) {
log.Info("Agent is managed locally")
return newLocal(ctx, log, paths.ConfigFile(), rawConfig, reexec, statusCtrl, uc, agentInfo)
return newLocal(ctx, log, paths.ConfigFile(), rawConfig, reexec, statusCtrl, uc, agentInfo, tracer)
}

// not in standalone; both modes require reading the fleet.yml configuration file
Expand All @@ -86,11 +97,11 @@ func createApplication(

if configuration.IsFleetServerBootstrap(cfg.Fleet) {
log.Info("Agent is in Fleet Server bootstrap mode")
return newFleetServerBootstrap(ctx, log, pathConfigFile, rawConfig, statusCtrl, agentInfo)
return newFleetServerBootstrap(ctx, log, pathConfigFile, rawConfig, statusCtrl, agentInfo, tracer)
}

log.Info("Agent is managed by Fleet")
return newManaged(ctx, log, store, cfg, rawConfig, reexec, statusCtrl, agentInfo)
return newManaged(ctx, log, store, cfg, rawConfig, reexec, statusCtrl, agentInfo, tracer)
}

func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package application
import (
"context"

"go.elastic.co/apm"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting I didn't know about go.elastic.co, when the restructuring of the repository is complete we should take a look a it. @kvch @cmacknz @ruflin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, I am not a huge fan of vanity imports. It looks great, but it is just vanity... :)


"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/sorted"
Expand Down Expand Up @@ -51,6 +53,7 @@ func newFleetServerBootstrap(
rawConfig *config.Config,
statusCtrl status.Controller,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (*FleetServerBootstrap, error) {
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
Expand Down Expand Up @@ -79,7 +82,7 @@ func newFleetServerBootstrap(
}

bootstrapApp.bgContext, bootstrapApp.cancelCtxFn = context.WithCancel(ctx)
bootstrapApp.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{})
bootstrapApp.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{}, tracer)
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}
Expand Down Expand Up @@ -133,6 +136,7 @@ func (b *FleetServerBootstrap) Start() error {
if err := b.srv.Start(); err != nil {
return err
}
// TODO: Does it make sense to pass in a ctx here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you either clarify the TODOs and remove them or add a bit more detail, your github handle and/or a link to a follow up task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 i should start writing "QUESTION" since that's what they really are, they just get highlighed when I use todo in vim so i see them 😅 i've cleaned it up a bit since talking with @michel-laterman last week.

if err := b.source.Start(); err != nil {
return err
}
Expand Down Expand Up @@ -166,20 +170,22 @@ func bootstrapEmitter(ctx context.Context, log *logger.Logger, agentInfo transpi
case c = <-ch:
}

err := emit(log, agentInfo, router, modifiers, c)
err := emit(ctx, log, agentInfo, router, modifiers, c)
if err != nil {
log.Error(err)
}
}
}()

return func(c *config.Config) error {
return func(ctx context.Context, c *config.Config) error {
span, _ := apm.StartSpan(ctx, "emit", "app.internal")
defer span.End()
ch <- c
return nil
}, nil
}

func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
func emit(ctx context.Context, log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Router, modifiers *pipeline.ConfigModifiers, c *config.Config) error {
if err := info.InjectAgentConfig(c); err != nil {
return err
}
Expand Down Expand Up @@ -218,7 +224,7 @@ func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router pipeline.Ro
return errors.New("bootstrap configuration is incorrect causing fleet-server to not be started")
}

return router.Route(ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
return router.Route(ctx, ast.HashStr(), map[pipeline.RoutingKey][]program.Program{
pipeline.DefaultRK: {
{
Spec: spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ func (f *fleetGateway) worker() {
}

var errMsg string
if err := f.dispatcher.Dispatch(f.acker, actions...); err != nil {
// TODO: Is there a purpose in using f.bgContext?
if err := f.dispatcher.Dispatch(context.Background(), f.acker, actions...); err != nil {
stuartnelson3 marked this conversation as resolved.
Show resolved Hide resolved
errMsg = fmt.Sprintf("failed to dispatch actions, error: %s", err)
f.log.Error(errMsg)
f.statusReporter.Update(state.Failed, errMsg, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type testingDispatcher struct {
received chan struct{}
}

func (t *testingDispatcher) Dispatch(acker store.FleetAcker, actions ...fleetapi.Action) error {
func (t *testingDispatcher) Dispatch(_ context.Context, acker store.FleetAcker, actions ...fleetapi.Action) error {
t.Lock()
defer t.Unlock()
defer func() { t.received <- struct{}{} }()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func New(

// Start starts the gateway.
func (w *fleetServerWrapper) Start() error {
err := w.emitter(w.injectedCfg)
// TODO: Do we want to pass a ctx to Start()?
err := w.emitter(context.Background(), w.injectedCfg)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package application
import (
"context"

"go.elastic.co/apm"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
Expand Down Expand Up @@ -65,6 +67,7 @@ func newLocal(
statusCtrl status.Controller,
uc upgraderControl,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (*Local, error) {
caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), log, statusCtrl)
if err != nil {
Expand All @@ -91,7 +94,7 @@ func newLocal(
}

localApplication.bgContext, localApplication.cancelCtxFn = context.WithCancel(ctx)
localApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{})
localApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{}, tracer)
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}
Expand Down
9 changes: 7 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"fmt"

"go.elastic.co/apm"

"github.com/elastic/go-sysinfo"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
Expand Down Expand Up @@ -78,6 +80,7 @@ func newManaged(
reexec reexecManager,
statusCtrl status.Controller,
agentInfo *info.AgentInfo,
tracer *apm.Tracer,
) (*Managed, error) {
caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), log, statusCtrl)
if err != nil {
Expand Down Expand Up @@ -105,7 +108,7 @@ func newManaged(
}

managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx)
managedApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{})
managedApplication.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{}, tracer)
if err != nil {
return nil, errors.New(err, "initialize GRPC listener", errors.TypeNetwork)
}
Expand Down Expand Up @@ -155,6 +158,7 @@ func newManaged(
if err != nil {
return nil, err
}
// Client has been instrumented with apm
acker, err := fleet.NewAcker(log, agentInfo, client)
if err != nil {
return nil, err
Expand All @@ -170,6 +174,7 @@ func newManaged(
managedApplication.stateStore = stateStore
actionAcker := store.NewStateStoreActionAcker(batchedAcker, stateStore)

// TODO: Is there something to instrument here
actionDispatcher, err := dispatcher.New(managedApplication.bgContext, log, handlers.NewDefault(log))
if err != nil {
return nil, err
Expand Down Expand Up @@ -244,7 +249,7 @@ func newManaged(
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
if err := store.ReplayActions(log, actionDispatcher, actionAcker, actions...); err != nil {
if err := store.ReplayActions(ctx, log, actionDispatcher, actionAcker, actions...); err != nil {
log.Errorf("could not recover state, error %+v, skipping...", err)
}
stateRestored = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestManagedModeRouting(t *testing.T) {
actions, err := testActions()
require.NoError(t, err)

err = actionDispatcher.Dispatch(noopacker.NewAcker(), actions...)
err = actionDispatcher.Dispatch(context.Background(), noopacker.NewAcker(), actions...)
require.NoError(t, err)

// has 1 config request for fb, mb and monitoring?
Expand Down Expand Up @@ -101,7 +101,7 @@ func newMockStreamStore() *mockStreamStore {
}
}

func (m *mockStreamStore) Execute(cr configrequest.Request) error {
func (m *mockStreamStore) Execute(_ context.Context, cr configrequest.Request) error {
m.store = append(m.store, cr)
return nil
}
Expand Down
9 changes: 6 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package application

import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
Expand All @@ -21,6 +23,7 @@ func newOnce(log *logger.Logger, discover discoverFunc, emitter pipeline.Emitter
return &once{log: log, discover: discover, emitter: emitter}
}

// TODO: Pass ctx here?
func (o *once) Start() error {
files, err := o.discover()
if err != nil {
Expand All @@ -31,18 +34,18 @@ func (o *once) Start() error {
return ErrNoConfiguration
}

return readfiles(files, o.emitter)
return readfiles(context.Background(), files, o.emitter)
}

func (o *once) Stop() error {
return nil
}

func readfiles(files []string, emitter pipeline.EmitterFunc) error {
func readfiles(ctx context.Context, files []string, emitter pipeline.EmitterFunc) error {
c, err := config.LoadFiles(files...)
if err != nil {
return errors.New(err, "could not load or merge configuration", errors.TypeConfig)
}

return emitter(c)
return emitter(ctx, c)
}
Loading