Skip to content

Commit bce5cf4

Browse files
authored
Add configuration options to Kube Cache service (#1304)
1 parent ac81b7d commit bce5cf4

File tree

5 files changed

+126
-27
lines changed

5 files changed

+126
-27
lines changed

.golangci.yml

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ linters:
1919
- errorlint
2020
- cyclop
2121
- errname
22-
- exportloopref
2322
- gocritic
2423
- goimports
2524
- gosimple

cmd/k8s-cache/main.go

+52-17
Original file line numberDiff line numberDiff line change
@@ -2,45 +2,60 @@ package main
22

33
import (
44
"context"
5+
"flag"
6+
"fmt"
7+
"io"
58
"log/slog"
9+
"net/http"
610
"os"
711
"os/signal"
8-
"strconv"
912
"syscall"
1013
"time"
1114

15+
"github.com/grafana/beyla/pkg/buildinfo"
16+
"github.com/grafana/beyla/pkg/kubecache"
1217
"github.com/grafana/beyla/pkg/kubecache/meta"
1318
"github.com/grafana/beyla/pkg/kubecache/service"
1419
)
1520

16-
const defaultPort = 50055
17-
1821
// main code of te Kubernetes K8s informer's metadata cache service, when it runs as a separate service and not
1922
// as a library inside Beyla
2023

2124
func main() {
22-
// TODO: use buildinfo to print version
23-
// TODO: let configure logger
24-
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{AddSource: true, Level: slog.LevelDebug})))
25+
lvl := slog.LevelVar{}
26+
lvl.Set(slog.LevelInfo)
27+
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
28+
Level: &lvl,
29+
})))
30+
31+
slog.Info("Beyla's Kubernetes Metadata cache service", "Version", buildinfo.Version, "Revision", buildinfo.Revision)
2532

26-
ic := service.InformersCache{
27-
Port: defaultPort,
33+
configPath := flag.String("config", "", "path to the configuration file")
34+
flag.Parse()
35+
if cfg := os.Getenv("BEYLA_K8S_CACHE_CONFIG_PATH"); cfg != "" {
36+
configPath = &cfg
2837
}
29-
portStr := os.Getenv("BEYLA_K8S_CACHE_PORT")
30-
if portStr != "" {
31-
var err error
32-
if ic.Port, err = strconv.Atoi(portStr); err != nil {
33-
slog.Error("invalid BEYLA_K8S_CACHE_PORT, using default port", "error", err)
34-
ic.Port = defaultPort
35-
}
38+
config := loadFromFile(configPath)
39+
if err := lvl.UnmarshalText([]byte(config.LogLevel)); err != nil {
40+
slog.Error("unknown log level specified, choices are [DEBUG, INFO, WARN, ERROR]", "error", err)
41+
os.Exit(-1)
42+
}
43+
44+
if config.ProfilePort != 0 {
45+
go func() {
46+
slog.Info("starting PProf HTTP listener", "port", config.ProfilePort)
47+
err := http.ListenAndServe(fmt.Sprintf(":%d", config.ProfilePort), nil)
48+
slog.Error("PProf HTTP listener stopped working", "error", err)
49+
}()
3650
}
3751

52+
ic := service.InformersCache{Config: config}
53+
3854
// Adding shutdown hook for graceful stop.
3955
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
4056

4157
if err := ic.Run(ctx,
42-
// TODO: make it configurable
43-
meta.WithResyncPeriod(30*time.Minute)); err != nil {
58+
meta.WithResyncPeriod(config.InformerResyncPeriod)); err != nil {
4459
slog.Error("starting informers' cache service", "error", err)
4560
os.Exit(-1)
4661
}
@@ -51,3 +66,23 @@ func main() {
5166
time.Sleep(time.Second)
5267
}
5368
}
69+
70+
func loadFromFile(configPath *string) *kubecache.Config {
71+
var configReader io.ReadCloser
72+
if configPath != nil && *configPath != "" {
73+
var err error
74+
if configReader, err = os.Open(*configPath); err != nil {
75+
slog.Error("can't open "+*configPath, "error", err)
76+
os.Exit(-1)
77+
}
78+
defer configReader.Close()
79+
}
80+
config, err := kubecache.LoadConfig(configReader)
81+
if err != nil {
82+
slog.Error("wrong configuration", "error", err)
83+
// nolint:gocritic
84+
os.Exit(-1)
85+
}
86+
87+
return config
88+
}

pkg/kubecache/config.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package kubecache
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"time"
7+
8+
"github.com/caarlos0/env/v9"
9+
"gopkg.in/yaml.v3"
10+
)
11+
12+
// Config options of the Kubernetes Cache service. Check the "DefaultConfig" variable for a view of the default values.
13+
type Config struct {
14+
// LogLevel can be one of: debug, info, warn, error
15+
LogLevel string `yaml:"log_level" env:"BEYLA_K8S_CACHE_LOG_LEVEL"`
16+
// Port where the service is going to listen to
17+
Port int `yaml:"port" env:"BEYLA_K8S_CACHE_PORT"`
18+
// MaxConnection is the maximum number of concurrent clients that the service can handle at the same time
19+
MaxConnections int `yaml:"max_connections" env:"BEYLA_K8S_CACHE_MAX_CONNECTIONS"`
20+
// ProfilePort is the port where the pprof server is going to listen to. 0 (default) means disabled
21+
ProfilePort int `yaml:"profile_port" env:"BEYLA_K8S_CACHE_PROFILE_PORT"`
22+
// InformerResyncPeriod is the time interval between complete resyncs of the informers
23+
InformerResyncPeriod time.Duration `yaml:"informer_resync_period" env:"BEYLA_K8S_CACHE_INFORMER_RESYNC_PERIOD"`
24+
}
25+
26+
var DefaultConfig = Config{
27+
LogLevel: "info",
28+
Port: 50055,
29+
MaxConnections: 100,
30+
InformerResyncPeriod: 30 * time.Minute,
31+
ProfilePort: 0,
32+
}
33+
34+
// LoadConfig overrides configuration in the following order (from less to most priority)
35+
// 1 - Default configuration (DefaultConfig variable)
36+
// 2 - Contents of the provided file reader (nillable)
37+
// 3 - Environment variables
38+
func LoadConfig(file io.Reader) (*Config, error) {
39+
cfg := DefaultConfig
40+
if file != nil {
41+
cfgBuf, err := io.ReadAll(file)
42+
if err != nil {
43+
return nil, fmt.Errorf("reading YAML configuration: %w", err)
44+
}
45+
if err := yaml.Unmarshal(cfgBuf, &cfg); err != nil {
46+
return nil, fmt.Errorf("parsing YAML configuration: %w", err)
47+
}
48+
}
49+
if err := env.Parse(&cfg); err != nil {
50+
return nil, fmt.Errorf("reading env vars: %w", err)
51+
}
52+
return &cfg, nil
53+
}

pkg/kubecache/envtest/integration_test.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sigs.k8s.io/controller-runtime/pkg/client"
2222
"sigs.k8s.io/controller-runtime/pkg/envtest"
2323

24+
"github.com/grafana/beyla/pkg/kubecache"
2425
"github.com/grafana/beyla/pkg/kubecache/informer"
2526
"github.com/grafana/beyla/pkg/kubecache/meta"
2627
"github.com/grafana/beyla/pkg/kubecache/service"
@@ -34,6 +35,8 @@ var (
3435

3536
const timeout = 10 * time.Second
3637

38+
var freePort int
39+
3740
func TestMain(m *testing.M) {
3841
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{AddSource: true, Level: slog.LevelDebug})))
3942
var cancel context.CancelFunc
@@ -64,6 +67,11 @@ func TestMain(m *testing.M) {
6467
slog.Error("creating K8s manager client", "error", err)
6568
os.Exit(1)
6669
}
70+
freePort, err = test.FreeTCPPort()
71+
if err != nil {
72+
slog.Error("getting a free TCP port", "error", err)
73+
os.Exit(1)
74+
}
6775
go func() {
6876
if err := k8sManager.Start(ctx); err != nil {
6977
slog.Error("starting manager", "error", err)
@@ -78,12 +86,12 @@ func TestMain(m *testing.M) {
7886
}()
7987

8088
// Create and start informers client cache
81-
svc := service.InformersCache{
82-
Port: 50055, // TODO: get a free port automatically to not collide with other tests
83-
}
89+
iConfig := kubecache.DefaultConfig
90+
iConfig.Port = freePort
91+
svc := service.InformersCache{Config: &iConfig}
8492
go func() {
8593
if err := svc.Run(ctx,
86-
meta.WithResyncPeriod(30*time.Minute),
94+
meta.WithResyncPeriod(iConfig.InformerResyncPeriod),
8795
meta.WithKubeClient(theClient),
8896
); err != nil {
8997
slog.Error("running service", "error", err)
@@ -95,7 +103,7 @@ func TestMain(m *testing.M) {
95103
}
96104

97105
func TestAPIs(t *testing.T) {
98-
svcClient := serviceClient{ID: "first-pod", Address: "127.0.0.1:50055"}
106+
svcClient := serviceClient{ID: "first-pod", Address: fmt.Sprintf("127.0.0.1:%d", freePort)}
99107
// client
100108
require.Eventually(t, func() bool {
101109
return svcClient.Start(ctx) == nil

pkg/kubecache/service/service.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"google.golang.org/grpc"
1212
"google.golang.org/grpc/peer"
1313

14+
"github.com/grafana/beyla/pkg/kubecache"
1415
"github.com/grafana/beyla/pkg/kubecache/informer"
1516
"github.com/grafana/beyla/pkg/kubecache/meta"
1617
)
@@ -19,7 +20,7 @@ import (
1920
type InformersCache struct {
2021
informer.UnimplementedEventStreamServiceServer
2122

22-
Port int
23+
Config *kubecache.Config
2324

2425
started atomic.Bool
2526
informers *meta.Informers
@@ -32,7 +33,7 @@ func (ic *InformersCache) Run(ctx context.Context, opts ...meta.InformerOption)
3233
}
3334
ic.log = slog.With("component", "server.InformersCache")
3435

35-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", ic.Port))
36+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", ic.Config.Port))
3637
if err != nil {
3738
return fmt.Errorf("starting TCP connection: %w", err)
3839
}
@@ -42,10 +43,13 @@ func (ic *InformersCache) Run(ctx context.Context, opts ...meta.InformerOption)
4243
return fmt.Errorf("initializing informers: %w", err)
4344
}
4445

45-
s := grpc.NewServer()
46+
s := grpc.NewServer(
47+
// TODO: configure other aspects (e.g. secure connections)
48+
grpc.MaxConcurrentStreams(uint32(ic.Config.MaxConnections)),
49+
)
4650
informer.RegisterEventStreamServiceServer(s, ic)
4751

48-
ic.log.Info("server listening", "port", ic.Port)
52+
ic.log.Info("server listening", "port", ic.Config.Port)
4953

5054
errs := make(chan error, 1)
5155
go func() {

0 commit comments

Comments
 (0)