Changes from all commits (35 commits)
bd06f70
publish and subscribe the pod info
titaneric Oct 10, 2025
504e5a9
refactor pod subscriber and publisher into its own module
titaneric Oct 10, 2025
2942b5d
implement `fetch_pod_logs` for pod subscriber
titaneric Oct 10, 2025
ce3044a
add basic reconciler for running pods
titaneric Oct 11, 2025
0697f40
start reconciler after pod state is not empty
titaneric Oct 11, 2025
18f6472
Run the reconciler once the pod state is initialized
titaneric Oct 11, 2025
43dd5ca
launch LogTailer for container instead of pod level
titaneric Oct 11, 2025
bdc90e4
send the logs into channel
titaneric Oct 11, 2025
e42a5e4
stream log and record its timestamp
titaneric Oct 11, 2025
bceca79
rename symbol
titaneric Oct 11, 2025
ea5f81e
simplify key generation used in tracking container logs status
titaneric Oct 15, 2025
96b7111
create new pod watcher for reconciling the logs
titaneric Oct 11, 2025
b847618
remove earlier introduced `init_notify` used to block the reconciler
titaneric Oct 11, 2025
7595286
simplify key generation used in tracking container logs status
titaneric Oct 11, 2025
2578ed5
transform the API logs into `Line` and send to out stream
titaneric Oct 14, 2025
2e34730
pass log line by Bytes instead of String
titaneric Oct 14, 2025
8a26708
convert log line in the reconciler
titaneric Oct 14, 2025
3cc048c
tidy up change for easier review
titaneric Oct 15, 2025
598b6f7
reference ContainerInfo in ContainerLogInfo
titaneric Oct 15, 2025
5fdef9b
Merge branch 'vectordotdev:master' into master
titaneric Oct 15, 2025
9cfd21a
update file_id used in the API logs reconciler
titaneric Oct 15, 2025
8a1174c
fix clippy error
titaneric Oct 15, 2025
8faaaf8
replace api_log with log_collection_strategy enum
titaneric Oct 17, 2025
0483a77
execute file server or logs reconciler according to strategy setting
titaneric Oct 17, 2025
ffd36ab
broadcast pod events to reconciler
titaneric Oct 17, 2025
9bcddfd
simplify the trait import
titaneric Oct 17, 2025
d9e7c9a
simplify the BroadcastStream
titaneric Oct 17, 2025
8ebba32
push the pod forwarder into reflectors to abort if needed
titaneric Oct 17, 2025
0667bcd
add batched log TODO
titaneric Oct 17, 2025
e0e51cd
Merge branch 'master' into master
titaneric Oct 17, 2025
1ae2332
refactor the reconciler's pod state handling
titaneric Oct 17, 2025
255bf93
use destructured `log_collection_strategy`
titaneric Oct 17, 2025
cbfb10a
remove dead code and unnecessary clone
titaneric Oct 17, 2025
0f0a38e
take the buffer to avoid unnecessary clone
titaneric Oct 17, 2025
0adfa7a
Merge branch 'vectordotdev:master' into master
titaneric Oct 18, 2025
117 changes: 104 additions & 13 deletions src/sources/kubernetes_logs/mod.rs
@@ -8,8 +8,11 @@ use std::{cmp::min, path::PathBuf, time::Duration};

use bytes::Bytes;
use chrono::Utc;
use futures::{future::FutureExt, stream::StreamExt};
use futures_util::Stream;
use futures_util::{
Stream,
future::{FutureExt, ready},
stream::StreamExt,
};
use http_1::{HeaderName, HeaderValue};
use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
use k8s_paths_provider::K8sPathsProvider;
@@ -21,6 +24,9 @@ use kube::{
};
use lifecycle::Lifecycle;
use serde_with::serde_as;
use tokio::pin;
use tokio_stream::wrappers::BroadcastStream;

use vector_lib::{
EstimatedJsonEncodedSizeOf, TimeZone,
codecs::{BytesDeserializer, BytesDeserializerConfig},
@@ -53,8 +59,7 @@ use crate::{
},
kubernetes::{custom_reflector, meta_cache::MetaCache},
shutdown::ShutdownSignal,
sources,
sources::kubernetes_logs::partial_events_merger::merge_partial_events,
sources::{self, kubernetes_logs::partial_events_merger::merge_partial_events},
transforms::{FunctionTransform, OutputBuffer},
};

@@ -65,7 +70,9 @@ mod node_metadata_annotator;
mod parser;
mod partial_events_merger;
mod path_helpers;
mod pod_info;
mod pod_metadata_annotator;
mod reconciler;
mod transform_utils;
mod util;

@@ -278,6 +285,20 @@ pub struct Config {
#[configurable(metadata(docs::type_unit = "seconds"))]
#[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
rotate_wait: Duration,

/// The strategy to use for log collection.
#[serde(default = "default_log_collection_strategy")]
log_collection_strategy: LogCollectionStrategy,
}

/// Configuration for the log collection strategy.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum LogCollectionStrategy {
/// Collect logs by reading log files from the filesystem.
File,
/// Collect logs via the Kubernetes Logs API.
Api,
}
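Aside: a standalone sketch of how this enum maps from user configuration. It uses plain `serde` derives in place of Vector's `configurable_component` macro (assumption: the macro ultimately derives compatible serde impls), and `serde_json` stands in for the real config front end.

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
enum LogCollectionStrategy {
    File, // read container log files from the node filesystem
    Api,  // stream logs through the Kubernetes Logs API
}

#[derive(Debug, Deserialize)]
struct Config {
    log_collection_strategy: LogCollectionStrategy,
}

fn main() {
    // `rename_all = "snake_case"` means users write `file` or `api`.
    let cfg: Config =
        serde_json::from_str(r#"{"log_collection_strategy": "api"}"#).unwrap();
    assert_eq!(cfg.log_collection_strategy, LogCollectionStrategy::Api);
}
```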

const fn default_read_from() -> ReadFromConfig {
@@ -326,6 +347,7 @@ impl Default for Config {
log_namespace: None,
internal_metrics: Default::default(),
rotate_wait: default_rotate_wait(),
log_collection_strategy: default_log_collection_strategy(),
}
}
}
@@ -584,6 +606,7 @@ struct Source {
delay_deletion: Duration,
include_file_metric_tag: bool,
rotate_wait: Duration,
log_collection_strategy: LogCollectionStrategy,
}

impl Source {
@@ -673,6 +696,7 @@ impl Source {
delay_deletion,
include_file_metric_tag: config.internal_metrics.include_file_tag,
rotate_wait: config.rotate_wait,
log_collection_strategy: config.log_collection_strategy,
})
}

@@ -710,6 +734,7 @@ impl Source {
delay_deletion,
include_file_metric_tag,
rotate_wait,
log_collection_strategy,
} = self;

let mut reflectors = Vec::new();
@@ -734,14 +759,48 @@ impl Source {
)
.backoff(watcher::DefaultBackoff::default());

// Create shared broadcast channel for pod events
let (pod_event_tx, _) = tokio::sync::broadcast::channel(1000);
let reflector_rx = pod_event_tx.subscribe();

// Spawn task to forward pod events to broadcast channel
let pod_forwarder_tx = pod_event_tx.clone();
let pod_forwarder = tokio::spawn(async move {
pin!(pod_watcher);
while let Some(event_result) = pod_watcher.next().await {
match event_result {
Ok(event) => {
// Only broadcast successful events
if pod_forwarder_tx.send(event).is_err() {
// All receivers have been dropped
break;
}
}
Err(e) => {
warn!("Pod watcher error: {}", e);
// Continue on errors to maintain resilience
}
}
}
});
reflectors.push(pod_forwarder);

// Convert broadcast receiver to stream for reflector
let reflector_stream = BroadcastStream::new(reflector_rx).filter_map(|result| {
ready(match result {
Ok(event) => Some(Ok(event)),
Err(_) => None,
})
});
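Aside: the forwarder-plus-broadcast fan-out above, reduced to a self-contained sketch (stub `u32` events stand in for watcher events). The key points are that the forwarder stops once every receiver is gone, and that `Lagged` errors from slow receivers are silently dropped by the `filter_map`, turning the receiver back into a plain `Stream`.

```rust
use futures::{StreamExt, future::ready, stream};
use tokio_stream::wrappers::BroadcastStream;

#[tokio::main]
async fn main() {
    let (tx, _) = tokio::sync::broadcast::channel::<u32>(16);
    let rx = tx.subscribe();

    // Forwarder task: drain the upstream (a stub stream here) into the channel.
    tokio::spawn(async move {
        let upstream = stream::iter(1..=3u32);
        tokio::pin!(upstream);
        while let Some(event) = upstream.next().await {
            if tx.send(event).is_err() {
                break; // all receivers have been dropped
            }
        }
    });

    // Consumer side: a plain Stream again, with lag errors skipped.
    let events: Vec<u32> = BroadcastStream::new(rx)
        .filter_map(|result| ready(result.ok()))
        .collect()
        .await;
    assert_eq!(events, vec![1, 2, 3]);
}
```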

let pod_store_w = reflector::store::Writer::default();
let pod_state = pod_store_w.as_reader();
let pod_cacher = MetaCache::new();

reflectors.push(tokio::spawn(custom_reflector(
pod_store_w,
pod_cacher,
pod_watcher,
reflector_stream,
delay_deletion,
)));

@@ -772,7 +831,7 @@ impl Source {

// -----------------------------------------------------------------

let nodes = Api::<Node>::all(client);
let nodes = Api::<Node>::all(client.clone());
let node_watcher = watcher(
nodes,
watcher::Config {
@@ -888,8 +947,11 @@ impl Source {
log_namespace,
);

// TODO: annotate the logs with the pod's metadata
if log_collection_strategy == LogCollectionStrategy::Api {
return event;
}
let file_info = annotator.annotate(&mut event, &line.filename);

emit!(KubernetesLogsEventsReceived {
file: &line.filename,
byte_size: event.estimated_json_encoded_size_of(),
@@ -940,17 +1002,43 @@ impl Source {
let event_processing_loop = out.send_event_stream(&mut stream);

let mut lifecycle = Lifecycle::new();
{
// Only add file server when log_collection_strategy is File
if log_collection_strategy == LogCollectionStrategy::File {
let (slot, shutdown) = lifecycle.add();
let fut = util::run_file_server(file_server, file_source_tx, shutdown, checkpointer)
.map(|result| match result {
Ok(FileServerShutdown) => info!(message = "File server completed gracefully."),
let fut =
util::run_file_server(file_server, file_source_tx.clone(), shutdown, checkpointer)
.map(|result| match result {
Ok(FileServerShutdown) => {
info!(message = "File server completed gracefully.")
}
Err(error) => emit!(KubernetesLifecycleError {
message: "File server exited with an error.",
error,
count: events_count,
}),
});
slot.bind(Box::pin(fut));
}
if log_collection_strategy == LogCollectionStrategy::Api {
let reconciler_rx = pod_event_tx.subscribe();
let reconciler =
reconciler::Reconciler::new(client.clone(), file_source_tx.clone(), reconciler_rx);
let (slot, shutdown) = lifecycle.add();
let fut = util::complete_with_deadline_on_signal(
reconciler.run(),
shutdown,
Duration::from_secs(30), // more than enough time to propagate
)
.map(|result| {
match result {
Ok(_) => info!(message = "Reconciler completed gracefully."),
Err(error) => emit!(KubernetesLifecycleError {
error,
message: "Reconciler timed out during the shutdown.",
count: events_count,
}),
};
});
slot.bind(Box::pin(fut));
}
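Aside: `util::complete_with_deadline_on_signal` is Vector's own helper; below is a minimal sketch of the same "deadline on shutdown" pattern (a hypothetical free-standing function, not the helper's actual signature). The wrapped future runs unhindered until the shutdown signal fires, and only then is it put on a clock.

```rust
use std::{future::Future, time::Duration};
use tokio::time::{error::Elapsed, timeout};

/// Drive `task` to completion, but once `shutdown` resolves, give it only
/// `grace` more time before giving up with an `Elapsed` error.
async fn complete_with_deadline<F, S>(
    task: F,
    shutdown: S,
    grace: Duration,
) -> Result<F::Output, Elapsed>
where
    F: Future,
    S: Future<Output = ()>,
{
    tokio::pin!(task);
    tokio::select! {
        out = &mut task => return Ok(out),
        _ = shutdown => {}
    }
    // Shutdown was requested; let the task wrap up within the grace period.
    timeout(grace, task).await
}
```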
{
@@ -1093,6 +1181,9 @@ const fn default_delay_deletion_ms() -> Duration {
const fn default_rotate_wait() -> Duration {
Duration::from_secs(u64::MAX / 2)
}
const fn default_log_collection_strategy() -> LogCollectionStrategy {
LogCollectionStrategy::File
}

// This function constructs the patterns we include for file watching, created
// from the defaults or user provided configuration.
9 changes: 4 additions & 5 deletions src/sources/kubernetes_logs/parser/cri.rs
@@ -8,9 +8,7 @@ use vector_lib::{

use crate::{
event::{self, Event, Value},
internal_events::{
DROP_EVENT, ParserConversionError, ParserMatchError, ParserMissingFieldError,
},
internal_events::{DROP_EVENT, ParserConversionError, ParserMissingFieldError},
sources::kubernetes_logs::{Config, transform_utils::get_message_path},
transforms::{FunctionTransform, OutputBuffer},
};
@@ -58,8 +56,9 @@ impl FunctionTransform for Cri {
}
Some(s) => match parse_log_line(&s) {
None => {
emit!(ParserMatchError { value: &s[..] });
return;
// TODO: remove this workaround once `FunctionTransform` supports Api logs
// emit!(ParserMatchError { value: &s[..] });
drop(log.insert(&message_path, Value::Bytes(s)));
@titaneric (Contributor, Author) commented on Oct 17, 2025:
I know this is an ugly workaround. To keep the PR at a reviewable size, I dropped the implementation of the `FunctionTransform` trait for `Api` here (it is very similar to how `Cri` handles the logs).

}
Some(parsed_log) => {
// For all fields except `timestamp`, simply treat them as `Value::Bytes`. For
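Aside: for context on what `parse_log_line` is matching, the CRI container log format is `<RFC 3339 timestamp> <stream> <P|F> <message>`, where `P` marks a partial line and `F` a full one. A minimal sketch of a parser for that shape (illustrative only, not the actual implementation):

```rust
/// Split a CRI-formatted container log line into its four fields.
/// Returns None when the line does not have the expected shape.
fn parse_cri_line(line: &str) -> Option<(&str, &str, &str, &str)> {
    let mut parts = line.splitn(4, ' ');
    let timestamp = parts.next()?;
    let stream = parts.next()?; // "stdout" or "stderr"
    let tag = parts.next()?; // "F" = full line, "P" = partial line
    let message = parts.next().unwrap_or("");
    Some((timestamp, stream, tag, message))
}

fn main() {
    let line = "2025-10-17T00:17:09.669794202Z stdout F hello from the app";
    let (ts, stream, tag, msg) = parse_cri_line(line).unwrap();
    assert_eq!((stream, tag, msg), ("stdout", "F", "hello from the app"));
    assert!(ts.ends_with('Z'));
}
```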
45 changes: 45 additions & 0 deletions src/sources/kubernetes_logs/pod_info.rs
@@ -0,0 +1,45 @@
use k8s_openapi::api::core::v1::Pod;
use serde::{Deserialize, Serialize};

/// Pod information struct that contains essential details for log fetching
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
pub struct PodInfo {
/// Pod name
pub name: String,
/// Pod namespace
pub namespace: String,
/// Pod phase (Running, Pending, etc.)
pub phase: Option<String>,
/// Container names within the pod
pub containers: Vec<String>,
}

impl From<&Pod> for PodInfo {
fn from(pod: &Pod) -> Self {
let metadata = &pod.metadata;

let name = metadata.name.clone().unwrap_or_default();

let namespace = metadata.namespace.clone().unwrap_or_default();

let phase = pod.status.as_ref().and_then(|status| status.phase.clone());

let containers = pod
.spec
.as_ref()
.map(|spec| {
spec.containers
.iter()
.map(|container| container.name.clone())
.collect()
})
.unwrap_or_default();

PodInfo {
name,
namespace,
phase,
containers,
}
}
}
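Aside: a test-style usage sketch of the projection above, with a hand-built `Pod` fixture (the names are made up):

```rust
use k8s_openapi::api::core::v1::{Container, Pod, PodSpec, PodStatus};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;

fn example() {
    // Build a Pod the way the watcher would hand it to us...
    let pod = Pod {
        metadata: ObjectMeta {
            name: Some("web-0".into()),
            namespace: Some("default".into()),
            ..Default::default()
        },
        spec: Some(PodSpec {
            containers: vec![Container {
                name: "app".into(),
                ..Default::default()
            }],
            ..Default::default()
        }),
        status: Some(PodStatus {
            phase: Some("Running".into()),
            ..Default::default()
        }),
    };

    // ...and project it into the lightweight PodInfo used for log fetching.
    let info = PodInfo::from(&pod);
    assert_eq!(info.name, "web-0");
    assert_eq!(info.namespace, "default");
    assert_eq!(info.phase.as_deref(), Some("Running"));
    assert_eq!(info.containers, vec!["app".to_string()]);
}
```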