Skip to content

Commit

Permalink
Fix timestamp conversion in opamp bridge (#3582)
Browse files Browse the repository at this point in the history
Kubernetes (and Go in general) allow for signed unix timestamps
representing dates before 01-01-1970. However, OpAMP only accepts
unsigned timestamps. Until now, opamp bridge simply assumed the
conversion could always be carried out. This change instead returns
errors when either the bridge or the respective K8s resources have
negative Unix timestamps.
  • Loading branch information
swiatekm authored Dec 30, 2024
1 parent 193a64c commit 8061478
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 34 deletions.
65 changes: 55 additions & 10 deletions cmd/operator-opamp-bridge/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package agent
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -95,10 +96,18 @@ func (agent *Agent) getHealth() *protobufs.ComponentHealth {
LastError: err.Error(),
}
}
statusTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return &protobufs.ComponentHealth{
Healthy: false,
StartTimeUnixNano: agent.startTime,
LastError: err.Error(),
}
}
return &protobufs.ComponentHealth{
Healthy: true,
StartTimeUnixNano: agent.startTime,
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
StatusTimeUnixNano: statusTime,
LastError: "",
ComponentHealthMap: healthMap,
}
Expand All @@ -124,9 +133,17 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone
for _, pod := range podMap {
isPoolHealthy = isPoolHealthy && pod.Healthy
}
podStartTime, err := timeToUnixNanoUnsigned(col.ObjectMeta.GetCreationTimestamp().Time)
if err != nil {
return nil, err
}
statusTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return nil, err
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(col.ObjectMeta.GetCreationTimestamp().UnixNano()),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
StartTimeUnixNano: podStartTime,
StatusTimeUnixNano: statusTime,
Status: col.Status.Scale.StatusReplicas,
ComponentHealthMap: podMap,
Healthy: isPoolHealthy,
Expand Down Expand Up @@ -158,6 +175,10 @@ func (agent *Agent) getCollectorSelector(col v1beta1.OpenTelemetryCollector) map
}

func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, namespace string) (map[string]*protobufs.ComponentHealth, error) {
statusTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return nil, err
}
pods, err := agent.applier.GetCollectorPods(selectorLabels, namespace)
if err != nil {
return nil, err
Expand All @@ -169,15 +190,18 @@ func (agent *Agent) generateCollectorHealth(selectorLabels map[string]string, na
if item.Status.Phase != "Running" {
healthy = false
}
var startTime int64
var startTime uint64
if item.Status.StartTime != nil {
startTime = item.Status.StartTime.UnixNano()
startTime, err = timeToUnixNanoUnsigned(item.Status.StartTime.Time)
if err != nil {
return nil, err
}
} else {
healthy = false
}
healthMap[key.String()] = &protobufs.ComponentHealth{
StartTimeUnixNano: uint64(startTime),
StatusTimeUnixNano: uint64(agent.clock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: statusTime,
Status: string(item.Status.Phase),
Healthy: healthy,
}
Expand All @@ -197,7 +221,7 @@ func (agent *Agent) onConnectFailed(ctx context.Context, err error) {

// onError is called when an agent receives an error response from the server.
func (agent *Agent) onError(ctx context.Context, err *protobufs.ServerErrorResponse) {
agent.logger.Error(fmt.Errorf(err.GetErrorMessage()), "server returned an error response")
agent.logger.Error(errors.New(err.GetErrorMessage()), "server returned an error response")
}

// saveRemoteConfigStatus receives a status from the server when the server sets a remote configuration.
Expand All @@ -207,7 +231,11 @@ func (agent *Agent) saveRemoteConfigStatus(_ context.Context, status *protobufs.

// Start sets up the callbacks for the OpAMP client and begins the client's connection to the server.
func (agent *Agent) Start() error {
agent.startTime = uint64(agent.clock.Now().UnixNano())
startTime, err := agent.getCurrentTimeUnixNano()
if err != nil {
return err
}
agent.startTime = startTime
settings := types.StartSettings{
OpAMPServerURL: agent.config.Endpoint,
Header: agent.config.Headers.ToHTTPHeader(),
Expand All @@ -224,7 +252,7 @@ func (agent *Agent) Start() error {
PackagesStateProvider: nil,
Capabilities: agent.config.GetCapabilities(),
}
err := agent.opampClient.SetAgentDescription(agent.agentDescription)
err = agent.opampClient.SetAgentDescription(agent.agentDescription)
if err != nil {
return err
}
Expand Down Expand Up @@ -429,3 +457,20 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
agent.initMeter(msg.OwnMetricsConnSettings)
}
}

// getCurrentTimeUnixNano returns the current time as a uint64, which the protocol expects.
func (agent *Agent) getCurrentTimeUnixNano() (uint64, error) {
// technically this could be negative if the system time is set to before 1970-01-1
// the proto demands this to be a nonnegative number, so in that case, just return 0
return timeToUnixNanoUnsigned(agent.clock.Now())
}

// timeToUnixNanoUnsigned returns the number of nanoseconds elapsed from 1970-01-01 to the given time, but returns an
// error if the value is negative. OpAMP expects these values to be non-negative.
func timeToUnixNanoUnsigned(t time.Time) (uint64, error) {
signedUnixNano := t.UnixNano()
if signedUnixNano < 0 {
return 0, fmt.Errorf("invalid system time, must be after 01-01-1970 due to OpAMP requirements: %v", t)
}
return uint64(signedUnixNano), nil
}
50 changes: 26 additions & 24 deletions cmd/operator-opamp-bridge/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ const (
agentTestFileBatchNotAllowedName = "testdata/agentbatchnotallowed.yaml"
agentTestFileNoProcessorsAllowedName = "testdata/agentnoprocessorsallowed.yaml"

// collectorStartTime is set to the result of a zero'd out creation timestamp
// read more here https://github.com/open-telemetry/opentelemetry-go/issues/4268
// we could attempt to hack the creation timestamp, but this is a constant and far easier.
collectorStartTime = uint64(11651379494838206464)
collectorStartTime = uint64(0)
)

var (
Expand All @@ -78,8 +75,9 @@ var (
updatedYamlConfigHash = getConfigHash(testCollectorKey, collectorUpdatedFile)
otherUpdatedYamlConfigHash = getConfigHash(otherCollectorKey, collectorUpdatedFile)

podTime = metav1.NewTime(time.UnixMicro(1704748549000000))
mockPodList = &v1.PodList{
podTime = metav1.NewTime(time.Unix(0, 0))
podTimeUnsigned, _ = timeToUnixNanoUnsigned(podTime.Time)
mockPodList = &v1.PodList{
TypeMeta: metav1.TypeMeta{
Kind: "PodList",
APIVersion: "v1",
Expand All @@ -95,6 +93,7 @@ var (
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
},
CreationTimestamp: podTime,
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Expand All @@ -119,6 +118,7 @@ var (
"app.kubernetes.io/part-of": "opentelemetry",
"app.kubernetes.io/component": "opentelemetry-collector",
},
CreationTimestamp: podTime,
},
Spec: v1.PodSpec{},
Status: v1.PodStatus{
Expand Down Expand Up @@ -215,6 +215,8 @@ func getFakeApplier(t *testing.T, conf *config.Config, lists ...runtimeClient.Ob

func TestAgent_getHealth(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
startTime, err := timeToUnixNanoUnsigned(fakeClock.Now())
require.NoError(t, err)
type fields struct {
configFile string
}
Expand Down Expand Up @@ -244,10 +246,10 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
Expand All @@ -269,15 +271,15 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"testnamespace/collector": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
Expand All @@ -302,23 +304,23 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"testnamespace/collector": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
"testnamespace/other": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
},
},
Expand All @@ -342,21 +344,21 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"other/third": {
Healthy: true,
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
otherCollectorName + "/" + thirdCollectorName + "-1": {
Healthy: true,
Status: "Running",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: uint64(podTime.UnixNano()),
StatusTimeUnixNano: startTime,
StartTimeUnixNano: podTimeUnsigned,
},
},
},
Expand All @@ -381,20 +383,20 @@ func TestAgent_getHealth(t *testing.T) {
want: []*protobufs.ComponentHealth{
{
Healthy: true,
StartTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StartTimeUnixNano: startTime,
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
"other/third": {
Healthy: false, // we're working with mocks so the status will never be reconciled.
StartTimeUnixNano: collectorStartTime,
LastError: "",
Status: "",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
ComponentHealthMap: map[string]*protobufs.ComponentHealth{
otherCollectorName + "/" + thirdCollectorName + "-1": {
Healthy: false,
Status: "Running",
StatusTimeUnixNano: uint64(fakeClock.Now().UnixNano()),
StatusTimeUnixNano: startTime,
StartTimeUnixNano: uint64(0),
},
},
Expand Down
1 change: 1 addition & 0 deletions cmd/operator-opamp-bridge/agent/testdata/basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: simplest
labels:
"opentelemetry.io/opamp-managed": "true"
creationTimestamp: "1970-01-01T00:00:00Z"
spec:
config:
receivers:
Expand Down
1 change: 1 addition & 0 deletions cmd/operator-opamp-bridge/agent/testdata/updated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: simplest
labels:
"opentelemetry.io/opamp-managed": "test-bridge"
creationTimestamp: "1970-01-01T00:00:00Z"
spec:
config:
receivers:
Expand Down

0 comments on commit 8061478

Please sign in to comment.