Skip to content

Commit bdf85e3

Browse files
authored
Merge pull request #1052 from input-output-hk/inbound-streaming-update
Update chain-deps for inbound streaming changes
2 parents 3478f14 + db49f59 commit bdf85e3

File tree

11 files changed

+652
-417
lines changed

11 files changed

+652
-417
lines changed

Cargo.lock

Lines changed: 101 additions & 90 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain-deps

jormungandr/src/intercom.rs

Lines changed: 93 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::blockcfg::{Block, Epoch, Fragment, FragmentId, Header, HeaderHash};
22
use crate::network::p2p::comm::PeerStats;
33
use crate::network::p2p::topology::NodeId;
4+
use crate::utils::async_msg::{self, MessageBox, MessageQueue};
45
use blockchain::Checkpoints;
56
use futures::prelude::*;
67
use futures::sync::{mpsc, oneshot};
@@ -31,6 +32,16 @@ impl Error {
3132
}
3233
}
3334

35+
pub fn aborted<T>(cause: T) -> Self
36+
where
37+
T: Into<Box<dyn error::Error + Send + Sync>>,
38+
{
39+
Error {
40+
code: core_error::Code::Aborted,
41+
cause: cause.into(),
42+
}
43+
}
44+
3445
pub fn canceled<T>(cause: T) -> Self
3546
where
3647
T: Into<Box<dyn error::Error + Send + Sync>>,
@@ -164,22 +175,21 @@ where
164175
type Error = E;
165176

166177
fn poll(&mut self) -> Poll<T, E> {
167-
let item = match self.receiver.poll() {
168-
Ok(Async::NotReady) => {
169-
return Ok(Async::NotReady);
178+
match self.receiver.poll() {
179+
Ok(Async::NotReady) => Ok(Async::NotReady),
180+
Ok(Async::Ready(Ok(item))) => {
181+
debug!(self.logger, "request processed");
182+
Ok(Async::Ready(item))
170183
}
171-
Ok(Async::Ready(Ok(item))) => item,
172184
Ok(Async::Ready(Err(e))) => {
173-
warn!(self.logger, "error processing request: {:?}", e);
174-
return Err(Error::from(e).into());
185+
info!(self.logger, "error processing request"; "reason" => %e);
186+
Err(e.into())
175187
}
176188
Err(oneshot::Canceled) => {
177189
warn!(self.logger, "response canceled by the processing task");
178-
return Err(Error::from(oneshot::Canceled).into());
190+
Err(Error::from(oneshot::Canceled).into())
179191
}
180-
};
181-
182-
Ok(Async::Ready(item))
192+
}
183193
}
184194
}
185195

@@ -279,63 +289,103 @@ where
279289
handler.close();
280290
}
281291

282-
pub struct RequestSink<T, E> {
283-
sender: mpsc::Sender<T>,
284-
_phantom_error: PhantomData<E>,
292+
#[derive(Debug)]
293+
pub struct RequestStreamHandle<T, R> {
294+
receiver: MessageQueue<T>,
295+
reply: ReplyHandle<R>,
296+
}
297+
298+
pub struct RequestSink<T, R, E> {
299+
sender: MessageBox<T>,
300+
reply_future: Option<ReplyFuture<R, E>>,
301+
logger: Logger,
302+
}
303+
304+
impl<T, R> RequestStreamHandle<T, R> {
305+
pub fn stream(&mut self) -> &mut MessageQueue<T> {
306+
&mut self.receiver
307+
}
308+
309+
/// Drops the request stream and returns the reply handle.
310+
pub fn into_reply(self) -> ReplyHandle<R> {
311+
self.reply
312+
}
313+
}
314+
315+
impl<T, R, E> RequestSink<T, R, E> {
316+
pub fn logger(&self) -> &Logger {
317+
&self.logger
318+
}
319+
320+
// This is for network which implements request_stream::MapResponse
321+
// for this type.
322+
pub fn take_reply_future(&mut self) -> ReplyFuture<R, E> {
323+
self.reply_future
324+
.take()
325+
.expect("there can be only one waiting for the reply")
326+
}
285327
}
286328

287-
fn convert_request_sender_error<T, E>(_err: mpsc::SendError<T>) -> E
329+
impl<T, R, E> RequestSink<T, R, E>
288330
where
289331
E: From<Error>,
290332
{
291-
Error::canceled("request stream processing ended before all items were sent").into()
333+
fn map_send_error(&self, err: mpsc::SendError<T>, msg: &'static str) -> E {
334+
debug!(
335+
self.logger,
336+
"{}", msg;
337+
);
338+
Error::aborted("request stream processing ended before all items were sent").into()
339+
}
292340
}
293341

294-
impl<T, E> Sink for RequestSink<T, E>
342+
impl<T, R, E> Sink for RequestSink<T, R, E>
295343
where
296344
E: From<Error>,
297345
{
298346
type SinkItem = T;
299347
type SinkError = E;
300348

301349
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
302-
self.sender
303-
.start_send(item)
304-
.map_err(convert_request_sender_error)
350+
self.sender.start_send(item).map_err(|e| {
351+
self.map_send_error(
352+
e,
353+
"request stream processing ended before receiving some items",
354+
)
355+
})
305356
}
306357

307358
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
308-
self.sender
309-
.poll_complete()
310-
.map_err(convert_request_sender_error)
359+
self.sender.poll_complete().map_err(|e| {
360+
self.map_send_error(
361+
e,
362+
"request stream processing ended before receiving some items",
363+
)
364+
})
311365
}
312366

313367
fn close(&mut self) -> Poll<(), Self::SinkError> {
314-
self.sender.close().map_err(convert_request_sender_error)
315-
}
316-
}
317-
318-
#[derive(Debug)]
319-
pub struct RequestStreamHandle<T> {
320-
receiver: mpsc::Receiver<T>,
321-
}
322-
323-
impl<T> Stream for RequestStreamHandle<T> {
324-
type Item = T;
325-
type Error = Error;
326-
327-
fn poll(&mut self) -> Poll<Option<T>, Error> {
328-
let async_poll = self.receiver.poll().unwrap();
329-
Ok(async_poll)
368+
self.sender.close().map_err(|e| {
369+
self.map_send_error(
370+
e,
371+
"request stream processing channel did not close gracefully, \
372+
the task possibly failed to receive some items",
373+
)
374+
})
330375
}
331376
}
332377

333-
pub fn stream_request<T, E>(buffer: usize) -> (RequestStreamHandle<T>, RequestSink<T, E>) {
334-
let (sender, receiver) = mpsc::channel(buffer);
335-
let handle = RequestStreamHandle { receiver };
378+
pub fn stream_request<T, R, E>(
379+
buffer: usize,
380+
logger: Logger,
381+
) -> (RequestStreamHandle<T, R>, RequestSink<T, R, E>) {
382+
let (sender, receiver) = async_msg::channel(buffer);
383+
let (reply, reply_future) = unary_reply(logger.clone());
384+
let handle = RequestStreamHandle { receiver, reply };
336385
let sink = RequestSink {
337386
sender,
338-
_phantom_error: PhantomData,
387+
reply_future: Some(reply_future),
388+
logger,
339389
};
340390
(handle, sink)
341391
}
@@ -413,7 +463,7 @@ pub enum BlockMsg {
413463
/// The stream of headers for missing chain blocks has been received
414464
/// from the network in response to a PullHeaders request or a Missing
415465
/// solicitation event.
416-
ChainHeaders(RequestStreamHandle<Header>),
466+
ChainHeaders(RequestStreamHandle<Header, ()>),
417467
}
418468

419469
/// Propagation requests for the network task.

jormungandr/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
179179
let mut network_msgbox = network_msgbox.clone();
180180
let mut fragment_msgbox = fragment_msgbox.clone();
181181
let mut explorer_msg_box = explorer.as_ref().map(|(msg_box, _context)| msg_box.clone());
182+
// TODO: we should get this value from the configuration
183+
let block_cache_ttl: Duration = Duration::from_secs(3600);
182184
let stats_counter = stats_counter.clone();
183185
services.spawn_future_with_inputs("block", move |info, input| {
184186
blockchain::handle_input(

0 commit comments

Comments
 (0)