@featzhang
Member

[FLINK-38825] Introduce AsyncBatchFunction and AsyncBatchWaitOperator (initial API & size-based batching)

Motivation

Many AI / ML inference and external service workloads benefit significantly from batching rather than per-record async invocation. Typical examples include:

  • GPU-based model inference where batching improves throughput and utilization
  • External services that expose batch APIs
  • Databases or RPC systems with high per-request overhead

While Flink currently provides AsyncFunction and AsyncWaitOperator for record-level async I/O, there is no native abstraction for batch-oriented async processing in the DataStream API.

This PR introduces a minimal and extensible foundation for async batch processing in Flink.


What is included in this PR

This PR intentionally focuses on a small, additive, and reviewer-friendly first step:

  1. New public API

    • Introduces AsyncBatchFunction<IN, OUT> (annotated with @PublicEvolving)
    • Allows users to implement async I/O over a batch of inputs

  2. New runtime operator

    • Adds AsyncBatchWaitOperator with unordered semantics
    • Supports size-based batching only
    • Flushes remaining records on end-of-input
    • Keeps implementation intentionally simple

  3. Stream API entry point

    • Adds AsyncDataStream.unorderedWaitBatch(...)
    • Mirrors existing async API style without modifying existing methods

  4. Unit tests

    • Verifies batch-size triggering behavior
    • Verifies correct result emission
    • Verifies exception propagation and failure behavior
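
As a rough illustration of the API shape described above, the following Flink-free sketch shows what implementing a batch-oriented async function could look like. The names BatchFunctionSketch, ResultSinkSketch, and asyncInvokeBatch are hypothetical stand-ins chosen for this example; the actual AsyncBatchFunction signature is defined by the PR diff and may differ. The sink is modeled loosely on the callback style of the existing record-level async API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical callback handed to the user function (an assumption, not the PR's type).
interface ResultSinkSketch<OUT> {
    void complete(Collection<OUT> results);
    void completeExceptionally(Throwable error);
}

// Hypothetical stand-in for AsyncBatchFunction<IN, OUT>; the method name is assumed.
interface BatchFunctionSketch<IN, OUT> {
    void asyncInvokeBatch(List<IN> inputs, ResultSinkSketch<OUT> sink);
}

// Example user function: one simulated external call for the whole batch.
class UpperCaseBatchFunction implements BatchFunctionSketch<String, String> {
    @Override
    public void asyncInvokeBatch(List<String> inputs, ResultSinkSketch<String> sink) {
        // Copy the inputs so the caller is free to reuse its buffer.
        List<String> snapshot = new ArrayList<>(inputs);
        CompletableFuture
            .supplyAsync(() -> {
                // Stands in for a batch RPC or batched model inference call.
                List<String> out = new ArrayList<>(snapshot.size());
                for (String s : snapshot) {
                    out.add(s.toUpperCase());
                }
                return out;
            })
            .whenComplete((out, error) -> {
                // Complete the sink exactly once, with results or with the failure.
                if (error != null) {
                    sink.completeExceptionally(error);
                } else {
                    sink.complete(out);
                }
            });
    }
}
```

In this sketch the sink is completed once per batch rather than once per record, which is the property that lets a batch endpoint amortize per-request overhead. A failure anywhere in the simulated call fails the whole batch through completeExceptionally.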

What is intentionally NOT included

To keep this PR minimal and avoid premature design constraints, the following are explicitly out of scope and will be addressed in follow-up PRs:

  • Ordered batching semantics
  • Time-based or event-time-based batching
  • Retry / timeout handling at the async invocation level
  • Metrics and backpressure tuning
  • SQL / Table API / Python API integration
  • Concurrency control for multiple in-flight batches

Design notes

  • The new API is additive and backward-compatible; no existing behavior is modified.
  • The operator is designed to be easy to evolve, with clear extension points documented via TODOs.
  • Implementation favors explicit logic over reuse or abstraction to simplify review and future changes.
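
To make the "explicit logic" point concrete, here is a minimal sketch of a size-based trigger with an end-of-input flush and TODO-marked extension points. It is not the operator from this PR: SizeBasedBatcher and its method names are hypothetical, the batch handler stands in for invoking the user's batch function, and no Flink runtime concerns (state, checkpoints, mailbox threading) are modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free illustration of size-based batching.
class SizeBasedBatcher<IN> {
    private final int maxBatchSize;
    private final Consumer<List<IN>> batchHandler; // stands in for the async batch call
    private List<IN> buffer = new ArrayList<>();

    SizeBasedBatcher(int maxBatchSize, Consumer<List<IN>> batchHandler) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.batchHandler = batchHandler;
    }

    // Called once per incoming record; fires when the buffer reaches the size limit.
    void processElement(IN element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
        // TODO (out of scope in this sketch): time-based trigger.
    }

    // Called at end of input so that a partially filled batch is not lost.
    void endInput() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Hand off the current buffer and start a fresh one.
        List<IN> batch = buffer;
        buffer = new ArrayList<>();
        batchHandler.accept(batch);
        // TODO (out of scope in this sketch): ordered emission, retries, timeouts.
    }
}
```

With a batch size of 2 and five input records, this sketch hands off two full batches during processing and one single-element batch when endInput() is called.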

Follow-up work

Planned follow-up PRs (tracked under FLINK-38825) include:

  • Time-based batch triggering
  • Ordered batch mode
  • Event-time batching
  • Retry / timeout strategies
  • Metrics and observability
  • Higher-level API and ecosystem integrations

Checklist

  • API annotated as @PublicEvolving
  • No changes to existing async APIs
  • Fully covered by unit tests
  • Backward-compatible and additive only

This PR establishes the minimal building block for async batch processing in Flink and enables iterative enhancement without locking in premature design decisions.

@flinkbot
Collaborator

flinkbot commented Dec 21, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build
