Skip to content

feat(dfir_lang)!: use custom dfir_pipes::Pull trait [ci-bench]#2618

Merged
MingweiSamuel merged 4 commits intomainfrom
dfir-pipes
Mar 9, 2026
Merged

feat(dfir_lang)!: use custom dfir_pipes::Pull trait [ci-bench]#2618
MingweiSamuel merged 4 commits intomainfrom
dfir-pipes

Conversation

@MingweiSamuel
Copy link
Member

@MingweiSamuel MingweiSamuel commented Mar 2, 2026

This is the pull-half of a big change from using other iterators (std::iter::Iterator or futures_core::stream::Stream) to our own Pull trait. Key to this more powerful iterator trait is the step enum:

pub enum Step<Item, Meta, CanPend: Toggle, CanEnd: Toggle> {
    /// An item is ready with associated metadata.
    Ready(Item, Meta),
    /// The pull is not ready yet (only possible when `CanPend = Yes`).
    Pending(CanPend),
    /// The pull has ended (only possible when `CanEnd = Yes`).
    Ended(CanEnd),
}

This abstraction allows Pull to represent both synchronous Iterators and asynchronous Streams with zero cost. (As well as distinguishing between infinite vs finite iterators, which I guess is not actually that useful to us). In the future we will also add an Error variant (#2635). The Meta metadata field may be used for full record-level tracing (#2242).

This trait has some pseudo-specialization around Fuse, and further performance improvements may come from true nightly min_specialization, as well as from converting from Pusherator/Sink to a new Push trait.

Other changes:

  • Moves much of dfir_rs::compiled::pull into dfir_pipes, using new trait
  • Update itertools to 0.14

Copilot AI review requested due to automatic review settings March 2, 2026 19:30
@github-actions
Copy link
Contributor

github-actions bot commented Mar 2, 2026

📊 Benchmark Results

✅ Benchmark completed! You can download the results from the links below.

Run History:

Last updated: 2026-03-09T20:58:27.967Z

@cloudflare-workers-and-pages
Copy link

cloudflare-workers-and-pages bot commented Mar 2, 2026

Deploying hydro with  Cloudflare Pages  Cloudflare Pages

Latest commit: 22cf3f7
Status: ✅  Deploy successful!
Preview URL: https://3e98eddd.hydroflow.pages.dev
Branch Preview URL: https://dfir-pipes.hydroflow.pages.dev

View logs

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new dfir_pipes pull-based streaming crate and migrates DFIR/Hydro “pull” operator codegen and runtime plumbing from futures::Stream/tokio_stream adapters to the new Pull API, with additional changes to snapshot tooling and workspace configuration.

Changes:

  • Add the new dfir_pipes crate (core Pull/Step types plus a set of pull combinators) and wire it into dfir_rs.
  • Update dfir_lang operator codegen to use dfir_pipes constructors/combinators and add a new symmetric hash join implementation for pull mode.
  • Make hydro_build_utils’s insta usage feature-gated and adjust dev-dependencies; tweak workspace membership and some benchmarks.

Reviewed changes

Copilot reviewed 91 out of 93 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
Cargo.toml Adds dfir_pipes to workspace members and comments out multiple hydro_* crates.
lattices_macro/Cargo.toml Enables hydro_build_utils’s insta feature in dev-deps.
hydro_test/Cargo.toml Enables hydro_build_utils’s insta feature in dev-deps.
hydro_lang/Cargo.toml Formats feature lists and enables hydro_build_utils/insta for dev-deps.
hydro_lang/src/deploy/deploy_graph.rs Adds a rustflags.is_none() condition when deciding linking mode.
hydro_build_utils/Cargo.toml Introduces an insta feature and makes insta optional.
hydro_build_utils/src/lib.rs Feature-gates insta re-export and snapshot macros behind insta.
dfir_rs/Cargo.toml Adds dependency on dfir_pipes and enables hydro_build_utils/insta for dev-deps.
dfir_rs/src/lib.rs Re-exports dfir_pipes from dfir_rs.
dfir_rs/src/declarative_macro.rs Adds #[cfg(test)] to assert_graphvis_snapshots!.
dfir_rs/src/compiled/pull/mod.rs Exposes the new symmetric_hash_join_pull module.
dfir_rs/src/compiled/pull/symmetric_hash_join_pull.rs Adds a pull-based symmetric hash join implementation and tests.
dfir_pipes/Cargo.toml New crate manifest for dfir_pipes (no_std + std/alloc features).
dfir_pipes/build.rs Emits nightly cfg via hydro_build_utils.
dfir_pipes/src/lib.rs Defines the Pull/Step/capability system and exposes combinator methods.
dfir_pipes/src/test_utils.rs Adds internal test pulls and type-assert helper for capability algebra tests.
dfir_pipes/src/chain.rs Implements Pull::chain and capability propagation + tests.
dfir_pipes/src/collect.rs Implements async Pull::collect.
dfir_pipes/src/empty.rs Implements an empty pull source.
dfir_pipes/src/enumerate.rs Implements Pull::enumerate.
dfir_pipes/src/filter.rs Implements Pull::filter.
dfir_pipes/src/filter_map.rs Implements Pull::filter_map.
dfir_pipes/src/flat_map.rs Implements Pull::flat_map.
dfir_pipes/src/flatten.rs Implements Pull::flatten.
dfir_pipes/src/for_each.rs Implements async Pull::for_each.
dfir_pipes/src/fuse.rs Implements Pull::fuse and FusedPull.
dfir_pipes/src/inspect.rs Implements Pull::inspect.
dfir_pipes/src/iter.rs Implements from_iter pull source.
dfir_pipes/src/map.rs Implements Pull::map.
dfir_pipes/src/merge.rs Implements Pull::merge and capability propagation + tests.
dfir_pipes/src/once.rs Implements a single-item pull source.
dfir_pipes/src/pull_fn.rs Implements from_fn pull source.
dfir_pipes/src/scan.rs Implements Pull::scan.
dfir_pipes/src/send_sink.rs Implements Pull::send_sink (pull-to-sink future).
dfir_pipes/src/skip.rs Implements Pull::skip.
dfir_pipes/src/skip_while.rs Implements Pull::skip_while.
dfir_pipes/src/source_stream.rs Implements from_stream_with_waker pull adapter.
dfir_pipes/src/stream.rs Implements from_stream pull adapter (Ctx = task Context).
dfir_pipes/src/take.rs Implements Pull::take.
dfir_pipes/src/take_while.rs Implements Pull::take_while.
dfir_lang/src/graph/ops/mod.rs Updates identity/null operator codegen to use dfir_pipes.
dfir_lang/src/graph/ops/anti_join.rs Migrates pull-path combinators to dfir_pipes.
dfir_lang/src/graph/ops/chain_first_n.rs Migrates pull-path take combinator usage.
dfir_lang/src/graph/ops/cross_join.rs Migrates pull-path map combinator usage.
dfir_lang/src/graph/ops/cross_join_multiset.rs Migrates pull-path accumulation + iter construction to dfir_pipes.
dfir_lang/src/graph/ops/defer_signal.rs Migrates iter construction to dfir_pipes.
dfir_lang/src/graph/ops/demux_enum.rs Migrates pull-path map combinator usage.
dfir_lang/src/graph/ops/difference.rs Migrates pull-path map combinator usage.
dfir_lang/src/graph/ops/enumerate.rs Migrates pull-path enumerate mapping to dfir_pipes.
dfir_lang/src/graph/ops/filter.rs Migrates pull-path filter to dfir_pipes.
dfir_lang/src/graph/ops/filter_map.rs Migrates pull-path filter_map to dfir_pipes.
dfir_lang/src/graph/ops/flat_map.rs Migrates pull-path flat_map/flatten to dfir_pipes.
dfir_lang/src/graph/ops/flatten.rs Migrates pull-path flatten to dfir_pipes.
dfir_lang/src/graph/ops/fold.rs Migrates pull-path for_each + once usage to dfir_pipes.
dfir_lang/src/graph/ops/fold_keyed.rs Migrates pull-path accumulation and iter construction to dfir_pipes.
dfir_lang/src/graph/ops/inspect.rs Migrates pull-path inspect to dfir_pipes.
dfir_lang/src/graph/ops/join.rs Switches pull join to new symmetric_hash_join_pull implementation and Pull types.
dfir_lang/src/graph/ops/join_fused.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/join_fused_lhs.rs Migrates pull-path filter_map/iter construction to dfir_pipes.
dfir_lang/src/graph/ops/join_fused_rhs.rs Migrates pull-path map combinator usage.
dfir_lang/src/graph/ops/lattice_bimorphism.rs Migrates pull-path fuse usage to dfir_pipes.
dfir_lang/src/graph/ops/map.rs Migrates pull-path map to dfir_pipes.
dfir_lang/src/graph/ops/multiset_delta.rs Migrates pull-path filter to dfir_pipes.
dfir_lang/src/graph/ops/next_iteration.rs Migrates pull-path filter to dfir_pipes.
dfir_lang/src/graph/ops/persist.rs Migrates pull-path for_each + iter construction to dfir_pipes.
dfir_lang/src/graph/ops/persist_mut.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/persist_mut_keyed.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/prefix.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/reduce.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/reduce_keyed.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/repeat_n.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/resolve_futures.rs Migrates resolve_futures pull-path plumbing to dfir_pipes.
dfir_lang/src/graph/ops/scan.rs Migrates pull-path filter_map to dfir_pipes.
dfir_lang/src/graph/ops/sort.rs Migrates pull-path collect + iter construction to dfir_pipes.
dfir_lang/src/graph/ops/sort_by_key.rs Migrates pull-path collect + iter construction to dfir_pipes.
dfir_lang/src/graph/ops/source_file.rs Migrates pull-path map combinator usage.
dfir_lang/src/graph/ops/source_interval.rs Migrates pull-path map combinator usage.
dfir_lang/src/graph/ops/source_iter.rs Migrates pull-path iter source creation to dfir_pipes.
dfir_lang/src/graph/ops/source_stdin.rs Migrates pull-path stdin source to dfir_pipes.
dfir_lang/src/graph/ops/source_stream.rs Migrates pull-path stream source to dfir_pipes.
dfir_lang/src/graph/ops/source_stream_serde.rs Migrates pull-path serde stream source to dfir_pipes.
dfir_lang/src/graph/ops/state_by.rs Migrates pull-path filter usage to dfir_pipes.
dfir_lang/src/graph/ops/union.rs Migrates pull-path union chaining to dfir_pipes.
dfir_lang/src/graph/ops/unique.rs Migrates pull-path filter usage to dfir_pipes.
dfir_lang/src/graph/ops/zip.rs Migrates pull-path iter source creation to dfir_pipes.
dfir_lang/src/graph/ops/zip_longest.rs Migrates pull-path fuse usage to dfir_pipes.
dfir_lang/src/graph/ops/_counter.rs Migrates pull-path inspect usage to dfir_pipes.
dfir_lang/src/graph/ops/_lattice_fold_batch.rs Migrates pull-path iter construction to dfir_pipes.
dfir_lang/src/graph/ops/_lattice_join_fused_join.rs Migrates pull-path map usage to dfir_pipes.
dfir_lang/src/graph/meta_graph.rs Migrates pull-to-sink plumbing to dfir_pipes and disables type guards.
benches/benches/futures.rs Minor bench updates (for_each(drop) style).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@MingweiSamuel MingweiSamuel changed the title dfir_pipes testing [ci-bench] feat(dfir_lang): use custom dfir_pipes::Pull trait [ci-bench] Mar 4, 2026
@MingweiSamuel MingweiSamuel force-pushed the dfir-pipes branch 2 times, most recently from dfee151 to b192213 Compare March 5, 2026 19:35
@MingweiSamuel MingweiSamuel changed the title feat(dfir_lang): use custom dfir_pipes::Pull trait [ci-bench] feat(dfir_lang)!: use custom dfir_pipes::Pull trait [ci-bench] Mar 5, 2026
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 125 out of 126 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 127 out of 128 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 137 out of 138 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 137 out of 138 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

hydro_build_utils/src/lib.rs:46

  • hydro_build_utils now gates the insta re-export and snapshot macros behind the insta feature. Any crate that calls hydro_build_utils::assert_snapshot! / assert_debug_snapshot! must enable hydro_build_utils's insta feature in its (dev-)dependencies, otherwise those macros will be missing and builds/tests will fail. Please verify all workspace crates that use these macros have been updated accordingly (e.g., hydro_lang currently uses them).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 137 out of 138 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 138 out of 139 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 138 out of 139 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

hydro_build_utils/src/lib.rs:46

  • assert_snapshot!/assert_debug_snapshot!/nightly_wrapper! are now only exported when the insta feature is enabled. There are existing call sites in the repo (e.g. hydro_lang/src/compile/ir/mod.rs) that depend on these macros without enabling hydro_build_utils/insta, which will cause compilation failures. Either ensure all crates/tests that use these macros enable the insta feature, or keep the macros always available (e.g. no-op / compile_error guidance when the feature is off).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@MingweiSamuel MingweiSamuel merged commit a662ff3 into main Mar 9, 2026
19 checks passed
@MingweiSamuel MingweiSamuel deleted the dfir-pipes branch March 9, 2026 21:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants