Skip to content

Commit e9b25b3

Browse files
authored
K8s-ingestion (#99)
* Add native support for ingesting k8s logs. * When getting k8s logs, change the log viewer to show namespace and pod instead of host and service * Update docs * Filter at the source not within gonzo (faster) * Bump to 1.25 go, use new wg.Go() feature * Use context to cancel watchers * Clean up k8s docs * Nix change to follow nixpkgs
1 parent 1d482aa commit e9b25b3

File tree

25 files changed

+2906
-111
lines changed

25 files changed

+2906
-111
lines changed

README.md

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Here are some references to get you started:
4545
### 🎯 Real-Time Analysis
4646

4747
- **Live streaming** - Process logs as they arrive from stdin, files, or network
48+
- **Kubernetes native** - Direct integration with Kubernetes clusters for pod log streaming
4849
- **OTLP native** - First-class support for OpenTelemetry log format
4950
- **OTLP receiver** - Built-in gRPC server to receive logs via OpenTelemetry protocol
5051
- **Format detection** - Automatically detects JSON, logfmt, and plain text
@@ -68,6 +69,7 @@ Here are some references to get you started:
6869
- **Regex support** - Filter logs with regular expressions
6970
- **Attribute search** - Find logs by specific attribute values
7071
- **Severity filtering** - Interactive modal to select specific log levels (Ctrl+f)
72+
- **Kubernetes filtering** - Filter by namespace and pod with interactive selection (Ctrl+k)
7173
- **Multi-level selection** - Enable/disable multiple severity levels at once
7274
- **Interactive selection** - Click or keyboard navigate to explore logs
7375

@@ -144,7 +146,12 @@ gonzo -f "/var/log/*.log" --follow
144146
# Analyze logs from stdin (traditional way)
145147
cat application.log | gonzo
146148

147-
# Stream logs from kubectl
149+
# Stream logs directly from Kubernetes clusters
150+
gonzo --k8s-enabled=true --k8s-namespace=default
151+
gonzo --k8s-enabled=true --k8s-namespace=production --k8s-namespace=staging
152+
gonzo --k8s-enabled=true --k8s-selector="app=my-app"
153+
154+
# Stream logs from kubectl (traditional way)
148155
kubectl logs -f deployment/my-app | gonzo
149156

150157
# Follow system logs
@@ -311,8 +318,9 @@ cat logs.json | gonzo --ai-model="gpt-4"
311318
| `/` | Enter filter mode (regex supported) |
312319
| `s` | Search and highlight text in logs |
313320
| `Ctrl+f` | Open severity filter modal |
321+
| `Ctrl+k` | Open Kubernetes filter modal (k8s mode) |
314322
| `f` | Open fullscreen log viewer modal |
315-
| `c` | Toggle Host/Service columns in log view |
323+
| `c` | Toggle Namespace/Pod or Host/Service cols |
316324
| `r` | Reset all data (manual reset) |
317325
| `u` / `U` | Cycle update intervals (forward/backward) |
318326
| `i` | AI analysis (in detail view) |
@@ -415,6 +423,16 @@ Flags:
415423
--ai-model string AI model for analysis (auto-selects best available if not specified)
416424
-s, --skin string Color scheme/skin to use (default, or name of a skin file)
417425
--stop-words strings Additional stop words to filter out from analysis (adds to built-in list)
426+
427+
Kubernetes Flags:
428+
--k8s-enabled=true Enable Kubernetes log streaming mode
429+
--k8s-namespace stringArray Kubernetes namespace(s) to watch (can specify multiple, default: all)
430+
--k8s-selector string Kubernetes label selector for filtering pods
431+
--k8s-tail int Number of previous log lines to retrieve (default: 10)
432+
--k8s-since int Only return logs newer than relative duration in seconds
433+
--k8s-kubeconfig string Path to kubeconfig file (default: $HOME/.kube/config)
434+
--k8s-context string Kubernetes context to use
435+
418436
-t, --test-mode Run without TTY for testing
419437
-v, --version Print version information
420438
--config string Config file (default: $HOME/.config/gonzo/config.yml)
@@ -669,7 +687,7 @@ internal/
669687
670688
### Prerequisites
671689
672-
- Go 1.21 or higher
690+
- Go 1.25 or higher
673691
- Make (optional, for convenience)
674692
675693
### Building
@@ -768,6 +786,7 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
768786
769787
- [Docs](https://docs.controltheory.com/) - Complete user guide, integration examples, advanced features (AI, OTel, custom log formats)
770788
- [Usage Guide](USAGE_GUIDE.md) - Detailed usage instructions
789+
- [Kubernetes Integration Guide](guides/KUBERNETES_USAGE.md) - Direct Kubernetes cluster integration, filtering, and usage examples
771790
- [AWS CloudWatch Logs Usage Guide](guides/CLOUDWATCH_USAGE_GUIDE.md) - Usage instructions for AWS CLI log tail and live tail with Gonzo
772791
- [Stern Usage Guide](guides/STERN_USAGE_GUIDE.md) - Usage and examples for using Stern with Gonzo
773792
- [Victoria Logs Integration](guides/VICTORIA_LOGS_USAGE.md) - Using Gonzo with Victoria Logs API

cmd/gonzo/app.go

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"context"
66
"fmt"
7+
"io"
78
"log"
89
"os"
910
"strings"
@@ -12,6 +13,7 @@ import (
1213
"github.com/control-theory/gonzo/internal/analyzer"
1314
"github.com/control-theory/gonzo/internal/filereader"
1415
"github.com/control-theory/gonzo/internal/formats"
16+
"github.com/control-theory/gonzo/internal/k8s"
1517
"github.com/control-theory/gonzo/internal/memory"
1618
"github.com/control-theory/gonzo/internal/otlplog"
1719
"github.com/control-theory/gonzo/internal/otlpreceiver"
@@ -21,6 +23,7 @@ import (
2123

2224
tea "github.com/charmbracelet/bubbletea"
2325
"github.com/spf13/cobra"
26+
"k8s.io/klog/v2"
2427
)
2528

2629
// runApp initializes and runs the application
@@ -31,6 +34,15 @@ func runApp(cmd *cobra.Command, args []string) error {
3134
return nil
3235
}
3336

37+
// Redirect log output to discard to avoid messing up the TUI
38+
// All log.Printf calls will be silently discarded
39+
log.SetOutput(io.Discard)
40+
41+
// Suppress klog output from Kubernetes client-go
42+
// This prevents errors like "request.go:752" from appearing in the TUI
43+
klog.SetOutput(io.Discard)
44+
klog.LogToStderr(false)
45+
3446
// Start version checking in background (if not disabled)
3547
var versionChecker *versioncheck.Checker
3648
if !cfg.DisableVersionCheck {
@@ -181,6 +193,10 @@ type simpleTuiModel struct {
181193
vmlogsReceiver *vmlogs.Receiver // Victoria Logs receiver for streaming logs
182194
hasVmlogsInput bool // Whether we're receiving Victoria Logs data
183195

196+
// Kubernetes receiver support
197+
k8sReceiver *k8s.KubernetesLogSource // Kubernetes log source for streaming pod logs
198+
hasK8sInput bool // Whether we're receiving Kubernetes logs
199+
184200
// JSON accumulation for multi-line OTLP support
185201
jsonBuffer strings.Builder // Buffer for accumulating multi-line JSON
186202
jsonDepth int // Track JSON object/array nesting depth
@@ -195,8 +211,45 @@ func (m *simpleTuiModel) Init() tea.Cmd {
195211
// Initialize frequency reset timer
196212
m.lastFreqReset = time.Now()
197213

198-
// Check if Victoria Logs receiver is enabled
199-
if cfg.VmlogsURL != "" {
214+
// Check if Kubernetes receiver is enabled
215+
if cfg.K8sEnabled {
216+
// Kubernetes input mode
217+
m.hasK8sInput = true
218+
m.inputChan = make(chan string, 100)
219+
220+
// Create kubernetes config
221+
k8sConfig := &k8s.Config{
222+
Kubeconfig: cfg.K8sKubeconfig,
223+
Context: cfg.K8sContext,
224+
Namespaces: cfg.K8sNamespaces,
225+
Selector: cfg.K8sSelector,
226+
Since: cfg.K8sSince,
227+
TailLines: cfg.K8sTailLines,
228+
}
229+
230+
// Create and start Kubernetes log source
231+
k8sSource, err := k8s.NewKubernetesLogSource(k8sConfig)
232+
if err != nil {
233+
log.Printf("Error creating Kubernetes log source: %v", err)
234+
// Fall back to other input methods if Kubernetes fails
235+
m.hasK8sInput = false
236+
} else {
237+
m.k8sReceiver = k8sSource
238+
if err := m.k8sReceiver.Start(); err != nil {
239+
log.Printf("Error starting Kubernetes log source: %v", err)
240+
// Fall back to other input methods if Kubernetes fails
241+
m.hasK8sInput = false
242+
} else {
243+
// Wire K8s source to the dashboard for namespace/pod listing
244+
m.dashboard.SetK8sSource(k8sSource)
245+
// Start reading from Kubernetes receiver in the background
246+
go m.readK8sAsync()
247+
}
248+
}
249+
}
250+
251+
// Check if Victoria Logs receiver is enabled (only if Kubernetes is not enabled)
252+
if !m.hasK8sInput && cfg.VmlogsURL != "" {
200253
// Victoria Logs input mode
201254
m.hasVmlogsInput = true
202255
m.inputChan = make(chan string, 100)
@@ -215,8 +268,8 @@ func (m *simpleTuiModel) Init() tea.Cmd {
215268
}
216269
}
217270

218-
// Check if OTLP receiver is enabled (only if Victoria Logs is not enabled)
219-
if !m.hasVmlogsInput && cfg.OTLPEnabled {
271+
// Check if OTLP receiver is enabled (only if Kubernetes and Victoria Logs are not enabled)
272+
if !m.hasK8sInput && !m.hasVmlogsInput && cfg.OTLPEnabled {
220273
// OTLP input mode
221274
m.hasOTLPInput = true
222275
m.inputChan = make(chan string, 100)
@@ -233,8 +286,8 @@ func (m *simpleTuiModel) Init() tea.Cmd {
233286
}
234287
}
235288

236-
// Check if we have file inputs specified (only if Victoria Logs and OTLP are not enabled)
237-
if !m.hasVmlogsInput && !m.hasOTLPInput && len(cfg.Files) > 0 {
289+
// Check if we have file inputs specified (only if Kubernetes, Victoria Logs and OTLP are not enabled)
290+
if !m.hasK8sInput && !m.hasVmlogsInput && !m.hasOTLPInput && len(cfg.Files) > 0 {
238291
// File input mode
239292
m.hasFileInput = true
240293
m.inputChan = make(chan string, 100)
@@ -252,8 +305,8 @@ func (m *simpleTuiModel) Init() tea.Cmd {
252305
}
253306
}
254307

255-
// If no Victoria Logs, no OTLP, no file input or file input failed, check stdin
256-
if !m.hasVmlogsInput && !m.hasOTLPInput && !m.hasFileInput {
308+
// If no Kubernetes, no Victoria Logs, no OTLP, no file input or file input failed, check stdin
309+
if !m.hasK8sInput && !m.hasVmlogsInput && !m.hasOTLPInput && !m.hasFileInput {
257310
// Check if stdin has data available (not a terminal)
258311
stat, _ := os.Stdin.Stat()
259312
if (stat.Mode() & os.ModeCharDevice) == 0 {
@@ -274,13 +327,46 @@ func (m *simpleTuiModel) Init() tea.Cmd {
274327
cmds = append(cmds, m.periodicUpdate())
275328

276329
// Start checking for input data if we have any input source
277-
if m.hasStdinData || m.hasFileInput || m.hasOTLPInput || m.hasVmlogsInput {
330+
if m.hasStdinData || m.hasFileInput || m.hasOTLPInput || m.hasVmlogsInput || m.hasK8sInput {
278331
cmds = append(cmds, m.checkInputChannel())
279332
}
280333

281334
return tea.Batch(cmds...)
282335
}
283336

337+
// readK8sAsync reads from the Kubernetes log source
338+
func (m *simpleTuiModel) readK8sAsync() {
339+
defer close(m.inputChan)
340+
341+
if m.k8sReceiver == nil {
342+
return
343+
}
344+
345+
// Get the channel from Kubernetes receiver
346+
k8sLineChan := m.k8sReceiver.GetLineChan()
347+
348+
// Forward lines from Kubernetes receiver to input channel
349+
for {
350+
select {
351+
case <-m.ctx.Done():
352+
m.k8sReceiver.Stop()
353+
return
354+
case line, ok := <-k8sLineChan:
355+
if !ok {
356+
// Kubernetes receiver finished
357+
return
358+
}
359+
if line != "" {
360+
select {
361+
case m.inputChan <- line:
362+
case <-m.ctx.Done():
363+
return
364+
}
365+
}
366+
}
367+
}
368+
}
369+
284370
// readVmlogsAsync reads from the Victoria Logs receiver
285371
func (m *simpleTuiModel) readVmlogsAsync() {
286372
defer close(m.inputChan)
@@ -495,7 +581,7 @@ func (m *simpleTuiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
495581
m.processLogLine(string(msg))
496582

497583
// Continue checking for more data if we have input sources
498-
if (m.hasStdinData || m.hasFileInput || m.hasOTLPInput || m.hasVmlogsInput) && !m.finished {
584+
if (m.hasStdinData || m.hasFileInput || m.hasOTLPInput || m.hasVmlogsInput || m.hasK8sInput) && !m.finished {
499585
cmds = append(cmds, m.checkInputChannel())
500586
}
501587

cmd/gonzo/main.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ type Config struct {
4141
VmlogsUser string `mapstructure:"vmlogs-user"`
4242
VmlogsPassword string `mapstructure:"vmlogs-password"`
4343
VmlogsQuery string `mapstructure:"vmlogs-query"`
44+
K8sEnabled bool `mapstructure:"k8s-enabled"`
45+
K8sKubeconfig string `mapstructure:"k8s-kubeconfig"`
46+
K8sContext string `mapstructure:"k8s-context"`
47+
K8sNamespaces []string `mapstructure:"k8s-namespaces"`
48+
K8sSelector string `mapstructure:"k8s-selector"`
49+
K8sSince int64 `mapstructure:"k8s-since"`
50+
K8sTailLines int64 `mapstructure:"k8s-tail-lines"`
4451
Skin string `mapstructure:"skin"`
4552
StopWords []string `mapstructure:"stop-words"`
4653
Format string `mapstructure:"format"`
@@ -71,9 +78,18 @@ Supports OTLP (OpenTelemetry) format natively, with automatic detection of JSON,
7178
# Use glob patterns to read multiple files
7279
gonzo -f "/var/log/*.log" --follow
7380
74-
# Stream logs from kubectl
81+
# Stream logs from kubectl
7582
kubectl logs -f deployment/my-app | gonzo
76-
83+
84+
# Stream logs directly from Kubernetes
85+
gonzo --k8s-enabled
86+
87+
# Stream logs from specific namespaces
88+
gonzo --k8s-enabled --k8s-namespaces=default,production
89+
90+
# Stream logs with label selector
91+
gonzo --k8s-enabled --k8s-selector="app=myapp,env=prod"
92+
7793
# With custom settings
7894
gonzo -f logs.json --update-interval=2s --log-buffer=2000
7995
@@ -153,6 +169,13 @@ func init() {
153169
rootCmd.Flags().String("vmlogs-user", "", "Victoria Logs basic auth username (can also use GONZO_VMLOGS_USER env var)")
154170
rootCmd.Flags().String("vmlogs-password", "", "Victoria Logs basic auth password (can also use GONZO_VMLOGS_PASSWORD env var)")
155171
rootCmd.Flags().String("vmlogs-query", "*", "Victoria Logs query (LogsQL) to use for streaming (default: '*' for all logs)")
172+
rootCmd.Flags().Bool("k8s-enabled", false, "Enable Kubernetes log streaming from pods")
173+
rootCmd.Flags().String("k8s-kubeconfig", "", "Path to kubeconfig file (default: $KUBECONFIG or ~/.kube/config)")
174+
rootCmd.Flags().String("k8s-context", "", "Kubernetes context to use (default: current context)")
175+
rootCmd.Flags().StringSlice("k8s-namespaces", []string{}, "Kubernetes namespaces to watch (default: all namespaces)")
176+
rootCmd.Flags().String("k8s-selector", "", "Label selector to filter pods (e.g., 'app=myapp,env=prod')")
177+
rootCmd.Flags().Int64("k8s-since", 0, "Only show logs newer than this many seconds (default: 0 = all)")
178+
rootCmd.Flags().Int64("k8s-tail-lines", 10, "Lines of recent logs to show initially per pod (default: 10, use -1 for all)")
156179
rootCmd.Flags().StringP("skin", "s", "default", "Color scheme/skin to use (default, or name of a skin file in ~/.config/gonzo/skins/)")
157180
rootCmd.Flags().StringSlice("stop-words", []string{}, "Additional stop words to filter out from analysis (adds to built-in list)")
158181
rootCmd.Flags().String("format", "", "Log format to use (auto-detect if not specified). Can be: otlp, json, text, or a custom format name from ~/.config/gonzo/formats/")
@@ -174,6 +197,13 @@ func init() {
174197
viper.BindPFlag("vmlogs-user", rootCmd.Flags().Lookup("vmlogs-user"))
175198
viper.BindPFlag("vmlogs-password", rootCmd.Flags().Lookup("vmlogs-password"))
176199
viper.BindPFlag("vmlogs-query", rootCmd.Flags().Lookup("vmlogs-query"))
200+
viper.BindPFlag("k8s-enabled", rootCmd.Flags().Lookup("k8s-enabled"))
201+
viper.BindPFlag("k8s-kubeconfig", rootCmd.Flags().Lookup("k8s-kubeconfig"))
202+
viper.BindPFlag("k8s-context", rootCmd.Flags().Lookup("k8s-context"))
203+
viper.BindPFlag("k8s-namespaces", rootCmd.Flags().Lookup("k8s-namespaces"))
204+
viper.BindPFlag("k8s-selector", rootCmd.Flags().Lookup("k8s-selector"))
205+
viper.BindPFlag("k8s-since", rootCmd.Flags().Lookup("k8s-since"))
206+
viper.BindPFlag("k8s-tail-lines", rootCmd.Flags().Lookup("k8s-tail-lines"))
177207
viper.BindPFlag("skin", rootCmd.Flags().Lookup("skin"))
178208
viper.BindPFlag("stop-words", rootCmd.Flags().Lookup("stop-words"))
179209
viper.BindPFlag("format", rootCmd.Flags().Lookup("format"))

0 commit comments

Comments
 (0)