Skip to content

Commit 67ce533

Browse files
committed
PR review
1 parent 50e3580 commit 67ce533

File tree

2 files changed

+46
-28
lines changed

2 files changed

+46
-28
lines changed

examples/limit.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ fn limit_by_node_id(allowed_nodes: HashSet<NodeId>) -> EventSender {
8181
let mask = EventMask {
8282
// We want a request for each incoming connection so we can accept
8383
// or reject them. We don't need any other events.
84-
connected: ConnectMode::Request,
84+
connected: ConnectMode::Intercept,
8585
..EventMask::DEFAULT
8686
};
8787
let (tx, mut rx) = EventSender::channel(32, mask);
@@ -108,7 +108,7 @@ fn limit_by_hash(allowed_hashes: HashSet<Hash>) -> EventSender {
108108
// We want to get a request for each get request that we can answer
109109
// with OK or not OK depending on the hash. We do not want detailed
110110
// events once it has been decided to handle a request.
111-
get: RequestMode::Request,
111+
get: RequestMode::Intercept,
112112
..EventMask::DEFAULT
113113
};
114114
let (tx, mut rx) = EventSender::channel(32, mask);
@@ -136,7 +136,7 @@ fn throttle(delay_ms: u64) -> EventSender {
136136
let mask = EventMask {
137137
// We want to get requests for each sent user data blob, so we can add a delay.
138138
// Other than that, we don't need any events.
139-
throttle: ThrottleMode::Throttle,
139+
throttle: ThrottleMode::Intercept,
140140
..EventMask::DEFAULT
141141
};
142142
let (tx, mut rx) = EventSender::channel(32, mask);
@@ -190,7 +190,7 @@ fn limit_max_connections(max_connections: usize) -> EventSender {
190190
// based on the current connection count if we want to accept or reject.
191191
// We also want detailed logging of events for the get request, so we can
192192
// detect when the request is finished one way or another.
193-
connected: ConnectMode::Request,
193+
connected: ConnectMode::Intercept,
194194
..EventMask::DEFAULT
195195
};
196196
let (tx, mut rx) = EventSender::channel(32, mask);

src/provider/events.rs

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::{
1616
Hash,
1717
};
1818

19+
/// Mode for connect events.
1920
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
2021
#[repr(u8)]
2122
pub enum ConnectMode {
@@ -25,9 +26,10 @@ pub enum ConnectMode {
2526
/// We get a notification for connect events.
2627
Notify,
2728
/// We get a request for connect events and can reject incoming connections.
28-
Request,
29+
Intercept,
2930
}
3031

32+
/// Request mode for observe requests.
3133
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
3234
#[repr(u8)]
3335
pub enum ObserveMode {
@@ -37,9 +39,10 @@ pub enum ObserveMode {
3739
/// We get a notification for connect events.
3840
Notify,
3941
/// We get a request for connect events and can reject incoming connections.
40-
Request,
42+
Intercept,
4143
}
4244

45+
/// Request mode for all data related requests.
4346
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
4447
#[repr(u8)]
4548
pub enum RequestMode {
@@ -49,32 +52,39 @@ pub enum RequestMode {
4952
/// We get a notification for each request, but no transfer events.
5053
Notify,
5154
/// We get a request for each request, and can reject incoming requests, but no transfer events.
52-
Request,
55+
Intercept,
5356
/// We get a notification for each request as well as detailed transfer events.
5457
NotifyLog,
5558
/// We get a request for each request, and can reject incoming requests.
5659
/// We also get detailed transfer events.
57-
RequestLog,
60+
InterceptLog,
5861
/// This request type is completely disabled. All requests will be rejected.
62+
///
63+
/// This means that requests of this kind will always be rejected, whereas
64+
/// None means that we don't get any events, but requests will be processed normally.
5965
Disabled,
6066
}
6167

68+
/// Throttling mode for requests that support throttling.
6269
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
6370
#[repr(u8)]
6471
pub enum ThrottleMode {
6572
/// We don't get these kinds of events at all
6673
#[default]
6774
None,
6875
/// We call throttle to give the event handler a way to throttle requests
69-
Throttle,
76+
Intercept,
7077
}
7178

7279
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
7380
pub enum AbortReason {
81+
/// The request was aborted because a limit was exceeded. It is OK to try again later.
7482
RateLimited,
83+
/// The request was aborted because the client does not have permission to perform the operation.
7584
Permission,
7685
}
7786

87+
/// Errors that can occur when sending progress updates.
7888
#[derive(Debug, Snafu)]
7989
pub enum ProgressError {
8090
Limit,
@@ -141,6 +151,10 @@ impl From<irpc::channel::SendError> for ProgressError {
141151
pub type EventResult = Result<(), AbortReason>;
142152
pub type ClientResult = Result<(), ProgressError>;
143153

154+
/// Event mask to configure which events are sent to the event handler.
155+
///
156+
/// This can also be used to completely disable certain request types. E.g.
157+
/// push requests are disabled by default, as they can write to the local store.
144158
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145159
pub struct EventMask {
146160
/// Connection event mask
@@ -180,12 +194,12 @@ impl EventMask {
180194
/// need to do it manually. Providing constants that have push enabled would
181195
/// risk misuse.
182196
pub const ALL_READONLY: Self = Self {
183-
connected: ConnectMode::Request,
184-
get: RequestMode::RequestLog,
185-
get_many: RequestMode::RequestLog,
197+
connected: ConnectMode::Intercept,
198+
get: RequestMode::InterceptLog,
199+
get_many: RequestMode::InterceptLog,
186200
push: RequestMode::Disabled,
187-
throttle: ThrottleMode::Throttle,
188-
observe: ObserveMode::Request,
201+
throttle: ThrottleMode::Intercept,
202+
observe: ObserveMode::Intercept,
189203
};
190204
}
191205

@@ -239,7 +253,7 @@ impl RequestTracker {
239253
throttle: None,
240254
};
241255

242-
/// Transfer for index `index` started, size `size`
256+
/// Transfer for index `index` started, size `size` in bytes.
243257
pub async fn transfer_started(&self, index: u64, hash: &Hash, size: u64) -> irpc::Result<()> {
244258
if let RequestUpdates::Active(tx) = &self.updates {
245259
tx.send(
@@ -333,7 +347,10 @@ impl EventSender {
333347
}
334348
while let Some(msg) = rx.recv().await {
335349
match msg {
336-
ProviderMessage::ClientConnected(_) => todo!(),
350+
ProviderMessage::ClientConnected(msg) => {
351+
trace!("{:?}", msg.inner);
352+
msg.tx.send(Ok(())).await.ok();
353+
}
337354
ProviderMessage::ClientConnectedNotify(msg) => {
338355
trace!("{:?}", msg.inner);
339356
}
@@ -395,7 +412,7 @@ impl EventSender {
395412
match self.mask.connected {
396413
ConnectMode::None => {}
397414
ConnectMode::Notify => client.notify(Notify(f())).await?,
398-
ConnectMode::Request => client.rpc(f()).await??,
415+
ConnectMode::Intercept => client.rpc(f()).await??,
399416
}
400417
};
401418
Ok(())
@@ -445,7 +462,7 @@ impl EventSender {
445462
client.unwrap().notify_streaming(Notify(msg), 32).await?,
446463
)
447464
}
448-
RequestMode::Request if client.is_some() => {
465+
RequestMode::Intercept if client.is_some() => {
449466
let msg = RequestReceived {
450467
request: f(),
451468
connection_id,
@@ -464,7 +481,7 @@ impl EventSender {
464481
};
465482
RequestUpdates::Active(client.unwrap().notify_streaming(Notify(msg), 32).await?)
466483
}
467-
RequestMode::RequestLog if client.is_some() => {
484+
RequestMode::InterceptLog if client.is_some() => {
468485
let msg = RequestReceived {
469486
request: f(),
470487
connection_id,
@@ -491,7 +508,7 @@ impl EventSender {
491508
) -> RequestTracker {
492509
let throttle = match self.mask.throttle {
493510
ThrottleMode::None => None,
494-
ThrottleMode::Throttle => self
511+
ThrottleMode::Intercept => self
495512
.inner
496513
.clone()
497514
.map(|client| (client, connection_id, request_id)),
@@ -515,38 +532,39 @@ pub enum ProviderProto {
515532
#[rpc(tx = NoSender)]
516533
ConnectionClosed(ConnectionClosed),
517534

518-
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = oneshot::Sender<EventResult>)]
519535
/// A new get request was received from the provider.
536+
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = oneshot::Sender<EventResult>)]
520537
GetRequestReceived(RequestReceived<GetRequest>),
521538

539+
/// A new get request was received from the provider (notify variant).
522540
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = NoSender)]
523-
/// A new get request was received from the provider.
524541
GetRequestReceivedNotify(Notify<RequestReceived<GetRequest>>),
525542

526-
/// A new get request was received from the provider.
543+
/// A new get many request was received from the provider.
527544
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = oneshot::Sender<EventResult>)]
528545
GetManyRequestReceived(RequestReceived<GetManyRequest>),
529546

530-
/// A new get request was received from the provider.
547+
/// A new get many request was received from the provider (notify variant).
531548
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = NoSender)]
532549
GetManyRequestReceivedNotify(Notify<RequestReceived<GetManyRequest>>),
533550

534-
/// A new get request was received from the provider.
551+
/// A new push request was received from the provider.
535552
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = oneshot::Sender<EventResult>)]
536553
PushRequestReceived(RequestReceived<PushRequest>),
537554

538-
/// A new get request was received from the provider.
555+
/// A new push request was received from the provider (notify variant).
539556
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = NoSender)]
540557
PushRequestReceivedNotify(Notify<RequestReceived<PushRequest>>),
541558

542-
/// A new get request was received from the provider.
559+
/// A new observe request was received from the provider.
543560
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = oneshot::Sender<EventResult>)]
544561
ObserveRequestReceived(RequestReceived<ObserveRequest>),
545562

546-
/// A new get request was received from the provider.
563+
/// A new observe request was received from the provider (notify variant).
547564
#[rpc(rx = mpsc::Receiver<RequestUpdate>, tx = NoSender)]
548565
ObserveRequestReceivedNotify(Notify<RequestReceived<ObserveRequest>>),
549566

567+
/// Request to throttle sending for a specific data request.
550568
#[rpc(tx = oneshot::Sender<EventResult>)]
551569
Throttle(Throttle),
552570
}

0 commit comments

Comments
 (0)