Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions zlink-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub use server::{
Server,
};
mod call;
#[cfg(feature = "server")]
#[doc(hidden)]
pub mod notified;
pub use call::Call;
pub mod reply;
pub use reply::Reply;
Expand Down
52 changes: 52 additions & 0 deletions zlink-core/src/notified.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Notified state API traits.
//!
//! This module defines traits that document the expected API for notified types. Runtime crates
//! (like `zlink-tokio` and `zlink-smol`) implement these traits on their concrete types.

use core::{fmt::Debug, future::Future};

use crate::Reply;

/// Trait for a notified state that tracks a value and broadcasts changes.
///
/// This is useful for implementing service properties that notify subscribers when they change.
pub trait State<T, ReplyParams>: Clone
where
T: Into<ReplyParams> + Clone + Debug,
ReplyParams: Clone + Send + 'static + Debug,
{
/// The stream type returned by [`stream`](Self::stream).
type Stream: futures_util::Stream<Item = Reply<ReplyParams>>;

/// Create a new notified state with the given initial value.
fn new(value: T) -> Self;

/// Set the value and notify all listeners.
fn set(&mut self, value: T) -> impl Future<Output = ()> + Send;

/// Get the current value.
fn get(&self) -> T;

/// Get a stream of replies for this state.
fn stream(&self) -> Self::Stream;
}

/// Trait for a one-shot notification (useful for method call handlers).
///
/// This is useful for handling method calls in a separate task/thread, where the result is sent
/// back once.
pub trait Once<ReplyParams>: Sized
where
ReplyParams: Send + 'static + Debug,
{
/// The stream type returned by [`new`](Self::new).
type Stream: futures_util::Stream<Item = Reply<ReplyParams>>;

/// Create a new one-shot notifier and its corresponding stream.
fn new() -> (Self, Self::Stream);

/// Send the notification value. Consumes self.
fn notify<T>(self, value: T)
where
T: Into<ReplyParams> + Debug;
}
1 change: 1 addition & 0 deletions zlink-smol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
#![cfg_attr(not(doctest), doc = include_str!("../README.md"))]

pub use zlink_core::*;
#[cfg(feature = "server")]
pub mod notified;
pub mod unix;
175 changes: 0 additions & 175 deletions zlink-smol/src/notified.rs

This file was deleted.

8 changes: 8 additions & 0 deletions zlink-smol/src/notified/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! Convenience API for maintaining state, that notifies on changes.

mod once;
mod state;

pub use once::{Once, Stream as OnceStream};
pub use state::{State, Stream};
pub use zlink_core::notified as traits;
80 changes: 80 additions & 0 deletions zlink-smol/src/notified/once.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
};

use crate::Reply;
use async_channel::{bounded, Receiver as OneshotReceiver, Sender as OneshotSender};
use pin_project_lite::pin_project;

/// A one-shot notified state of a service implementation.
///
/// This is useful for handling method calls in a separate task/thread.
#[derive(Debug)]
pub struct Once<ReplyParams> {
tx: OneshotSender<ReplyParams>,
}

impl<ReplyParams> zlink_core::notified::Once<ReplyParams> for Once<ReplyParams>
where
ReplyParams: Send + 'static + Debug,
{
type Stream = Stream<ReplyParams>;

/// Create a new notified oneshot state.
fn new() -> (Self, Stream<ReplyParams>) {
let (tx, rx) = bounded(1);

(
Self { tx },
Stream {
inner: rx,
terminated: false,
},
)
}

/// Set the value of the notified field and notify all listeners.
fn notify<T>(self, value: T)
where
T: Into<ReplyParams> + Debug,
{
// Failure means that we dropped the receiver stream internally before it received anything
// and that's a big bug that must not happen.
self.tx.try_send(value.into()).unwrap();
}
}

pin_project! {
/// The stream to use as the [`crate::Service::ReplyStream`] in service implementation when
/// using [`Once`].
#[derive(Debug)]
pub struct Stream<ReplyParams> {
#[pin]
inner: OneshotReceiver<ReplyParams>,
terminated: bool,
}
}

impl<ReplyParams> futures_util::Stream for Stream<ReplyParams>
where
ReplyParams: Send + 'static,
{
type Item = Reply<ReplyParams>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if *this.terminated {
return Poll::Ready(None);
}

match futures_util::ready!(this.inner.poll_next(cx)) {
Some(reply) => {
*this.terminated = true;
Poll::Ready(Some(Reply::new(Some(reply)).set_continues(Some(false))))
}
None => Poll::Ready(None),
}
}
}
Loading