1,095 changes: 524 additions & 571 deletions Cargo.lock

Large diffs are not rendered by default.

38 changes: 19 additions & 19 deletions Cargo.toml
@@ -16,15 +16,15 @@ ahash = "0.8"
anyhow = "1"
assert_matches = "1.5.0"
async-trait = "0.1"
aws-config = "1.8.8"
aws-credential-types = "1.2.8"
aws-sdk-sqs = "1.86.0"
aws-sigv4 = "1.3.5"
aws-config = "1.8.11"
aws-credential-types = "1.2.10"
aws-sdk-sqs = "1.90.0"
aws-sigv4 = "1.3.6"
aws-smithy-async = "1.2.6"
aws-smithy-http = "0.62.4"
aws-smithy-runtime-api = { version = "1.9.1", features = ["test-util"] }
aws-smithy-types = "1.3.3"
axum = "0.8.6"
aws-smithy-http = "0.62.5"
aws-smithy-runtime-api = { version = "1.9.2", features = ["test-util"] }
aws-smithy-types = "1.3.4"
axum = "0.8.7"
backoff = { version = "0.4.0", features = ["tokio"] }
base64ct = "1.8.0"
bd-grpc = { git = "https://github.com/bitdriftlabs/shared-core.git" }
@@ -42,12 +42,12 @@ bd-test-helpers = { git = "https://github.com/bitdriftlabs/shared-core.gi
bd-time = { git = "https://github.com/bitdriftlabs/shared-core.git" }
built = { version = "0.8", features = ["git2"] }
bytes = "1"
cc = "1.2.41"
clap = { version = "4.5.50", features = ["derive", "env"] }
cc = "1.2.48"
clap = { version = "4.5.53", features = ["derive", "env"] }
comfy-table = "7.2.1"
console-subscriber = "0.4.1"
console-subscriber = "0.5.0"
criterion = { version = "0.7", features = ["html_reports"] }
ctor = "0.6.0"
ctor = "0.6.1"
cuckoofilter = "0.5.0"
dashmap = { version = "6", features = ["raw-api"] }
deadpool = { version = "0.12", features = ["managed", "rt_tokio_1"] }
@@ -56,11 +56,11 @@ fst = "0.4.7"
futures = "0.3"
futures-util = "0.3.31"
gettid = "0.1.4"
hashbrown = "0.16.0"
http = "1.3.1"
hashbrown = "0.16.1"
http = "1.4.0"
http-body-util = "0.1.3"
humantime-serde = "1.1"
hyper = "1.7.0"
hyper = "1.8.1"

hyper-rustls = { version = "0.27.7", default-features = false, features = [
"http1",
@@ -69,7 +69,7 @@ hyper-rustls = { version = "0.27.7", default-features = false, features = [
"aws-lc-rs",
] }

hyper-util = { version = "0.1.17", features = ["client", "client-legacy"] }
hyper-util = { version = "0.1.18", features = ["client", "client-legacy"] }
hyperloglogplus = "0.4.1"
intrusive-collections = "0.9.7"
itertools = "0.14.0"
@@ -85,7 +85,7 @@ kube = { version = "2.0.1", features = [
log = "0.4"
matches = "0.1"
memchr = "2"
mockall = "0.13.1"
mockall = "0.14.0"
nom = "8"
notify = "8.2.0"
parking_lot = "0.12"
@@ -112,7 +112,7 @@ reqwest = { version = "0.12.24", default-features = false, features = [
] }

reusable-fmt = "0.2.0"
rustls = "0.23.34"
rustls = "0.23.35"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9.34"
@@ -129,7 +129,7 @@ tokio-stream = "0.1.17"
tokio-test = "0.4"
topk = "0.5.0"
topological-sort = "0.2.2"
tracing = "0.1.41"
tracing = "0.1.43"
unwrap-infallible = "0.1.5"
url = "2.5.7"
uuid = { version = "1.18.1", features = ["v4"] }
4 changes: 2 additions & 2 deletions pulse-common/src/k8s/pods_info.rs
@@ -316,7 +316,7 @@ pub mod container {
PodInfo {
services: HashMap::new(),
namespace: namespace.to_string(),
-name: pod_name.to_string(),
+name: pod_name.clone(),
labels: pod.labels().clone(),
annotations: pod.annotations().clone(),
metadata: Arc::new(crate::metadata::Metadata::new(
@@ -332,7 +332,7 @@ None,
None,
)),
ip: pod_ip,
-ip_string: pod_ip_string.to_string(),
+ip_string: pod_ip_string.clone(),
container_ports,
}
};
4 changes: 2 additions & 2 deletions pulse-common/src/metadata.rs
@@ -60,8 +60,8 @@ impl Metadata {
let pod_annotations = pod_metadata
.as_ref()
.map(|p| btree_to_value(p.pod_annotations));
-let node_name = node_info.as_ref().map(|n| n.name.to_string());
-let node_ip = node_info.as_ref().map(|n| n.ip.to_string());
+let node_name = node_info.as_ref().map(|n| n.name.clone());
+let node_ip = node_info.as_ref().map(|n| n.ip.clone());
let node_labels = node_info.as_ref().map(|n| btree_to_value(&n.labels));
let node_annotations = node_info.as_ref().map(|n| btree_to_value(&n.annotations));
let service = pod_metadata.and_then(|p| p.service.map(|s| value!({"name": s})));
47 changes: 30 additions & 17 deletions pulse-metrics/src/batch.rs
@@ -39,7 +39,7 @@ pub trait Batch<I> {
}

// A combined batch and its size. Used for construction of both pending batches as well as entries
-// in the LIFO queue.
+// in the LIFO/FIFO queue.
struct QueueEntry<T> {
size: usize,
item: T,
@@ -50,6 +50,7 @@ struct GlobalLockedData<T> {
batch_queue: VecDeque<QueueEntry<T>>,
current_total_size: usize,
waiters: bool,
+lifo: bool,
}
struct PerBatchLockedData<T> {
pending_batch: Option<T>,
@@ -68,14 +69,12 @@ struct Stats {
// BatchBuilder
//

-// The batch builder combines the ability to create generic batches of items, as well as a LIFO
+// The batch builder combines the ability to create generic batches of items, as well as a LIFO/FIFO
// queue of completed batches. The total size of both completed batches and pending batches are
-// tracked, and if there is overflow, the oldest entries are evicted from the LIFO queue to make
-// room.
+// tracked, and if there is overflow, the oldest entries are evicted from the LIFO/FIFO queue to
+// make room.
// The LIFO requirement means we cannot use tokio::mpsc, which is why this file contains so much
// manual synchronization code.
-// TODO(mattklein123): Potentially make the LIFO configurable. Some backends might not like this?
-// (Though if we send in parallel seems like it shouldn't matter.)
pub struct BatchBuilder<I, B: Batch<I>> {
global_locked_data: Mutex<GlobalLockedData<B>>,
shutdown: AtomicBool,
@@ -110,6 +109,7 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
batch_queue: VecDeque::new(),
current_total_size: 0,
waiters: false,
+lifo: !policy.use_fifo,
}),
shutdown: AtomicBool::new(false),
concurrent_batch_index: AtomicUsize::new(0),
@@ -189,7 +189,7 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
}

// Send an item through the batch builder. This may result in old data getting dropped if the
-// total data in the LIFO queue and the pending batch is too large.
+// total data in the queue and the pending batch is too large.
pub fn send(self: &Arc<Self>, items: impl Iterator<Item = I>) {
if self.shutdown.load(Ordering::Relaxed) {
// Just silently drop the data.
@@ -240,18 +240,21 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
let mut locked_data = self.global_locked_data.lock();

// See if we need to evict old entries to make room for new data.
+// For LIFO, evict from back (oldest).
+// For FIFO, evict from back (newest to preserve oldest).
while self.max_total_bytes - locked_data.current_total_size < finished_size {
-let back = locked_data.batch_queue.pop_back().unwrap();
+let evicted = locked_data.batch_queue.pop_back().unwrap();
warn_every!(
15.seconds(),
-"batch overflow, evicting oldest with size: {}",
-back.size
+"batch overflow, evicting {} with size: {}",
+if locked_data.lifo { "oldest" } else { "newest" },
+evicted.size
);
self
.stats
.dropped_bytes
-.inc_by(back.size.try_into().unwrap());
-Self::dec_total_size(&mut locked_data, &self.stats, back.size);
+.inc_by(evicted.size.try_into().unwrap());
+Self::dec_total_size(&mut locked_data, &self.stats, evicted.size);
}

Self::finish_batch(
@@ -299,7 +302,7 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
}
}

-// Finish a pending batch and push it onto the LIFO queue.
+// Finish a pending batch and push it onto the queue.
fn finish_batch(
locked_data: &mut GlobalLockedData<B>,
stats: &Stats,
@@ -326,10 +329,19 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
notify_on_data.notify_waiters();
}
log::trace!("finalizing and pushing batch with size: {size}");
-locked_data.batch_queue.push_front(QueueEntry {
-size,
-item: pending_batch,
-});
+// For LIFO, push to front and pop from front (newest first).
+// For FIFO, push to back and pop from front (oldest first).
+if locked_data.lifo {
+locked_data.batch_queue.push_front(QueueEntry {
+size,
+item: pending_batch,
+});
+} else {
+locked_data.batch_queue.push_back(QueueEntry {
+size,
+item: pending_batch,
+});
+}
}

// Get the next set of batches. Returns None when the builder has been shutdown and
@@ -345,6 +357,7 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
if len_to_pop > 0 {
let mut batch_set = Vec::with_capacity(len_to_pop);
for _ in 0 .. len_to_pop {
+// Always pop from front for both LIFO and FIFO.
let entry = locked_data.batch_queue.pop_front().unwrap();
Self::dec_total_size(&mut locked_data, &self.stats, entry.size);
log::trace!("popping entry with size: {}", entry.size);
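The LIFO/FIFO mechanics above are easiest to see in isolation. Below is a minimal, self-contained sketch of the size-bounded queue behavior, not this crate's API: `BoundedQueue` and `Entry` are illustrative stand-ins for the real `GlobalLockedData`/`QueueEntry`. Eviction always pops from the back, which is the oldest entry under LIFO and the newest under FIFO; consumers always pop from the front; only the push side differs between the two modes.

```rust
use std::collections::VecDeque;

// Illustrative stand-in for `QueueEntry<T>` in batch.rs: a finished batch
// plus its accounted size in bytes.
struct Entry {
    size: usize,
    name: &'static str,
}

// Minimal sketch (not the crate's API) of the size-bounded LIFO/FIFO queue
// that `BatchBuilder` keeps behind its mutex.
struct BoundedQueue {
    queue: VecDeque<Entry>,
    total_size: usize,
    max_bytes: usize,
    lifo: bool,
}

impl BoundedQueue {
    fn new(max_bytes: usize, lifo: bool) -> Self {
        Self { queue: VecDeque::new(), total_size: 0, max_bytes, lifo }
    }

    // Push a finished batch, evicting from the back until it fits. The back
    // holds the oldest entry under LIFO and the newest under FIFO, which is
    // the asymmetry the eviction comments in the diff above describe.
    // Assumes `entry.size <= max_bytes`.
    fn push(&mut self, entry: Entry) {
        while self.max_bytes - self.total_size < entry.size {
            let evicted = self.queue.pop_back().expect("entry exceeds max_bytes");
            self.total_size -= evicted.size;
            println!("evicting {} ({} bytes)", evicted.name, evicted.size);
        }
        self.total_size += entry.size;
        if self.lifo {
            self.queue.push_front(entry); // newest is served first
        } else {
            self.queue.push_back(entry); // oldest is served first
        }
    }

    // Consumers pop from the front in both modes; only the push side differs.
    fn pop(&mut self) -> Option<Entry> {
        let entry = self.queue.pop_front()?;
        self.total_size -= entry.size;
        Some(entry)
    }
}
```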
153 changes: 153 additions & 0 deletions pulse-metrics/src/batch_test.rs
@@ -259,3 +259,156 @@ async fn max_size_overflow() {
map_return(batch_builder.next_batch_set(Some(16)).await)
);
}

#[tokio::test(start_paused = true)]
async fn fifo_simple_batch() {
let shutdown_trigger = ComponentShutdownTrigger::default();
let batch_builder = BatchBuilder::new(
&Collector::default().scope("test"),
&QueuePolicy {
use_fifo: true,
..Default::default()
},
TestBatch::default,
shutdown_trigger.make_shutdown(),
);

let batch_future = batch_builder.next_batch_set(None);
pin!(batch_future);
assert_eq!(Poll::Pending, poll!(batch_future.as_mut()));

batch_builder.send(once(("a", expect_finish(4096))));
assert_eq!(Poll::Pending, poll!(batch_future.as_mut()));

// Sleep to advance past the batch fill time.
51.milliseconds().sleep().await;
assert_eq!(Some(vec![vec!["a"]]), map_return(batch_future.await));

// Add 2 items that together will meet the max batch size. This should flush it and cancel the
// fill wait task.
batch_builder.send(once(("a", None)));
batch_builder.send(once(("b", complete(4096))));
51.milliseconds().sleep().await;
assert_eq!(
Some(vec![vec!["a", "b"]]),
map_return(batch_builder.next_batch_set(Some(1)).await)
);

// Add another item, and advance past the batch fill time and make sure we get it back.
batch_builder.send(once(("a", expect_finish(1))));
51.milliseconds().sleep().await;
assert_eq!(
Some(vec![vec!["a"]]),
map_return(batch_builder.next_batch_set(Some(16)).await)
);

// Add another item, and advance past the batch fill time and make sure we get it back.
batch_builder.send(once(("b", expect_finish(1))));
51.milliseconds().sleep().await;
assert_eq!(
Some(vec![vec!["b"]]),
map_return(batch_builder.next_batch_set(None).await)
);
}

#[tokio::test(start_paused = true)]
async fn fifo_ordering() {
let shutdown_trigger = ComponentShutdownTrigger::default();
let batch_builder = BatchBuilder::new(
&Collector::default().scope("test"),
&QueuePolicy {
use_fifo: true,
..Default::default()
},
TestBatch::default,
shutdown_trigger.make_shutdown(),
);

// Send multiple batches and verify they are returned in FIFO order (oldest first).
batch_builder.send(once(("a", None)));
batch_builder.send(once(("b", complete(2))));
batch_builder.send(once(("c", None)));
batch_builder.send(once(("d", complete(2))));
batch_builder.send(once(("e", None)));
batch_builder.send(once(("f", complete(2))));

// In FIFO mode, oldest batch should come first.
assert_eq!(
vec![vec!["a", "b"], vec!["c", "d"], vec!["e", "f"]],
map_return(batch_builder.next_batch_set(None).await).unwrap()
);
}

#[tokio::test(start_paused = true)]
async fn fifo_overflow() {
let shutdown_trigger = ComponentShutdownTrigger::default();
let batch_builder = BatchBuilder::new(
&Collector::default().scope("test"),
&QueuePolicy {
queue_max_bytes: Some(6),
use_fifo: true,
..Default::default()
},
TestBatch::default,
shutdown_trigger.make_shutdown(),
);

// Send multiple batches that exceed the queue size.
batch_builder.send(once(("a", None)));
batch_builder.send(once(("b", complete(2))));
batch_builder.send(once(("c", complete(2))));
batch_builder.send(once(("d", None)));
batch_builder.send(once(("e", complete(3))));

// In FIFO mode with overflow:
// - Batches are pushed to the back of the queue: ["a","b"], ["c"], ["d","e"]
// - Queue max is 6 bytes
// - After ["a","b"](2) + ["c"](2), the queue holds 4 bytes
// - Adding ["d","e"](3) would bring the total to 7 bytes
// - Eviction runs before the push, popping from the back until there is
//   room, so ["c"](2) (the newest entry) is dropped
// - Final state: ["a","b"](2) + ["d","e"](3) = 5 bytes, with ["c"] gone
assert_eq!(
Some(vec![vec!["a", "b"]]),
map_return(batch_builder.next_batch_set(Some(1)).await)
);
assert_eq!(
Some(vec![vec!["d", "e"]]),
map_return(batch_builder.next_batch_set(Some(1)).await)
);

let poll_future = batch_builder.next_batch_set(Some(16));
pin!(poll_future);
assert_eq!(Poll::Pending, poll!(poll_future));
}

#[tokio::test(start_paused = true)]
async fn fifo_multiple_queues() {
let shutdown_trigger = ComponentShutdownTrigger::default();
let batch_builder = BatchBuilder::new(
&Collector::default().scope("test"),
&QueuePolicy {
concurrent_batch_queues: Some(2),
use_fifo: true,
..Default::default()
},
TestBatch::default,
shutdown_trigger.make_shutdown(),
);

batch_builder.send(
[
("a", None),
("b", complete(2)),
("c", None),
("d", complete(2)),
]
.into_iter(),
);

// In FIFO mode, batches should be returned in the order they were completed.
assert_eq!(
vec![vec!["a", "b"], vec!["c", "d"]],
map_return(batch_builder.next_batch_set(None).await).unwrap()
);
}
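Continuing the sketch after batch.rs above, here is a short trace that reproduces the arithmetic walked through in the fifo_overflow comments: with a 6-byte cap in FIFO mode, the newest queued batch is the one evicted, preserving the oldest data.

```rust
fn main() {
    // Mirrors fifo_overflow above: queue_max_bytes = 6, FIFO mode.
    let mut q = BoundedQueue::new(6, /* lifo = */ false);
    q.push(Entry { size: 2, name: "[a,b]" });
    q.push(Entry { size: 2, name: "[c]" });
    // 6 - 4 = 2 bytes free, but the next batch needs 3, so [c] (the
    // newest entry, sitting at the back) is evicted before the push.
    q.push(Entry { size: 3, name: "[d,e]" });
    while let Some(e) = q.pop() {
        println!("popped {} ({} bytes)", e.name, e.size); // [a,b], then [d,e]
    }
}
```

This is the trade-off the eviction warning in the diff makes visible: FIFO keeps the oldest data at the cost of the newest, while LIFO does the reverse.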