Skip to content

Commit 34da1d1

Browse files
committed
fix(instrument): Buffer counter underflowed (#23872)
1 parent bd6a8f5 commit 34da1d1

File tree

2 files changed

+19
-14
lines changed

2 files changed

+19
-14
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fix buffer counter underflowed, caused by the counter has not been updated(increase) timely when new event is coming.
2+
3+
authors: sialais

lib/vector-buffers/src/topology/channel/sender.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,15 @@ impl<T: Bufferable> BufferSender<T> {
204204

205205
let mut sent_to_base = true;
206206
let mut was_dropped = false;
207+
208+
if let Some(instrumentation) = self.instrumentation.as_ref()
209+
&& let Some((item_count, item_size)) = item_sizing
210+
{
211+
instrumentation.increment_received_event_count_and_byte_size(
212+
item_count as u64,
213+
item_size as u64,
214+
);
215+
}
207216
match self.when_full {
208217
WhenFull::Block => self.base.send(item).await?,
209218
WhenFull::DropNewest => {
@@ -214,6 +223,7 @@ impl<T: Bufferable> BufferSender<T> {
214223
WhenFull::Overflow => {
215224
if let Some(item) = self.base.try_send(item).await? {
216225
sent_to_base = false;
226+
was_dropped = true;
217227
self.overflow
218228
.as_mut()
219229
.unwrap_or_else(|| unreachable!("overflow must exist"))
@@ -223,23 +233,9 @@ impl<T: Bufferable> BufferSender<T> {
223233
}
224234
}
225235

226-
if (sent_to_base || was_dropped)
227-
&& let (Some(send_duration), Some(send_reference)) =
228-
(self.send_duration.as_ref(), send_reference)
229-
{
230-
send_duration.emit(send_reference.elapsed());
231-
}
232-
233236
if let Some(instrumentation) = self.instrumentation.as_ref()
234237
&& let Some((item_count, item_size)) = item_sizing
235238
{
236-
if sent_to_base {
237-
instrumentation.increment_received_event_count_and_byte_size(
238-
item_count as u64,
239-
item_size as u64,
240-
);
241-
}
242-
243239
if was_dropped {
244240
instrumentation.increment_dropped_event_count_and_byte_size(
245241
item_count as u64,
@@ -248,6 +244,12 @@ impl<T: Bufferable> BufferSender<T> {
248244
);
249245
}
250246
}
247+
if (sent_to_base || was_dropped)
248+
&& let (Some(send_duration), Some(send_reference)) =
249+
(self.send_duration.as_ref(), send_reference)
250+
{
251+
send_duration.emit(send_reference.elapsed());
252+
}
251253

252254
Ok(())
253255
}

0 commit comments

Comments
 (0)