1 change: 1 addition & 0 deletions go.mod
@@ -22,6 +22,7 @@ require (
github.com/spf13/pflag v1.0.6
github.com/tsenart/vegeta/v12 v12.12.0
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/runtime v0.62.0
go.opentelemetry.io/otel v1.37.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.37.0
2 changes: 2 additions & 0 deletions go.sum
@@ -363,6 +363,8 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/instrumentation/runtime v0.62.0 h1:ZIt0ya9/y4WyRIzfLC8hQRRsWg0J9M9GyaGtIMiElZI=
go.opentelemetry.io/contrib/instrumentation/runtime v0.62.0/go.mod h1:F1aJ9VuiKWOlWwKdTYDUp1aoS0HzQxg38/VLxKmhm5U=
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0 h1:zG8GlgXCJQd5BU98C0hZnBbElszTmUgCNCfYneaDL0A=
167 changes: 101 additions & 66 deletions injection/sharedmain/main.go
@@ -28,6 +28,11 @@ import (
"strings"
"time"

"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/trace"

"go.uber.org/automaxprocs/maxprocs" // automatically set GOMAXPROCS based on cgroups
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -41,14 +46,18 @@ import (
"k8s.io/client-go/rest"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
cminformer "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/leaderelection"
"knative.dev/pkg/logging"
"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
"knative.dev/pkg/observability"
o11yconfigmap "knative.dev/pkg/observability/configmap"
"knative.dev/pkg/observability/metrics"
"knative.dev/pkg/observability/resource"
"knative.dev/pkg/observability/tracing"
"knative.dev/pkg/reconciler"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
@@ -108,19 +117,21 @@ func GetLeaderElectionConfig(ctx context.Context) (*leaderelection.Config, error
// 1. provided context,
// 2. reading from the API server,
// 3. defaults (if not found).
func GetObservabilityConfig(ctx context.Context) (*metrics.ObservabilityConfig, error) {
if cfg := metrics.GetObservabilityConfig(ctx); cfg != nil {
func GetObservabilityConfig(ctx context.Context) (*observability.Config, error) {
if cfg := observability.GetConfig(ctx); cfg != nil {
return cfg, nil
}

observabilityConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, metrics.ConfigMapName(), metav1.GetOptions{})
client := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace())
cm, err := client.Get(ctx, o11yconfigmap.Name(), metav1.GetOptions{})

if apierrors.IsNotFound(err) {
return metrics.NewObservabilityConfigFromConfigMap(nil)
}
if err != nil {
return observability.DefaultConfig(), nil
} else if err != nil {
return nil, err
}
return metrics.NewObservabilityConfigFromConfigMap(observabilityConfigMap)

return o11yconfigmap.Parse(cm)
}
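
The lookup order above means a test or an embedding binary can short-circuit the API-server read entirely by attaching a config to the context up front. A minimal sketch, reusing observability.WithConfig and the Config/TracingConfig types that appear elsewhere in this diff (the "grpc" value is illustrative):

package main

import (
	"context"
	"fmt"
	"log"

	"knative.dev/pkg/injection/sharedmain"
	"knative.dev/pkg/observability"
)

func main() {
	// A config attached to the context wins over the API-server lookup
	// and the built-in defaults.
	cfg := &observability.Config{
		Tracing: observability.TracingConfig{Protocol: "grpc"},
	}
	ctx := observability.WithConfig(context.Background(), cfg)

	got, err := sharedmain.GetObservabilityConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(got.Tracing.Protocol) // prints "grpc"
}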

// EnableInjectionOrDie enables Knative Injection and starts the informers.
@@ -229,8 +240,6 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d controllers", len(ctors))

metrics.MemStatsOrDie(ctx)

// Respect user provided settings, but if omitted customize the default behavior.
if cfg.QPS == 0 {
// Adjust our client's rate limits based on the number of controllers we are running.
@@ -243,14 +252,14 @@
ctx, startInformers := injection.EnableInjectionOrDie(ctx, cfg)

logger, atomicLevel := SetupLoggerOrDie(ctx, component)
defer flush(logger)
defer logger.Sync()
ctx = logging.WithLogger(ctx, logger)

// Override client-go's warning handler to give us nicely printed warnings.
rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger})

profilingHandler := profiling.NewHandler(logger, false)
profilingServer := profiling.NewServer(profilingHandler)
pprof := newProfilingServer(logger.Named("pprof"))
defer pprof.Shutdown(context.Background())

CheckK8sClientMinimumVersionOrDie(ctx, logger)
cmw := SetupConfigMapWatchOrDie(ctx, logger)
@@ -267,35 +276,37 @@
leaderElectionConfig.GetComponentConfig(component))
}

SetupObservabilityOrDie(ctx, component, logger, profilingHandler)
mp, tp := SetupObservabilityOrDie(ctx, component, logger, pprof)
defer func() {
if err := mp.Shutdown(context.Background()); err != nil {
logger.Errorw("Error flushing metrics", zap.Error(err))
}
}()
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
logger.Errorw("Error flushing traces", zap.Error(err))
}
}()

controllers, webhooks := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)
WatchObservabilityConfigOrDie(ctx, cmw, pprof, logger, component)

eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)
eg.Go(pprof.ListenAndServe)

// Many of the webhooks rely on configuration, e.g. configurable defaults, feature flags.
// So make sure that we have synchronized our configuration state before launching the
// webhooks, so that things are properly initialized.
logger.Info("Starting configuration manager...")
logger.Info("Starting configmap watcher...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
logger.Fatalw("Failed to start configmap watcher", zap.Error(err))
}

// If we have one or more admission controllers, then start the webhook
// and pass them in.
var wh *webhook.Webhook
if len(webhooks) > 0 {
// Register webhook metrics
opts := webhook.GetOptions(ctx)
if opts != nil {
webhook.RegisterMetrics(opts.StatsReporterOptions...)
} else {
webhook.RegisterMetrics()
}

wh, err = webhook.New(ctx, webhooks)
if err != nil {
logger.Fatalw("Failed to create webhook", zap.Error(err))
@@ -328,7 +339,6 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
// returns an error.
<-egCtx.Done()

profilingServer.Shutdown(context.Background())
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := eg.Wait(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Errorw("Error while running server", zap.Error(err))
@@ -346,11 +356,6 @@ func healthProbesDisabled(ctx context.Context) bool {
return ctx.Value(healthProbesDisabledKey{}) != nil
}

func flush(logger *zap.SugaredLogger) {
logger.Sync()
metrics.FlushExporter()
}

// SetupLoggerOrDie sets up the logger using the config from the given context
// and returns a logger and atomic level, or dies by calling log.Fatalf.
func SetupLoggerOrDie(ctx context.Context, component string) (*zap.SugaredLogger, zap.AtomicLevel) {
@@ -360,10 +365,7 @@ func SetupLoggerOrDie(ctx context.Context, component string) (*zap.SugaredLogger
}
l, level := logging.NewLoggerFromConfig(loggingConfig, component)

// If PodName is injected into the env vars, set it on the logger.
// This is needed for HA components to distinguish logs from different
// pods.
if pn := os.Getenv("POD_NAME"); pn != "" {
if pn := system.PodName(); pn != "" {
l = l.With(zap.String(logkey.Pod, pn))
}

@@ -372,14 +374,53 @@ func SetupLoggerOrDie(ctx context.Context, component string) (*zap.SugaredLogger

// SetupObservabilityOrDie sets up the observability using the config from the given context
// or dies by calling log.Fatalf.
func SetupObservabilityOrDie(ctx context.Context, component string, logger *zap.SugaredLogger, profilingHandler *profiling.Handler) {
observabilityConfig, err := GetObservabilityConfig(ctx)
func SetupObservabilityOrDie(
ctx context.Context,
component string,
logger *zap.SugaredLogger,
pprof *pprofServer,
) (*metrics.MeterProvider, *tracing.TracerProvider) {
cfg, err := GetObservabilityConfig(ctx)
if err != nil {
logger.Fatal("Error loading observability configuration: ", err)
}
observabilityConfigMap := observabilityConfig.GetConfigMap()
metrics.ConfigMapWatcher(ctx, component, SecretFetcher(ctx), logger)(&observabilityConfigMap)
profilingHandler.UpdateFromConfigMap(&observabilityConfigMap)

pprof.UpdateFromConfig(cfg.Runtime)

resource := resource.Default(component)

meterProvider, err := metrics.NewMeterProvider(
ctx,
cfg.Metrics,
metric.WithView(OTelViews(ctx)...),
metric.WithResource(resource),
)
if err != nil {
logger.Fatalw("Failed to setup meter provider", zap.Error(err))
}

otel.SetMeterProvider(meterProvider)

err = runtime.Start(
runtime.WithMinimumReadMemStatsInterval(cfg.Runtime.ExportInterval),
)
if err != nil {
logger.Fatalw("Failed to start runtime metrics", zap.Error(err))
}

tracerProvider, err := tracing.NewTracerProvider(
ctx,
cfg.Tracing,
trace.WithResource(resource),
)
if err != nil {
logger.Fatalw("Failed to setup trace provider", zap.Error(err))
}

otel.SetTextMapPropagator(tracing.DefaultTextMapPropagator())
Member:

This feels like it should be part of tracing.NewTracerProvider, even though it's actually a global spooky-action-at-a-distance manipulation. WDYT?

Member Author:

> tracing.NewTracerProvider

That doesn't actually set the global tracer in NewTracerProvider. We do that in sharedmain after setting the SetTextMapPropagator.

I prefer the observability package to not set any OTel globals.

otel.SetTracerProvider(tracerProvider)

return meterProvider, tracerProvider
}
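
With the global providers installed, downstream code records telemetry through the plain OTel API rather than the removed knative.dev/pkg/metrics helpers. A minimal sketch, assuming SetupObservabilityOrDie has already run (the meter and instrument names are illustrative):

package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
)

// recordReconcile bumps an illustrative counter through the global
// MeterProvider installed by SetupObservabilityOrDie.
func recordReconcile(ctx context.Context) error {
	meter := otel.Meter("knative.dev/pkg/example")
	counter, err := meter.Int64Counter("reconcile.count")
	if err != nil {
		return err
	}
	counter.Add(ctx, 1)
	return nil
}

func main() {
	if err := recordReconcile(context.Background()); err != nil {
		log.Fatal(err)
	}
}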

// CheckK8sClientMinimumVersionOrDie checks that the hosting Kubernetes cluster
@@ -424,30 +465,24 @@ func WatchLoggingConfigOrDie(ctx context.Context, cmw *cminformer.InformedWatche
// WatchObservabilityConfigOrDie establishes a watch of the observability config
// or dies by calling log.Fatalw. Note, if the config does not exist, it will be
// defaulted and this method will not die.
func WatchObservabilityConfigOrDie(ctx context.Context, cmw *cminformer.InformedWatcher, profilingHandler *profiling.Handler, logger *zap.SugaredLogger, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, metrics.ConfigMapName(),
metav1.GetOptions{}); err == nil {
cmw.Watch(metrics.ConfigMapName(),
metrics.ConfigMapWatcher(ctx, component, SecretFetcher(ctx), logger),
profilingHandler.UpdateFromConfigMap)
} else if !apierrors.IsNotFound(err) {
logger.Fatalw("Error reading ConfigMap "+metrics.ConfigMapName(), zap.Error(err))
func WatchObservabilityConfigOrDie(
ctx context.Context,
cmw *cminformer.InformedWatcher,
pprof *pprofServer,
logger *zap.SugaredLogger,
component string,
) {
cmName := o11yconfigmap.Name()
client := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace())

observers := []configmap.Observer{
pprof.UpdateFromConfigMap,
}
}

// SecretFetcher provides a helper function to fetch individual Kubernetes
// Secrets (for example, a key for client-side TLS). Note that this is not
// intended for high-volume usage; the current use is when establishing a
// metrics client connection in WatchObservabilityConfigOrDie.
func SecretFetcher(ctx context.Context) metrics.SecretFetcher {
// NOTE: Do not use secrets.Get(ctx) here to get a SecretLister, as it will register
// a *global* SecretInformer and require cluster-level `secrets.list` permission,
// even if you scope down the Lister to a given namespace after requesting it. Instead,
// we package up a function from kubeclient.
// TODO(evankanderson): If this direct request to the apiserver on each TLS connection
// to the opencensus agent is too much load, switch to a cached Secret.
return func(name string) (*corev1.Secret, error) {
return kubeclient.Get(ctx).CoreV1().Secrets(system.Namespace()).Get(ctx, name, metav1.GetOptions{})
if _, err := client.Get(ctx, cmName, metav1.GetOptions{}); err == nil {
cmw.Watch(cmName, observers...)
} else if !apierrors.IsNotFound(err) {
logger.Fatalw("Error reading ConfigMap "+cmName, zap.Error(err))
Member:

I'm assuming we don't have a way to notice if the ConfigMap shows up later?

Member Author:

This file could use a cleanup - I've just changed the 'Watch' here, but the diff shows a larger change.

Generally, I think the controllers should halt and wait if a config map is not present. I could imagine a split-brain scenario occurring.

I'd rather not make those edits in this PR.

Member:

That's fine. I'm slightly worried about "halt and catch fire" as a behavior when a configmap we ship has been removed. The idea of shipping them has mostly been about documentation, not about requiring them to have specific contents (or even be present) for things to function.

}
}
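
Any function with the knative.dev/pkg/configmap.Observer signature, func(*corev1.ConfigMap), can be appended to the observers slice above. A minimal sketch of such an observer (the log line and the sample data key are illustrative):

package main

import (
	"log"

	corev1 "k8s.io/api/core/v1"
)

// logObserver matches configmap.Observer; appending it to the observers
// slice would wire it into cmw.Watch alongside pprof.UpdateFromConfigMap.
func logObserver(cm *corev1.ConfigMap) {
	log.Printf("observability ConfigMap %q updated: %d keys", cm.Name, len(cm.Data))
}

func main() {
	logObserver(&corev1.ConfigMap{Data: map[string]string{"example-key": "example"}})
}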

@@ -456,13 +491,13 @@ func SecretFetcher(ctx context.Context) metrics.SecretFetcher {
func ControllersAndWebhooksFromCtors(ctx context.Context,
cmw *cminformer.InformedWatcher,
ctors ...injection.ControllerConstructor,
) ([]*controller.Impl, []interface{}) {
) ([]*controller.Impl, []any) {
// Check whether the context has been infused with a leader elector builder.
// If it has, then every reconciler we plan to start MUST implement LeaderAware.
leEnabled := leaderelection.HasLeaderElection(ctx)

controllers := make([]*controller.Impl, 0, len(ctors))
webhooks := make([]interface{}, 0)
webhooks := make([]any, 0)
for _, cf := range ctors {
ctrl := cf(ctx, cmw)
controllers = append(controllers, ctrl)
17 changes: 6 additions & 11 deletions injection/sharedmain/main_test.go
@@ -26,7 +26,7 @@ import (
"knative.dev/pkg/injection"
"knative.dev/pkg/leaderelection"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/observability"
)

func TestEnabledControllers(t *testing.T) {
@@ -106,17 +106,12 @@ func TestWithLeaderElectionConfig(t *testing.T) {
}

func TestWithObservabilityConfig(t *testing.T) {
want := &metrics.ObservabilityConfig{
EnableVarLogCollection: false,
LoggingURLTemplate: "url-template",
RequestLogTemplate: "log-template",
EnableProbeRequestLog: true,
RequestMetricsBackend: "prometheus",
EnableProfiling: true,
EnableRequestLog: false,
MetricsCollectorAddress: "localhost:9090",
want := &observability.Config{
Tracing: observability.TracingConfig{
Protocol: "some-protocol",
},
}
ctx := metrics.WithConfig(context.Background(), want)
ctx := observability.WithConfig(context.Background(), want)

got, err := GetObservabilityConfig(ctx)
if err != nil {
@@ -14,28 +14,20 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package globalviews
package sharedmain

import (
"maps"
"slices"
"context"

"go.opentelemetry.io/otel/sdk/metric"
)

var globalViews map[string][]metric.View = make(map[string][]metric.View)
type otelviews struct{}

// Register should be called during package init
func Register(pkg string, v ...metric.View) {
views := globalViews[pkg]
globalViews[pkg] = append(views, v...)
func WithOTelViews(ctx context.Context, views ...metric.View) context.Context {
return context.WithValue(ctx, otelviews{}, views)
}

func GetPackageViews(pkg string) []metric.View {
return globalViews[pkg]
}

func GetAllViews() []metric.View {
list := slices.Collect(maps.Values(globalViews))
return slices.Concat(list...)
func OTelViews(ctx context.Context) []metric.View {
// comma-ok avoids a panic when no views were attached to the context
views, _ := ctx.Value(otelviews{}).([]metric.View)
return views
}
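
Callers attach views to the context before handing it to sharedmain, and SetupObservabilityOrDie feeds them to the MeterProvider via metric.WithView(OTelViews(ctx)...). A hedged sketch of a component main (the component name, instrument name, and bucket boundaries are all illustrative):

package main

import (
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"

	"knative.dev/pkg/injection/sharedmain"
	"knative.dev/pkg/signals"
)

func main() {
	ctx := signals.NewContext()

	// Views attached here are picked up by SetupObservabilityOrDie.
	ctx = sharedmain.WithOTelViews(ctx, sdkmetric.NewView(
		sdkmetric.Instrument{Name: "reconcile.duration"},
		sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
			Boundaries: []float64{0.01, 0.1, 1, 10},
		}},
	))

	// Controller constructors elided; MainWithContext starts informers,
	// observability, and any controllers passed here.
	sharedmain.MainWithContext(ctx, "my-component")
}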