Skip to content

Commit cd4858d

Browse files
committed
WIP: basic otel
1 parent dfc4717 commit cd4858d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+5256
-70
lines changed

Diff for: Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ verify-codegen:
169169

170170
.PHONY: protobuf
171171
protobuf:
172+
protoc --go_out=. --go_opt=paths=source_relative pkg/otel/otlptracefile/pb/file.proto
172173
cd ${GOPATH_1ST}/src; protoc --gogo_out=. k8s.io/kops/protokube/pkg/gossip/mesh/mesh.proto
173174

174175
.PHONY: hooks

Diff for: cmd/kops/create_cluster.go

+3
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,9 @@ func NewCmdCreateCluster(f *util.Factory, out io.Writer) *cobra.Command {
474474
}
475475

476476
func RunCreateCluster(ctx context.Context, f *util.Factory, out io.Writer, c *CreateClusterOptions) error {
477+
ctx, span := tracer.Start(ctx, "RunCreateCluster")
478+
defer span.End()
479+
477480
isDryrun := false
478481
// direct requires --yes (others do not, because they don't make changes)
479482
targetName := c.Target

Diff for: cmd/kops/doc.go

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package main
2+
3+
import "go.opentelemetry.io/otel"
4+
5+
// tracer is the package-level OpenTelemetry tracer for this command package;
// its instrumentation scope name is the package import path.
var tracer = otel.Tracer("k8s.io/kops/cmd/kops")

Diff for: cmd/kops/main.go

+31-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,38 @@ limitations under the License.
1616

1717
package main // import "k8s.io/kops/cmd/kops"
1818

19-
import "context"
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
)
2024

2125
func main() {
2226
ctx := context.Background()
23-
Execute(ctx)
27+
if err := run(ctx); err != nil {
28+
os.Exit(1)
29+
}
30+
}
31+
32+
func run(ctx context.Context) error {
33+
// Set up OpenTelemetry.
34+
serviceName := "dice"
35+
serviceVersion := "0.1.0"
36+
otelShutdown, err := setupOTelSDK(ctx, serviceName, serviceVersion)
37+
if err != nil {
38+
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
39+
return err
40+
}
41+
// Handle shutdown properly so nothing leaks.
42+
defer func() {
43+
if err := otelShutdown(context.Background()); err != nil {
44+
fmt.Fprintf(os.Stderr, "error shutting down otel: %v\n", err)
45+
}
46+
}()
47+
48+
if err := Execute(ctx); err != nil {
49+
return err
50+
}
51+
52+
return nil
2453
}

Diff for: cmd/kops/otel.go

+137
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/http"
8+
"os"
9+
"time"
10+
11+
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
12+
"go.opentelemetry.io/otel"
13+
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
14+
"go.opentelemetry.io/otel/sdk/metric"
15+
"go.opentelemetry.io/otel/sdk/resource"
16+
"go.opentelemetry.io/otel/sdk/trace"
17+
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
18+
19+
"k8s.io/kops/pkg/otel/otlptracefile"
20+
)
21+
22+
// setupOTelSDK bootstraps the OpenTelemetry pipeline.
23+
// If it does not return an error, make sure to call shutdown for proper cleanup.
24+
func setupOTelSDK(ctx context.Context, serviceName, serviceVersion string) (shutdown func(context.Context) error, err error) {
25+
var shutdownFuncs []func(context.Context) error
26+
27+
// shutdown calls cleanup functions registered via shutdownFuncs.
28+
// The errors from the calls are joined.
29+
// Each registered cleanup will be invoked once.
30+
shutdown = func(ctx context.Context) error {
31+
var err error
32+
for _, fn := range shutdownFuncs {
33+
err = errors.Join(err, fn(ctx))
34+
}
35+
shutdownFuncs = nil
36+
return err
37+
}
38+
39+
// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
40+
handleErr := func(inErr error) {
41+
err = errors.Join(inErr, shutdown(ctx))
42+
}
43+
44+
// Setup resource.
45+
res, err := newResource(serviceName, serviceVersion)
46+
if err != nil {
47+
handleErr(err)
48+
return
49+
}
50+
51+
// Setup trace provider.
52+
tracerProvider, err := newTraceProvider(ctx, res)
53+
if err != nil {
54+
handleErr(err)
55+
return
56+
}
57+
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
58+
otel.SetTracerProvider(tracerProvider)
59+
60+
// // Setup meter provider.
61+
// meterProvider, err := newMeterProvider(res)
62+
// if err != nil {
63+
// handleErr(err)
64+
// return
65+
// }
66+
// shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
67+
// otel.SetMeterProvider(meterProvider)
68+
69+
http.DefaultClient = &http.Client{
70+
Transport: otelhttp.NewTransport(http.DefaultTransport),
71+
}
72+
73+
return
74+
}
75+
76+
func newResource(serviceName, serviceVersion string) (*resource.Resource, error) {
77+
return resource.Merge(resource.Default(),
78+
resource.NewWithAttributes(semconv.SchemaURL,
79+
semconv.ServiceName(serviceName),
80+
semconv.ServiceVersion(serviceVersion),
81+
))
82+
}
83+
84+
func newTraceProvider(ctx context.Context, res *resource.Resource) (*trace.TracerProvider, error) {
85+
s := os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
86+
if s == "" {
87+
s = os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
88+
}
89+
if s != "" {
90+
return nil, fmt.Errorf("otlp not yet implemented")
91+
}
92+
93+
var traceExporter trace.SpanExporter
94+
95+
s = os.Getenv("OTEL_EXPORTER_OTLP_TRACES_FILE")
96+
if s == "" {
97+
s = os.Getenv("OTEL_EXPORTER_OTLP_FILE")
98+
}
99+
if s != "" {
100+
x, err := otlptracefile.New(ctx, otlptracefile.WithPath(s))
101+
if err != nil {
102+
return nil, err
103+
}
104+
traceExporter = x
105+
}
106+
107+
traceProvider := trace.NewTracerProvider(
108+
trace.WithBatcher(traceExporter,
109+
// Default is 5s. Set to 1s for demonstrative purposes.
110+
trace.WithBatchTimeout(time.Second)),
111+
trace.WithResource(res),
112+
)
113+
return traceProvider, nil
114+
115+
// traceProvider := trace.NewTracerProvider(
116+
// trace.WithBatcher(traceExporter,
117+
// // Default is 5s. Set to 1s for demonstrative purposes.
118+
// trace.WithBatchTimeout(time.Second)),
119+
// trace.WithResource(res),
120+
// )
121+
// return traceProvider, nil
122+
}
123+
124+
func newMeterProvider(res *resource.Resource) (*metric.MeterProvider, error) {
125+
metricExporter, err := stdoutmetric.New()
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
meterProvider := metric.NewMeterProvider(
131+
metric.WithResource(res),
132+
metric.WithReader(metric.NewPeriodicReader(metricExporter,
133+
// Default is 1m. Set to 3s for demonstrative purposes.
134+
metric.WithInterval(3*time.Second))),
135+
)
136+
return meterProvider, nil
137+
}

Diff for: cmd/kops/root.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828

2929
"github.com/spf13/cobra"
3030
"github.com/spf13/viper"
31+
"go.opentelemetry.io/otel/attribute"
32+
"go.opentelemetry.io/otel/trace"
3133
"k8s.io/apimachinery/pkg/util/validation/field"
3234
"k8s.io/client-go/tools/clientcmd"
3335
"k8s.io/client-go/util/homedir"
@@ -89,12 +91,13 @@ var rootCommand = RootCmd{
8991
},
9092
}
9193

92-
func Execute(ctx context.Context) {
94+
func Execute(ctx context.Context) error {
95+
ctx, span := tracer.Start(ctx, "kops", trace.WithAttributes(attribute.StringSlice("args", os.Args)))
96+
defer span.End()
97+
9398
goflag.Set("logtostderr", "true")
9499
goflag.CommandLine.Parse([]string{})
95-
if err := rootCommand.cobraCommand.ExecuteContext(ctx); err != nil {
96-
os.Exit(1)
97-
}
100+
return rootCommand.cobraCommand.ExecuteContext(ctx)
98101
}
99102

100103
func init() {

Diff for: nodeup/pkg/bootstrap/install.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (i *Installation) Run() error {
5959
return fmt.Errorf("error building context: %v", err)
6060
}
6161

62-
err = context.RunTasks(i.RunTasksOptions)
62+
err = context.RunTasks(ctx, i.RunTasksOptions)
6363
if err != nil {
6464
return fmt.Errorf("error running tasks: %v", err)
6565
}

Diff for: pkg/client/simple/api/clientset.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (c *RESTClientset) DeleteCluster(ctx context.Context, cluster *kops.Cluster
131131
return err
132132
}
133133

134-
err = vfsclientset.DeleteAllClusterState(configBase)
134+
err = vfsclientset.DeleteAllClusterState(ctx, configBase)
135135
if err != nil {
136136
return err
137137
}

Diff for: pkg/client/simple/vfsclientset/clientset.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"strings"
2323

24+
"go.opentelemetry.io/otel"
2425
"k8s.io/klog/v2"
2526

2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,7 +51,10 @@ func (c *VFSClientset) clusters() *ClusterVFS {
5051

5152
// GetCluster implements the GetCluster method of simple.Clientset for a VFS-backed state store
5253
func (c *VFSClientset) GetCluster(ctx context.Context, name string) (*kops.Cluster, error) {
53-
return c.clusters().Get(name, metav1.GetOptions{})
54+
ctx, span := otel.Tracer("rolldice").Start(ctx, "VFSClientset::GetCluster")
55+
defer span.End()
56+
57+
return c.clusters().Get(ctx, name, metav1.GetOptions{})
5458
}
5559

5660
// UpdateCluster implements the UpdateCluster method of simple.Clientset for a VFS-backed state store
@@ -65,7 +69,10 @@ func (c *VFSClientset) CreateCluster(ctx context.Context, cluster *kops.Cluster)
6569

6670
// ListClusters implements the ListClusters method of simple.Clientset for a VFS-backed state store
6771
func (c *VFSClientset) ListClusters(ctx context.Context, options metav1.ListOptions) (*kops.ClusterList, error) {
68-
return c.clusters().List(options)
72+
ctx, span := otel.Tracer("rolldice").Start(ctx, "VFSClientset::ListClusters")
73+
defer span.End()
74+
75+
return c.clusters().List(ctx, options)
6976
}
7077

7178
// ConfigBaseFor implements the ConfigBaseFor method of simple.Clientset for a VFS-backed state store
@@ -131,8 +138,8 @@ func (c *VFSClientset) pkiPath(cluster *kops.Cluster) (vfs.Path, error) {
131138
}
132139
}
133140

134-
func DeleteAllClusterState(basePath vfs.Path) error {
135-
paths, err := basePath.ReadTree()
141+
func DeleteAllClusterState(ctx context.Context, basePath vfs.Path) error {
142+
paths, err := basePath.ReadTree(ctx)
136143
if err != nil {
137144
return fmt.Errorf("error listing files in state store: %v", err)
138145
}
@@ -237,7 +244,7 @@ func (c *VFSClientset) DeleteCluster(ctx context.Context, cluster *kops.Cluster)
237244
return err
238245
}
239246

240-
return DeleteAllClusterState(configBase)
247+
return DeleteAllClusterState(ctx, configBase)
241248
}
242249

243250
func NewVFSClientset(vfsContext *vfs.VFSContext, basePath vfs.Path) simple.Clientset {

Diff for: pkg/client/simple/vfsclientset/cluster.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ func newClusterVFS(vfsContext *vfs.VFSContext, basePath vfs.Path) *ClusterVFS {
4747
return c
4848
}
4949

50-
func (c *ClusterVFS) Get(name string, options metav1.GetOptions) (*api.Cluster, error) {
51-
ctx := context.TODO()
52-
50+
func (c *ClusterVFS) Get(ctx context.Context, name string, options metav1.GetOptions) (*api.Cluster, error) {
5351
if options.ResourceVersion != "" {
5452
return nil, fmt.Errorf("ResourceVersion not supported in ClusterVFS::Get")
5553
}
@@ -72,10 +70,8 @@ func (c *ClusterVFS) configBase(clusterName string) (vfs.Path, error) {
7270
return configPath, nil
7371
}
7472

75-
func (c *ClusterVFS) List(options metav1.ListOptions) (*api.ClusterList, error) {
76-
ctx := context.TODO()
77-
78-
names, err := c.listNames()
73+
func (c *ClusterVFS) List(ctx context.Context, options metav1.ListOptions) (*api.ClusterList, error) {
74+
names, err := c.listNames(ctx)
7975
if err != nil {
8076
return nil, err
8177
}
@@ -134,7 +130,7 @@ func (r *ClusterVFS) Update(c *api.Cluster, status *api.ClusterStatus) (*api.Clu
134130
return nil, field.Required(field.NewPath("objectMeta", "name"), "clusterName is required")
135131
}
136132

137-
old, err := r.Get(clusterName, metav1.GetOptions{})
133+
old, err := r.Get(ctx, clusterName, metav1.GetOptions{})
138134
if err != nil {
139135
return nil, err
140136
}
@@ -163,8 +159,8 @@ func (r *ClusterVFS) Update(c *api.Cluster, status *api.ClusterStatus) (*api.Clu
163159

164160
// List returns a slice containing all the cluster names
165161
// It skips directories that don't look like clusters
166-
func (r *ClusterVFS) listNames() ([]string, error) {
167-
paths, err := r.basePath.ReadTree()
162+
func (r *ClusterVFS) listNames(ctx context.Context) ([]string, error) {
163+
paths, err := r.basePath.ReadTree(ctx)
168164
if err != nil {
169165
return nil, fmt.Errorf("error reading state store: %v", err)
170166
}

0 commit comments

Comments
 (0)