-
Couldn't load subscription status.
- Fork 130
chore(ups): add message chunking & ups protocol #2874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(ups): add message chunking & ups protocol #2874
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
1 Skipped Deployment
|
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
commit: |
| async fn next(&mut self) -> Result<DriverOutput> { | ||
| loop { | ||
| tokio::select! { | ||
| biased; | ||
| // Prefer local messages to reduce latency | ||
| res = self.local_rx.recv() => { | ||
| match res { | ||
| std::result::Result::Ok(payload) => { | ||
| return Ok(DriverOutput::Message { subject: self.subject.clone(), payload }); | ||
| } | ||
| std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => { | ||
| // Skip lagged and continue | ||
| continue; | ||
| } | ||
| std::result::Result::Err(broadcast::error::RecvError::Closed) => { | ||
| // Local channel closed; fall back to driver only | ||
| // Replace with a closed receiver to avoid busy loop | ||
| // We simply continue and rely on driver | ||
| } | ||
| } | ||
| } | ||
| res = self.driver.next() => { | ||
| return res; | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There appears to be a potential issue in the error handling for broadcast::error::RecvError::Closed. The comment indicates that the code should replace the receiver with a closed one to avoid a busy loop, but the implementation doesn't actually do this. When the local channel is closed, the code continues the loop without modifying self.local_rx, which means it will repeatedly hit the same closed channel error.
Consider replacing self.local_rx with a permanently closed receiver when this error is encountered, perhaps by creating a new closed channel:
Err(broadcast::error::RecvError::Closed) => {
// Replace with a permanently closed receiver to avoid busy loop
let (tx, rx) = broadcast::channel::<Vec<u8>>(1);
drop(tx); // Close the channel
self.local_rx = rx;
continue;
}| async fn next(&mut self) -> Result<DriverOutput> { | |
| loop { | |
| tokio::select! { | |
| biased; | |
| // Prefer local messages to reduce latency | |
| res = self.local_rx.recv() => { | |
| match res { | |
| std::result::Result::Ok(payload) => { | |
| return Ok(DriverOutput::Message { subject: self.subject.clone(), payload }); | |
| } | |
| std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => { | |
| // Skip lagged and continue | |
| continue; | |
| } | |
| std::result::Result::Err(broadcast::error::RecvError::Closed) => { | |
| // Local channel closed; fall back to driver only | |
| // Replace with a closed receiver to avoid busy loop | |
| // We simply continue and rely on driver | |
| } | |
| } | |
| } | |
| res = self.driver.next() => { | |
| return res; | |
| } | |
| } | |
| } | |
| } | |
| async fn next(&mut self) -> Result<DriverOutput> { | |
| loop { | |
| tokio::select! { | |
| biased; | |
| // Prefer local messages to reduce latency | |
| res = self.local_rx.recv() => { | |
| match res { | |
| std::result::Result::Ok(payload) => { | |
| return Ok(DriverOutput::Message { subject: self.subject.clone(), payload }); | |
| } | |
| std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => { | |
| // Skip lagged and continue | |
| continue; | |
| } | |
| std::result::Result::Err(broadcast::error::RecvError::Closed) => { | |
| // Local channel closed; fall back to driver only | |
| // Replace with a closed receiver to avoid busy loop | |
| let (tx, rx) = broadcast::channel::<Vec<u8>>(1); | |
| drop(tx); // Close the channel | |
| self.local_rx = rx; | |
| continue; | |
| } | |
| } | |
| } | |
| res = self.driver.next() => { | |
| return res; | |
| } | |
| } | |
| } | |
| } |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
9d59211 to
bf98d22
Compare
cf0534f to
90ce950
Compare
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
c140b96 to
f452a88
Compare
bf98d22 to
c9cff29
Compare
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
c9cff29 to
a294c2e
Compare
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
Merge activity
|
Fixes RVT-5129 Fixes RVT-5118

Fixes RVT-5129
Fixes RVT-5118