Skip to content

Conversation

fanatid
Copy link
Contributor

@fanatid fanatid commented Jan 6, 2025

No description provided.

Copy link
Contributor

@lvboudre lvboudre left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conflicting files.

@@ -1089,7 +1106,7 @@ impl GrpcService {

#[tonic::async_trait]
impl Geyser for GrpcService {
type SubscribeStream = ReceiverStream<TonicResult<FilteredUpdate>>;
type SubscribeStream = ReceiverStream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use BoxStream<'static, TonicResult<FilteredUpdated>> instead

Comment on lines +1322 to +1360
pub struct ReceiverStream {
rx: mpsc::Receiver<TonicResult<FilteredUpdate>>,
metric_connection_slot_lag: bool,
max_slot: Slot,
}

impl ReceiverStream {
const fn new(
rx: mpsc::Receiver<TonicResult<FilteredUpdate>>,
metric_connection_slot_lag: bool,
) -> Self {
Self {
rx,
metric_connection_slot_lag,
max_slot: 0,
}
}
}

impl Stream for ReceiverStream {
type Item = TonicResult<FilteredUpdate>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let value = futures::ready!(self.rx.poll_recv(cx));
if self.metric_connection_slot_lag {
if let Some(slot) = value
.as_ref()
.and_then(|item| item.as_ref().map(|item| item.message.get_slot()).ok())
.flatten()
{
if slot > self.max_slot {
self.max_slot = slot;
metrics::connections_slot_lag_observe(slot);
}
}
}
Poll::Ready(value)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use boxed the tokio stream and use map instead of implementing our own Stream trait.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants