Skip to content

Commit

Permalink
Keep the Channel in clients receivers & senders
Browse files Browse the repository at this point in the history
Channel contains the Arc<ChannelInner> so that
```rust
let streaming_stuff = Client::new(init_channel()).some_call();
```
does not die due to channel being dropped at the end of the line.
  • Loading branch information
Ten0 committed Oct 28, 2019
1 parent 963c6c3 commit 2ba6b4b
Showing 1 changed file with 42 additions and 8 deletions.
50 changes: 42 additions & 8 deletions src/call/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ impl Call {
tag,
)
});
Ok(ClientUnaryReceiver::new(call, cq_f, method.resp_de()))
Ok(ClientUnaryReceiver::new(
call,
cq_f,
method.resp_de(),
channel,
))
}

pub fn client_streaming<Req, Resp>(
Expand All @@ -151,11 +156,12 @@ impl Call {
});

let share_call = Arc::new(SpinLock::new(ShareCall::new(call, cq_f)));
let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser());
let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser(), channel);
let recv = ClientCStreamReceiver {
call: share_call,
resp_de: method.resp_de(),
finished: false,
_channel_keepalive: channel.clone(),
};
Ok((sink, recv))
}
Expand Down Expand Up @@ -189,7 +195,12 @@ impl Call {
grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
});

Ok(ClientSStreamReceiver::new(call, cq_f, method.resp_de()))
Ok(ClientSStreamReceiver::new(
call,
cq_f,
method.resp_de(),
channel,
))
}

pub fn duplex_streaming<Req, Resp>(
Expand All @@ -216,8 +227,8 @@ impl Call {
});

let share_call = Arc::new(SpinLock::new(ShareCall::new(call, cq_f)));
let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser());
let recv = ClientDuplexReceiver::new(share_call, method.resp_de());
let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser(), channel);
let recv = ClientDuplexReceiver::new(share_call, method.resp_de(), channel);
Ok((sink, recv))
}
}
Expand All @@ -230,14 +241,21 @@ pub struct ClientUnaryReceiver<T> {
call: Call,
resp_f: BatchFuture,
resp_de: DeserializeFn<T>,
_channel_keepalive: Channel,
}

impl<T> ClientUnaryReceiver<T> {
fn new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn<T>) -> ClientUnaryReceiver<T> {
fn new(
call: Call,
resp_f: BatchFuture,
resp_de: DeserializeFn<T>,
_channel_keepalive: &Channel,
) -> ClientUnaryReceiver<T> {
ClientUnaryReceiver {
call,
resp_f,
resp_de,
_channel_keepalive: _channel_keepalive.clone(),
}
}

Expand Down Expand Up @@ -276,6 +294,7 @@ pub struct ClientCStreamReceiver<T> {
call: Arc<SpinLock<ShareCall>>,
resp_de: DeserializeFn<T>,
finished: bool,
_channel_keepalive: Channel,
}

impl<T> ClientCStreamReceiver<T> {
Expand Down Expand Up @@ -326,15 +345,21 @@ pub struct StreamingCallSink<Req> {
sink_base: SinkBase,
close_f: Option<BatchFuture>,
req_ser: SerializeFn<Req>,
_channel_keepalive: Channel,
}

impl<Req> StreamingCallSink<Req> {
fn new(call: Arc<SpinLock<ShareCall>>, req_ser: SerializeFn<Req>) -> StreamingCallSink<Req> {
fn new(
call: Arc<SpinLock<ShareCall>>,
req_ser: SerializeFn<Req>,
_channel_keepalive: &Channel,
) -> StreamingCallSink<Req> {
StreamingCallSink {
call,
sink_base: SinkBase::new(false),
close_f: None,
req_ser,
_channel_keepalive: _channel_keepalive.clone(),
}
}

Expand Down Expand Up @@ -490,17 +515,20 @@ impl<H: ShareCallHolder, T> ResponseStreamImpl<H, T> {
#[must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC"]
pub struct ClientSStreamReceiver<Resp> {
imp: ResponseStreamImpl<ShareCall, Resp>,
_channel_keepalive: Channel,
}

impl<Resp> ClientSStreamReceiver<Resp> {
fn new(
call: Call,
finish_f: BatchFuture,
de: DeserializeFn<Resp>,
_channel_keepalive: &Channel,
) -> ClientSStreamReceiver<Resp> {
let share_call = ShareCall::new(call, finish_f);
ClientSStreamReceiver {
imp: ResponseStreamImpl::new(share_call, de),
_channel_keepalive: _channel_keepalive.clone(),
}
}

Expand Down Expand Up @@ -528,12 +556,18 @@ impl<Resp> Stream for ClientSStreamReceiver<Resp> {
#[must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC"]
pub struct ClientDuplexReceiver<Resp> {
imp: ResponseStreamImpl<Arc<SpinLock<ShareCall>>, Resp>,
_channel_keepalive: Channel,
}

impl<Resp> ClientDuplexReceiver<Resp> {
fn new(call: Arc<SpinLock<ShareCall>>, de: DeserializeFn<Resp>) -> ClientDuplexReceiver<Resp> {
fn new(
call: Arc<SpinLock<ShareCall>>,
de: DeserializeFn<Resp>,
_channel_keepalive: &Channel,
) -> ClientDuplexReceiver<Resp> {
ClientDuplexReceiver {
imp: ResponseStreamImpl::new(call, de),
_channel_keepalive: _channel_keepalive.clone(),
}
}

Expand Down

0 comments on commit 2ba6b4b

Please sign in to comment.