Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cmd/opampsupervisor] Report AvailableComponents via OpAMP supervisor #37250

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/report-available-components-opamp-supervisor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support retrieval of available components via the OpAMP supervisor.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37247]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions cmd/opampsupervisor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ For a list of open issues related to the Supervisor, see [these issues](https://
| AcceptsRestartCommand | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21077> |
| ReportsHealth | ⚠️ |
| ReportsRemoteConfig | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079> |
| ReportsAvailableComponents | <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37247> |

### Supervisor specification features

Expand Down
98 changes: 98 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,104 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
}, 5*time.Second, 250*time.Millisecond)
}

// TestSupervisorBootstrapsCollectorAvailableComponents starts a Supervisor with
// the "reports_available_components" config against a test OpAMP server and
// checks that the server receives:
//   - an AvailableComponents message with a populated component map that has
//     five top-level categories and an "opamp" entry under "extensions", and
//   - an AgentDescription whose service name and version match the values the
//     Collector binary prints for its `components` command.
func TestSupervisorBootstrapsCollectorAvailableComponents(t *testing.T) {
	agentDescription := atomic.Value{}
	availableComponents := atomic.Value{}

	// Load the Supervisor config so we can get the location of
	// the Collector that will be run.
	var cfg config.Supervisor
	cfgFile := getSupervisorConfig(t, "reports_available_components", map[string]string{})
	k := koanf.New("::")
	err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
	require.NoError(t, err)
	err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
		Tag: "mapstructure",
	})
	require.NoError(t, err)

	// Get the binary name and version from the Collector binary
	// using the `components` command that prints a YAML-encoded
	// map of information about the Collector build. Some of this
	// information will be used as defaults for the telemetry
	// attributes.
	agentPath := cfg.Agent.Executable
	componentsInfo, err := exec.Command(agentPath, "components").Output()
	require.NoError(t, err)
	k = koanf.New("::")
	err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
	require.NoError(t, err)
	buildinfo := k.StringMap("buildinfo")
	command := buildinfo["command"]
	version := buildinfo["version"]

	server := newOpAMPServer(
		t,
		defaultConnectingHandler,
		types.ConnectionCallbacks{
			OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
				if message.AgentDescription != nil {
					agentDescription.Store(message.AgentDescription)
				}

				response := &protobufs.ServerToAgent{}
				if message.AvailableComponents != nil {
					availableComponents.Store(message.AvailableComponents)

					// The message carried no component map, so ask the
					// agent to report the full list of components.
					if message.GetAvailableComponents().GetComponents() == nil {
						response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
					}
				}

				return response
			},
		})

	s := newSupervisor(t, "reports_available_components", map[string]string{"url": server.addr})

	require.NoError(t, s.Start())
	defer s.Shutdown()

	waitForSupervisorConnection(server.supervisorConnected, true)

	// The polling condition runs on a goroutine owned by require.Eventually,
	// so it must not call require.* (FailNow is only valid on the test
	// goroutine). It only reports readiness and snapshots the message that
	// satisfied it; the structural assertions run afterwards.
	var reported *protobufs.AvailableComponents
	require.Eventually(t, func() bool {
		ac, ok := availableComponents.Load().(*protobufs.AvailableComponents)
		if !ok || ac.GetComponents() == nil {
			return false
		}

		ad, ok := agentDescription.Load().(*protobufs.AgentDescription)
		if !ok {
			return false
		}

		var agentName, agentVersion string
		for _, attr := range ad.IdentifyingAttributes {
			switch attr.Key {
			case semconv.AttributeServiceName:
				agentName = attr.Value.GetStringValue()
			case semconv.AttributeServiceVersion:
				agentVersion = attr.Value.GetStringValue()
			}
		}

		// By default the Collector should report its name and version
		// from the component.BuildInfo struct built into the Collector
		// binary.
		if agentName != command || agentVersion != version {
			return false
		}

		reported = ac
		return true
	}, 10*time.Second, 250*time.Millisecond)

	components := reported.GetComponents()
	require.Len(t, components, 5) // connectors, exporters, extensions, processors, receivers
	extensions := components["extensions"]
	require.NotNil(t, extensions)
	require.NotNil(t, extensions.GetSubComponentMap())
	require.NotNil(t, extensions.GetSubComponentMap()["opamp"])
}

func TestSupervisorReportsEffectiveConfig(t *testing.T) {
var agentConfig atomic.Value
server := newOpAMPServer(
Expand Down
6 changes: 6 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Capabilities struct {
ReportsOwnMetrics bool `mapstructure:"reports_own_metrics"`
ReportsHealth bool `mapstructure:"reports_health"`
ReportsRemoteConfig bool `mapstructure:"reports_remote_config"`
ReportsAvailableComponents bool `mapstructure:"reports_available_components"`
}

func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
Expand Down Expand Up @@ -128,6 +129,10 @@ func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings
}

if c.ReportsAvailableComponents {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
}

return supportedCapabilities
}

Expand Down Expand Up @@ -245,6 +250,7 @@ func DefaultSupervisor() Supervisor {
ReportsOwnMetrics: true,
ReportsHealth: true,
ReportsRemoteConfig: false,
ReportsAvailableComponents: false,
},
Storage: Storage{
Directory: defaultStorageDir,
Expand Down
4 changes: 3 additions & 1 deletion cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
ReportsOwnMetrics: true,
ReportsHealth: true,
ReportsRemoteConfig: true,
ReportsAvailableComponents: true,
},
expectedAgentCapabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus |
protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig |
Expand All @@ -468,7 +469,8 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig |
protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings,
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
},
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/opampsupervisor/supervisor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type flattenedSettings struct {
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer)
onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent
onConnecting func(request *http.Request) (shouldConnect bool, rejectStatusCode int)
onConnectionClose func(conn serverTypes.Connection)
endpoint string
Expand Down Expand Up @@ -54,7 +54,7 @@ func (fs flattenedSettings) OnConnected(_ context.Context, _ serverTypes.Connect

func (fs flattenedSettings) OnMessage(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if fs.onMessage != nil {
fs.onMessage(conn, message)
return fs.onMessage(conn, message)
}

return &protobufs.ServerToAgent{}
Expand Down
3 changes: 2 additions & 1 deletion cmd/opampsupervisor/supervisor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func Test_flattenedSettings_OnConnecting(t *testing.T) {
func Test_flattenedSettings_OnMessage(t *testing.T) {
onMessageFuncCalled := false
fs := flattenedSettings{
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) {
onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
onMessageFuncCalled = true
return &protobufs.ServerToAgent{}
},
}

Expand Down
72 changes: 61 additions & 11 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ type Supervisor struct {
// Supervisor's own config.
config config.Supervisor

agentDescription *atomic.Value
agentDescription *atomic.Value
availableComponents *atomic.Value

// Supervisor's persistent state
persistentState *persistentState
Expand Down Expand Up @@ -174,6 +175,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentDescription: &atomic.Value{},
availableComponents: &atomic.Value{},
doneChan: make(chan struct{}),
customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages),
agentConn: &atomic.Value{},
Expand Down Expand Up @@ -300,16 +302,22 @@ func (s *Supervisor) getBootstrapInfo() (err error) {

done := make(chan error, 1)
var connected atomic.Bool
var doneReported atomic.Bool

// Start a one-shot server to get the Collector's agent description
// using the Collector's OpAMP extension.
// and available components using the Collector's OpAMP extension.
err = srv.Start(flattenedSettings{
endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort),
onConnecting: func(_ *http.Request) (bool, int) {
connected.Store(true)
return true, http.StatusOK
},
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) {
onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
response := &protobufs.ServerToAgent{}
if message.GetAvailableComponents() != nil {
s.setAvailableComponents(message.AvailableComponents)
}

if message.AgentDescription != nil {
instanceIDSeen := false
s.setAgentDescription(message.AgentDescription)
Expand All @@ -324,19 +332,47 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s)",
attr.Value.GetStringValue(),
s.persistentState.InstanceID.String())
return
return response
}
instanceIDSeen = true
}
}

if !instanceIDSeen {
done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message")
return
return response
}
}

// agent description must be defined
_, ok := s.agentDescription.Load().(*protobufs.AgentDescription)
if !ok {
return response
}

// if available components have not been reported, agent description is sufficient to continue
availableComponents, availableComponentsOk := s.availableComponents.Load().(*protobufs.AvailableComponents)
if availableComponentsOk {
// must have a full list of components if available components have been reported
if availableComponents.GetComponents() != nil {
if !doneReported.Load() {
done <- nil
doneReported.Store(true)
}
} else {
// if we don't have a full component list, ask for it
response.Flags = uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents)
}
return response
}

// report done only once rather than on every message: the done channel buffers a single value, so a second send would block this handler
if !doneReported.Load() {
done <- nil
doneReported.Store(true)
}

return response
},
}.toServerSettings())
if err != nil {
Expand Down Expand Up @@ -456,6 +492,12 @@ func (s *Supervisor) startOpAMPClient() error {
return err
}

if ac, ok := s.availableComponents.Load().(*protobufs.AvailableComponents); ok && ac != nil {
if err = s.opampClient.SetAvailableComponents(ac); err != nil {
return err
}
}

s.logger.Debug("Starting OpAMP client...")
if err = s.opampClient.Start(context.Background(), settings); err != nil {
return err
Expand Down Expand Up @@ -505,7 +547,7 @@ func (s *Supervisor) startOpAMPServer() error {
return nil
}

func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) {
func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
s.agentConn.Store(conn)

s.logger.Debug("Received OpAMP message from the agent")
Expand Down Expand Up @@ -551,6 +593,8 @@ func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, messag
s.logger.Debug("Received health status from agent", zap.Bool("healthy", message.Health.Healthy))
s.lastHealthFromClient = message.Health
}

return &protobufs.ServerToAgent{}
}

func (s *Supervisor) forwardCustomMessagesToServerLoop() {
Expand Down Expand Up @@ -584,6 +628,11 @@ func (s *Supervisor) setAgentDescription(ad *protobufs.AgentDescription) {
s.agentDescription.Store(ad)
}

// setAvailableComponents records ac as the most recently reported set of
// available components by storing it in the Supervisor's availableComponents
// atomic value, replacing whatever was stored before.
func (s *Supervisor) setAvailableComponents(ac *protobufs.AvailableComponents) {
	holder := s.availableComponents
	holder.Store(ac)
}

// applyKeyValueOverrides merges the overrides map into the array of key value pairs.
// If a key from overrides already exists in the array of key value pairs, it is overwritten by the value from the overrides map.
// An array of KeyValue pair is returned, with each key value pair having a distinct key.
Expand Down Expand Up @@ -766,10 +815,11 @@ func (s *Supervisor) composeOpAMPExtensionConfig() []byte {

var cfg bytes.Buffer
tplVars := map[string]any{
"InstanceUid": s.persistentState.InstanceID.String(),
"SupervisorPort": s.opampServerPort,
"PID": s.pidProvider.PID(),
"PPIDPollInterval": orphanPollInterval,
"InstanceUid": s.persistentState.InstanceID.String(),
"SupervisorPort": s.opampServerPort,
"PID": s.pidProvider.PID(),
"PPIDPollInterval": orphanPollInterval,
"ReportsAvailableComponents": s.config.Capabilities.ReportsAvailableComponents,
}
err := s.opampextensionTemplate.Execute(
&cfg,
Expand Down Expand Up @@ -1020,7 +1070,7 @@ func (s *Supervisor) startAgent() (agentStartStatus, error) {
err := s.commander.Start(context.Background())
if err != nil {
s.logger.Error("Cannot start the agent", zap.Error(err))
startErr := fmt.Errorf("Cannot start the agent: %w", err)
startErr := fmt.Errorf("cannot start the agent: %w", err)
err = s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: startErr.Error()})
if err != nil {
s.logger.Error("Failed to report OpAMP client health", zap.Error(err))
Expand Down
Loading