Skip to content

Commit

Permalink
[cmd/opampsupervisor] Report AvailableComponents via OpAMP supervisor (
Browse files Browse the repository at this point in the history
…#37250)

#### Description
This feature implements support for the new AvailableComponents message
in opamp-go for the OpAMP supervisor:
open-telemetry/opamp-go#340

~~Since the AvailableComponents message has not yet been released, this
PR contains replace directives for testing purposes. This PR will not be
merged until opamp-go has been released with the new AvailableComponents
message implementation, at which point the replace directives will be
removed.~~ opamp-go v0.19.0 has been released.

This PR is one of two that make up
#37248

#### Link to tracking issue
Fixes
#37247

#### Testing
Updated unit tests
Manual end-to-end testing using the OpAMP extension and an external
OpAMP server.

#### Documentation
Updated README

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
mrsillydog and djaglowski authored Feb 28, 2025
1 parent a9d996e commit 2c186cc
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 18 deletions.
27 changes: 27 additions & 0 deletions .chloggen/report-available-components-opamp-supervisor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support retrieval of available components via the OpAMP supervisor.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37247]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions cmd/opampsupervisor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ For a list of open issues related to the Supervisor, see [these issues](https://
| AcceptsRestartCommand | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21077> |
| ReportsHealth | ⚠️ |
| ReportsRemoteConfig | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079> |
| ReportsAvailableComponents | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37247> |

### Supervisor specification features

Expand Down
98 changes: 98 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,104 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
}, 5*time.Second, 250*time.Millisecond)
}

// TestSupervisorBootstrapsCollectorAvailableComponents starts a Supervisor
// using the "reports_available_components" test config against a local OpAMP
// test server and verifies that the server eventually receives:
//   - an AvailableComponents message carrying a full component list, and
//   - an AgentDescription whose service name and version match the build info
//     printed by the Collector's `components` command.
func TestSupervisorBootstrapsCollectorAvailableComponents(t *testing.T) {
	agentDescription := atomic.Value{}
	availableComponents := atomic.Value{}

	// Load the Supervisor config so we can get the location of
	// the Collector that will be run.
	var cfg config.Supervisor
	cfgFile := getSupervisorConfig(t, "reports_available_components", map[string]string{})
	k := koanf.New("::")
	err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
	require.NoError(t, err)
	err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
		Tag: "mapstructure",
	})
	require.NoError(t, err)

	// Get the binary name and version from the Collector binary
	// using the `components` command that prints a YAML-encoded
	// map of information about the Collector build. Some of this
	// information will be used as defaults for the telemetry
	// attributes.
	agentPath := cfg.Agent.Executable
	componentsInfo, err := exec.Command(agentPath, "components").Output()
	require.NoError(t, err)
	k = koanf.New("::")
	err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
	require.NoError(t, err)
	buildinfo := k.StringMap("buildinfo")
	command := buildinfo["command"]
	version := buildinfo["version"]

	server := newOpAMPServer(
		t,
		defaultConnectingHandler,
		types.ConnectionCallbacks{
			OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
				if message.AgentDescription != nil {
					agentDescription.Store(message.AgentDescription)
				}

				response := &protobufs.ServerToAgent{}
				if message.AvailableComponents != nil {
					availableComponents.Store(message.AvailableComponents)

					// A message without a component map is not the full
					// list yet, so ask the agent to report all components.
					if message.GetAvailableComponents().GetComponents() == nil {
						response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
					}
				}

				return response
			},
		})

	s := newSupervisor(t, "reports_available_components", map[string]string{"url": server.addr})

	// NoError (rather than Nil) prints the error text if Start fails.
	require.NoError(t, s.Start())
	defer s.Shutdown()

	waitForSupervisorConnection(server.supervisorConnected, true)

	// reported holds the AvailableComponents message that satisfied the
	// polling condition. The condition only returns booleans: testify runs it
	// on a separate goroutine, and require.* calls t.FailNow, which must only
	// be invoked from the goroutine running the test.
	var reported *protobufs.AvailableComponents

	require.Eventually(t, func() bool {
		ac, ok := availableComponents.Load().(*protobufs.AvailableComponents)
		if !ok {
			return false
		}

		// Keep polling until the full component list has been reported.
		if ac.GetComponents() == nil {
			return false
		}

		ad, ok := agentDescription.Load().(*protobufs.AgentDescription)
		if !ok {
			return false
		}

		var agentName, agentVersion string
		identAttr := ad.IdentifyingAttributes
		for _, attr := range identAttr {
			switch attr.Key {
			case semconv.AttributeServiceName:
				agentName = attr.Value.GetStringValue()
			case semconv.AttributeServiceVersion:
				agentVersion = attr.Value.GetStringValue()
			}
		}

		// By default the Collector should report its name and version
		// from the component.BuildInfo struct built into the Collector
		// binary.
		if agentName != command || agentVersion != version {
			return false
		}

		reported = ac
		return true
	}, 10*time.Second, 250*time.Millisecond)

	// Hard assertions run on the test goroutine, after polling has succeeded.
	components := reported.GetComponents()
	require.Len(t, components, 5) // connectors, exporters, extensions, processors, receivers
	require.NotNil(t, components["extensions"])
	require.NotNil(t, components["extensions"].GetSubComponentMap())
	require.NotNil(t, components["extensions"].GetSubComponentMap()["opamp"])
}

func TestSupervisorReportsEffectiveConfig(t *testing.T) {
var agentConfig atomic.Value
server := newOpAMPServer(
Expand Down
6 changes: 6 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Capabilities struct {
ReportsOwnMetrics bool `mapstructure:"reports_own_metrics"`
ReportsHealth bool `mapstructure:"reports_health"`
ReportsRemoteConfig bool `mapstructure:"reports_remote_config"`
ReportsAvailableComponents bool `mapstructure:"reports_available_components"`
}

func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
Expand Down Expand Up @@ -128,6 +129,10 @@ func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
}

if c.ReportsAvailableComponents {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
}

return supportedCapabilities
}

Expand Down Expand Up @@ -245,6 +250,7 @@ func DefaultSupervisor() Supervisor {
ReportsOwnMetrics: true,
ReportsHealth: true,
ReportsRemoteConfig: false,
ReportsAvailableComponents: false,
},
Storage: Storage{
Directory: defaultStorageDir,
Expand Down
4 changes: 3 additions & 1 deletion cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
ReportsOwnMetrics: true,
ReportsHealth: true,
ReportsRemoteConfig: true,
ReportsAvailableComponents: true,
},
expectedAgentCapabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus |
protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig |
Expand All @@ -468,7 +469,8 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig |
protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
},
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/opampsupervisor/supervisor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type flattenedSettings struct {
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer)
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent
onConnecting func(request *http.Request) (shouldConnect bool, rejectStatusCode int)
onConnectionClose func(conn serverTypes.Connection)
endpoint string
Expand Down Expand Up @@ -54,7 +54,7 @@ func (fs flattenedSettings) OnConnected(_ context.Context, _ serverTypes.Connect

func (fs flattenedSettings) OnMessage(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if fs.onMessage != nil {
fs.onMessage(conn, message)
return fs.onMessage(conn, message)
}

return &protobufs.ServerToAgent{}
Expand Down
3 changes: 2 additions & 1 deletion cmd/opampsupervisor/supervisor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func Test_flattenedSettings_OnConnecting(t *testing.T) {
func Test_flattenedSettings_OnMessage(t *testing.T) {
onMessageFuncCalled := false
fs := flattenedSettings{
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) {
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
onMessageFuncCalled = true
return &protobufs.ServerToAgent{}
},
}

Expand Down
72 changes: 61 additions & 11 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ type Supervisor struct {
// Supervisor's own config.
config config.Supervisor

agentDescription *atomic.Value
agentDescription *atomic.Value
availableComponents *atomic.Value

// Supervisor's persistent state
persistentState *persistentState
Expand Down Expand Up @@ -174,6 +175,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentDescription: &atomic.Value{},
availableComponents: &atomic.Value{},
doneChan: make(chan struct{}),
customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages),
agentConn: &atomic.Value{},
Expand Down Expand Up @@ -300,16 +302,22 @@ func (s *Supervisor) getBootstrapInfo() (err error) {

done := make(chan error, 1)
var connected atomic.Bool
var doneReported atomic.Bool

// Start a one-shot server to get the Collector's agent description
// using the Collector's OpAMP extension.
// and available components using the Collector's OpAMP extension.
err = srv.Start(flattenedSettings{
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
onConnecting: func(_ *http.Request) (bool, int) {
connected.Store(true)
return true, http.StatusOK
},
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) {
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
response := &protobufs.ServerToAgent{}
if message.GetAvailableComponents() != nil {
s.setAvailableComponents(message.AvailableComponents)
}

if message.AgentDescription != nil {
instanceIDSeen := false
s.setAgentDescription(message.AgentDescription)
Expand All @@ -324,19 +332,47 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s)",
attr.Value.GetStringValue(),
s.persistentState.InstanceID.String())
return
return response
}
instanceIDSeen = true
}
}

if !instanceIDSeen {
done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message")
return
return response
}
}

// agent description must be defined
_, ok := s.agentDescription.Load().(*protobufs.AgentDescription)
if !ok {
return response
}

// if available components have not been reported, agent description is sufficient to continue
availableComponents, availableComponentsOk := s.availableComponents.Load().(*protobufs.AvailableComponents)
if availableComponentsOk {
// must have a full list of components if available components have been reported
if availableComponents.GetComponents() != nil {
if !doneReported.Load() {
done <- nil
doneReported.Store(true)
}
} else {
// if we don't have a full component list, ask for it
response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
}
return response
}

// need to only report done once, not on each message - otherwise, we get a hung thread
if !doneReported.Load() {
done <- nil
doneReported.Store(true)
}

return response
},
}.toServerSettings())
if err != nil {
Expand Down Expand Up @@ -456,6 +492,12 @@ func (s *Supervisor) startOpAMPClient() error {
return err
}

if ac, ok := s.availableComponents.Load().(*protobufs.AvailableComponents); ok && ac != nil {
if err = s.opampClient.SetAvailableComponents(ac); err != nil {
return err
}
}

s.logger.Debug("Starting OpAMP client...")
if err = s.opampClient.Start(context.Background(), settings); err != nil {
return err
Expand Down Expand Up @@ -505,7 +547,7 @@ func (s *Supervisor) startOpAMPServer() error {
return nil
}

func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) {
func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
s.agentConn.Store(conn)

s.logger.Debug("Received OpAMP message from the agent")
Expand Down Expand Up @@ -551,6 +593,8 @@ func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, messag
s.logger.Debug("Received health status from agent", zap.Bool("healthy", message.Health.Healthy))
s.lastHealthFromClient = message.Health
}

return &protobufs.ServerToAgent{}
}

func (s *Supervisor) forwardCustomMessagesToServerLoop() {
Expand Down Expand Up @@ -584,6 +628,11 @@ func (s *Supervisor) setAgentDescription(ad *protobufs.AgentDescription) {
s.agentDescription.Store(ad)
}

// setAvailableComponents records ac as the agent's most recently reported
// available components by storing it in the Supervisor's availableComponents
// atomic value. The stored pointer is kept as-is (no copy is made), so it
// replaces any previously stored value.
func (s *Supervisor) setAvailableComponents(ac *protobufs.AvailableComponents) {
s.availableComponents.Store(ac)
}

// applyKeyValueOverrides merges the overrides map into the array of key value pairs.
// If a key from overrides already exists in the array of key value pairs, it is overwritten by the value from the overrides map.
// An array of KeyValue pair is returned, with each key value pair having a distinct key.
Expand Down Expand Up @@ -766,10 +815,11 @@ func (s *Supervisor) composeOpAMPExtensionConfig() []byte {

var cfg bytes.Buffer
tplVars := map[string]any{
"InstanceUid": s.persistentState.InstanceID.String(),
"SupervisorPort": s.opampServerPort,
"PID": s.pidProvider.PID(),
"PPIDPollInterval": orphanPollInterval,
"InstanceUid": s.persistentState.InstanceID.String(),
"SupervisorPort": s.opampServerPort,
"PID": s.pidProvider.PID(),
"PPIDPollInterval": orphanPollInterval,
"ReportsAvailableComponents": s.config.Capabilities.ReportsAvailableComponents,
}
err := s.opampextensionTemplate.Execute(
&cfg,
Expand Down Expand Up @@ -1020,7 +1070,7 @@ func (s *Supervisor) startAgent() (agentStartStatus, error) {
err := s.commander.Start(context.Background())
if err != nil {
s.logger.Error("Cannot start the agent", zap.Error(err))
startErr := fmt.Errorf("Cannot start the agent: %w", err)
startErr := fmt.Errorf("cannot start the agent: %w", err)
err = s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: startErr.Error()})
if err != nil {
s.logger.Error("Failed to report OpAMP client health", zap.Error(err))
Expand Down
Loading

0 comments on commit 2c186cc

Please sign in to comment.