Skip to content

Commit

Permalink
feat: add minitrace-futures to support instrumentation for futures (
Browse files Browse the repository at this point in the history
#202)

* init subrepo

Signed-off-by: iGxnon <[email protected]>

* implement minitrace-futures

Signed-off-by: iGxnon <[email protected]>

---------

Signed-off-by: iGxnon <[email protected]>
Co-authored-by: Andy Lok <[email protected]>
  • Loading branch information
iGxnon and andylokandy authored Feb 7, 2024
1 parent 8a52751 commit 9194409
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"minitrace-jaeger",
"minitrace-datadog",
"minitrace-opentelemetry",
"minitrace-futures",
"test-statically-disable",
]

Expand Down
21 changes: 21 additions & 0 deletions minitrace-futures/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "minitrace-futures"
version = "0.6.3"
edition = "2021"
license = "Apache-2.0"
description = "Utilities for instrumenting `futures` with minitrace-rust"
homepage = "https://github.com/tikv/minitrace-rust"
repository = "https://github.com/tikv/minitrace-rust"
documentation = "https://docs.rs/minitrace-futures"
readme = "README.md"
categories = ["development-tools::debugging"]
keywords = ["tracing", "span", "datadog", "jaeger", "opentelemetry"]

[dependencies]
futures = "0.3"
minitrace = { version = "0.6.3", path = "../minitrace" }
pin-project-lite = "0.2.13"

[dev-dependencies]
async-stream = "0.3.5"
tokio = { version = "1", features = ["rt", "time", "macros"] }
7 changes: 7 additions & 0 deletions minitrace-futures/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# minitrace-futures

[![Documentation](https://docs.rs/minitrace-futures/badge.svg)](https://docs.rs/minitrace-futures/)
[![Crates.io](https://img.shields.io/crates/v/minitrace-futures.svg)](https://crates.io/crates/minitrace-futures)
[![LICENSE](https://img.shields.io/github/license/tikv/minitrace-rust.svg)](https://github.com/tikv/minitrace-rust/blob/master/LICENSE)

Some utilities for instrumenting `futures` with [`minitrace`](https://crates.io/crates/minitrace).
160 changes: 160 additions & 0 deletions minitrace-futures/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#![doc = include_str!("../README.md")]

use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use futures::Sink;
use futures::Stream;
use minitrace::Span;
use pin_project_lite::pin_project;

// There is no boundary in order to support types that
// implement both stream and sink.
impl<T> Instrumented for T {}

/// Instrument [`futures::Stream`]s and [`futures::Sink`]s.
pub trait Instrumented: Sized {
/// For [`Stream`]s :
///
/// Binds a [`Span`] to the [`Stream`] that continues to record until the stream is **finished**.
///
/// In addition, it sets the span as the local parent at every poll so that [`minitrace::local::LocalSpan`]
/// becomes available within the future. Internally, it calls [`Span::set_local_parent`] when
/// the executor polls it.
///
/// # Examples:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use async_stream::stream;
/// use futures::StreamExt;
/// use minitrace::prelude::*;
/// use minitrace_futures::*;
///
/// let root = Span::root("Root", SpanContext::random());
/// let s = stream! {
/// for i in 0..2 {
/// yield i;
/// }
/// }
/// .in_span(Span::enter_with_parent("Task", &root));
///
/// tokio::pin!(s);
///
/// assert_eq!(s.next().await.unwrap(), 0);
/// assert_eq!(s.next().await.unwrap(), 1);
/// assert_eq!(s.next().await, None);
/// // span ends here.
///
/// # }
/// ```
///
/// For [`Sink`]s :
///
/// Binds a [`Span`] to the [`Sink`] that continues to record until the sink is **closed**.
///
/// In addition, it sets the span as the local parent at every poll so that [`minitrace::local::LocalSpan`]
/// becomes available within the future. Internally, it calls [`Span::set_local_parent`] when
/// the executor polls it.
///
/// # Examples:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use futures::sink;
/// use futures::sink::SinkExt;
/// use minitrace::prelude::*;
/// use minitrace_futures::*;
///
/// let root = Span::root("Root", SpanContext::random());
///
/// let mut drain = sink::drain().in_span(Span::enter_with_parent("Task", &root));
///
/// drain.send(1).await.unwrap();
/// drain.send(2).await.unwrap();
/// drain.close().await.unwrap();
/// // span ends here.
///
/// # }
/// ```
fn in_span(self, span: Span) -> InSpan<Self> {
InSpan {
inner: self,
span: Some(span),
}
}
}

pin_project! {
pub struct InSpan<T> {
#[pin]
inner: T,
span: Option<Span>,
}
}

impl<T> Stream for InSpan<T>
where T: Stream
{
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

let _guard = this.span.as_ref().map(|s| s.set_local_parent());
let res = this.inner.poll_next(cx);

match res {
r @ Poll::Pending => r,
r @ Poll::Ready(None) => {
// finished
this.span.take();
r
}
other => other,
}
}
}

impl<T, I> Sink<I> for InSpan<T>
where T: Sink<I>
{
type Error = T::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.span.as_ref().map(|s| s.set_local_parent());
this.inner.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.span.as_ref().map(|s| s.set_local_parent());
this.inner.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.span.as_ref().map(|s| s.set_local_parent());
this.inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();

let _guard = this.span.as_ref().map(|s| s.set_local_parent());
let res = this.inner.poll_close(cx);

match res {
r @ Poll::Pending => r,
other => {
// closed
this.span.take();
other
}
}
}
}

0 comments on commit 9194409

Please sign in to comment.