Skip to content
4 changes: 2 additions & 2 deletions docs/docs/hydro/reference/slices-atomicity/slices.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ let payloads_with_leader = sliced! {
let all_payloads = unsent_payloads.chain(payload_batch);

// If no leader, buffer everything; otherwise clear the buffer
unsent_payloads = all_payloads.clone().filter_if_none(latest_leader.clone());
unsent_payloads = all_payloads.clone().filter_if(latest_leader.clone().is_none());
all_payloads.cross_singleton(latest_leader)
};
```
Expand Down Expand Up @@ -214,6 +214,6 @@ let periodic_output = sliced! {
let state = use(current_state, nondet!(/** snapshot timing is non-deterministic */));

// Only emit when there is at least one tick in this batch
state.filter_if_some(tick_batch.first()).into_stream()
state.filter_if(tick_batch.first().is_some()).into_stream()
};
```
5 changes: 3 additions & 2 deletions hydro_lang/src/live_collections/keyed_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2758,11 +2758,12 @@ mod tests {
let tick_triggered_input = node_tick
.singleton(q!((3, 103)))
.into_stream()
.filter_if_some(
.filter_if(
tick_trigger
.clone()
.batch(&node_tick, nondet!(/** test */))
.first(),
.first()
.is_some(),
)
.all_ticks();

Expand Down
152 changes: 142 additions & 10 deletions hydro_lang/src/live_collections/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
},
);

from_previous_tick.or(initial.filter_if_some(location.optional_first_tick(q!(()))))
from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
}
}

Expand Down Expand Up @@ -790,6 +790,99 @@ where
self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
}

/// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
///
/// # Example
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// // ticks are lazy by default, forces the second tick to run
/// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
///
/// let some_first_tick = tick.optional_first_tick(q!(42));
/// some_first_tick.is_some().all_ticks()
/// # }, |mut stream| async move {
/// // [true /* first tick */, false /* second tick */, ...]
/// # for w in vec![true, false] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// # }
/// ```
#[expect(clippy::wrong_self_convention, reason = "Stream naming")]
pub fn is_some(self) -> Singleton<bool, L, B> {
self.map(q!(|_| ()))
.into_singleton()
.map(q!(|o| o.is_some()))
}

/// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
///
/// # Example
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// // ticks are lazy by default, forces the second tick to run
/// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
///
/// let some_first_tick = tick.optional_first_tick(q!(42));
/// some_first_tick.is_none().all_ticks()
/// # }, |mut stream| async move {
/// // [false /* first tick */, true /* second tick */, ...]
/// # for w in vec![false, true] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// # }
/// ```
#[expect(clippy::wrong_self_convention, reason = "Stream naming")]
pub fn is_none(self) -> Singleton<bool, L, B> {
self.map(q!(|_| ()))
.into_singleton()
.map(q!(|o| o.is_none()))
}

/// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
/// values are equal, `false` otherwise (including when either is null).
///
/// # Example
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// // ticks are lazy by default, forces the second tick to run
/// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
///
/// let a = tick.optional_first_tick(q!(5)); // Some(5), None
/// let b = tick.optional_first_tick(q!(5)); // Some(5), None
/// a.is_some_and_equals(b).all_ticks()
/// # }, |mut stream| async move {
/// // [true, false]
/// # for w in vec![true, false] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// # }
/// ```
#[expect(clippy::wrong_self_convention, reason = "Stream naming")]
pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
where
T: PartialEq + Clone,
B: IsBounded,
{
self.into_singleton()
.zip(other.into_singleton())
.map(q!(|(a, b)| a.is_some() && a == b))
}

/// An operator which allows you to "name" a `HydroNode`.
/// This is only used for testing, to correlate certain `HydroNode`s with IDs.
pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
Expand Down Expand Up @@ -878,6 +971,47 @@ where
)
}

/// Filters this optional, passing through the value if the boolean signal is `true`,
/// otherwise the output is null.
///
/// # Example
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// // ticks are lazy by default, forces the second tick to run
/// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
///
/// let some_first_tick = tick.optional_first_tick(q!(()));
/// let signal = some_first_tick.is_some(); // true on first tick, false on second
/// let batch_first_tick = process
/// .source_iter(q!(vec![456]))
/// .batch(&tick, nondet!(/** test */));
/// let batch_second_tick = process
/// .source_iter(q!(vec![789]))
/// .batch(&tick, nondet!(/** test */))
/// .defer_tick();
/// batch_first_tick.chain(batch_second_tick).first()
/// .filter_if(signal)
/// .unwrap_or(tick.singleton(q!(0)))
/// .all_ticks()
/// # }, |mut stream| async move {
/// // [456, 0]
/// # for w in vec![456, 0] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// # }
/// ```
pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
where
B: IsBounded,
{
self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
}

/// Filters this optional, passing through the optional value if it is non-null **and** the
/// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
///
Expand Down Expand Up @@ -914,11 +1048,12 @@ where
/// # }));
/// # }
/// ```
#[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
where
B: IsBounded,
{
self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
self.filter_if(signal.is_some())
}

/// Filters this optional, passing through the optional value if it is non-null **and** the
Expand Down Expand Up @@ -957,16 +1092,12 @@ where
/// # }));
/// # }
/// ```
#[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
where
B: IsBounded,
{
self.filter_if_some(
other
.map(q!(|_| ()))
.into_singleton()
.filter(q!(|o| o.is_none())),
)
self.filter_if(other.is_none())
}

/// If `self` is null, emits a null optional, but if it non-null, emits `value`.
Expand Down Expand Up @@ -997,11 +1128,12 @@ where
/// # }));
/// # }
/// ```
#[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
where
B: IsBounded,
{
value.filter_if_some(self)
value.filter_if(self.is_some())
}
}

Expand Down Expand Up @@ -1132,7 +1264,7 @@ where
let tick = self.location.tick();

self.snapshot(&tick, nondet)
.filter_if_some(samples.batch(&tick, nondet).first())
.filter_if(samples.batch(&tick, nondet).first().is_some())
.all_ticks()
.weaken_retries()
}
Expand Down
Loading