
Commit 051fe97

[SPARK-54305][SQL][PYTHON] Add admission control support to Python DataSource streaming API
This change adds admission control capabilities to the Python DataSource streaming API, bringing it to feature parity with the Scala SupportsAdmissionControl interface.

Changes include:
- Modified PythonMicroBatchStream to implement SupportsAdmissionControl
- Updated PythonStreamingSourceRunner to serialize ReadLimit to Python
- Enhanced python_streaming_source_runner.py to deserialize and pass parameters
- Extended DataSourceStreamReader.latestOffset() to accept start_offset and read_limit
- Added reportLatestOffset() method for monitoring
- Full backward compatibility maintained
- Added comprehensive unit tests
- Added example demonstrating admission control

This enables Python streaming sources to:
- Control microbatch sizes via the maxRecordsPerBatch option
- Implement rate limiting and backpressure
- Match capabilities of built-in Scala sources (Kafka, Delta)

JIRA: https://issues.apache.org/jira/browse/SPARK-54305
1 parent 551b922 commit 051fe97
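For orientation, the sketch below shows the reader-side surface this commit describes: latestOffset() accepting start_offset and read_limit, plus reportLatestOffset() for monitoring. It is illustrative only (the class name and the _poll_source() helper are made up for this sketch, and partitions()/read() are omitted); the committed example and documentation follow in the file changes below.

from pyspark.sql.datasource import DataSourceStreamReader

class CappedReader(DataSourceStreamReader):
    """Illustrative reader showing the extended admission-control hooks."""

    def initialOffset(self):
        return {"offset": 0}

    def latestOffset(self, start_offset=None, read_limit=None):
        # start_offset is None for the first batch; read_limit carries the
        # configured limit, e.g. {"type": "maxRows", "maxRows": 100}.
        start = 0 if start_offset is None else start_offset["offset"]
        available = self._poll_source()
        if read_limit and read_limit.get("type") == "maxRows":
            return {"offset": min(start + read_limit["maxRows"], available)}
        return {"offset": available}

    def reportLatestOffset(self):
        # True latest offset for monitoring, ignoring any configured limit.
        return {"offset": self._poll_source()}

    def _poll_source(self):
        # Stand-in for querying the real external system.
        return 1000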

8 files changed, +904 -69 lines changed

examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py

Lines changed: 165 additions & 0 deletions
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Demonstrates admission control in Python streaming data sources.

This example implements a simple blockchain-like streaming source that generates
sequential blocks and shows how to use admission control to limit batch sizes.

Usage: structured_blockchain_admission_control.py [<max-blocks-per-batch>]
    <max-blocks-per-batch> Maximum number of blocks to process per microbatch (default: 10)

Run the example:
    `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py 5`

The example will process blocks in controlled batches of 5, demonstrating admission control.
"""
import sys
import time

from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition


class SimpleBlockchainReader(DataSourceStreamReader):
    """A simple streaming source that generates sequential blockchain blocks."""

    def __init__(self, max_block=1000):
        self.max_block = max_block
        self.current_block = 0

    def initialOffset(self):
        """Start from block 0."""
        return {"block": self.current_block}

    def latestOffset(self, start_offset=None, read_limit=None):
        """
        Return the latest offset, respecting admission control limits.

        This demonstrates the key admission control pattern:
        - Without a limit: process all available blocks
        - With a maxRows limit: cap the end block to respect the batch size
        """
        # Determine where we are now
        if start_offset is None:
            start_block = self.current_block
        else:
            start_block = start_offset["block"]

        # Simulate blockchain growth - advance by 20 blocks each time
        latest_available = min(start_block + 20, self.max_block)

        # Apply admission control if configured
        if read_limit and read_limit.get("type") == "maxRows":
            max_blocks = read_limit["maxRows"]
            # Cap at the configured limit
            end_block = min(start_block + max_blocks, latest_available)
            print(f"  [Admission Control] Start: {start_block}, Available: {latest_available}, "
                  f"Capped: {end_block} (limit: {max_blocks})")
        else:
            # No limit - process all available blocks
            end_block = latest_available
            print(f"  [No Limit] Start: {start_block}, End: {end_block}")

        return {"block": end_block}

    def reportLatestOffset(self):
        """Report the true latest block for monitoring."""
        # In a real implementation, this would query the actual blockchain
        return {"block": min(self.current_block + 20, self.max_block)}

    def partitions(self, start, end):
        """Create a single partition for the block range."""
        start_block = start["block"]
        end_block = end["block"]
        return [InputPartition(f"{start_block}:{end_block}".encode())]

    def read(self, partition):
        """Generate block data for the partition."""
        # Parse the block range
        range_str = partition.value.decode()
        start_block, end_block = map(int, range_str.split(":"))

        # Generate block data
        for block_num in range(start_block, end_block):
            # Simulate block data: block number, timestamp, simple hash
            yield (
                block_num,
                int(time.time() * 1000),
                f"0x{'0' * 60}{block_num:04x}"
            )


class SimpleBlockchainSource(DataSource):
    """Data source for simple blockchain streaming."""

    @classmethod
    def name(cls):
        return "simple_blockchain"

    def schema(self):
        return "block_number INT, timestamp LONG, block_hash STRING"

    def streamReader(self, schema):
        return SimpleBlockchainReader(max_block=1000)


if __name__ == "__main__":
    max_blocks_per_batch = int(sys.argv[1]) if len(sys.argv) > 1 else 10

    print(f"""
=================================================================
Blockchain Streaming with Admission Control
=================================================================
Configuration:
  - Max blocks per batch: {max_blocks_per_batch}
  - Total blocks to generate: 1000

Watch how admission control limits each microbatch to process
only {max_blocks_per_batch} blocks at a time, even when more data is available.
=================================================================
""")

    spark = SparkSession \
        .builder \
        .appName("StructuredBlockchainAdmissionControl") \
        .getOrCreate()

    # Register the custom data source
    spark.dataSource.register(SimpleBlockchainSource)

    # Create a streaming DataFrame with admission control
    blocks = spark \
        .readStream \
        .format("simple_blockchain") \
        .option("maxRecordsPerBatch", str(max_blocks_per_batch)) \
        .load()

    # Write each microbatch of blocks to the console
    query = blocks \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .option("numRows", "20") \
        .option("truncate", "false") \
        .trigger(processingTime="3 seconds") \
        .start()

    query.awaitTermination()

python/docs/source/tutorial/sql/python_data_source.rst

Lines changed: 162 additions & 0 deletions
        for i in range(start, end):
            yield (i, str(i))

Implementing Admission Control for Streaming Sources
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Admission control allows streaming sources to control batch sizes, ensuring predictable resource usage
and preventing overload. This is essential for production systems that need to respect API rate limits,
control memory usage, or maintain consistent processing times.

**Understanding the Problem**

Without admission control, a streaming source must process all available data in each microbatch,
which can lead to:

- Unpredictable batch sizes
- Memory exhaustion during data bursts
- Inability to honor external API rate limits
- Difficult failure recovery

**The Solution: Parameters in latestOffset()**

The :meth:`DataSourceStreamReader.latestOffset` method now accepts optional parameters:

- ``start_offset`` (dict): The current stream position
- ``read_limit`` (dict): Configured batch size limits

These parameters enable sources to return capped offsets that respect the configured limits.

**Example: Blockchain Source with Admission Control**

Here is a complete example showing how to implement admission control for a blockchain-like streaming source:

.. code-block:: python

    from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition

    class BlockchainStreamReader(DataSourceStreamReader):
        def __init__(self, initial_block=0, max_block=10000):
            self.initial_block = initial_block
            self.max_block = max_block

        def initialOffset(self) -> dict:
            return {"block": self.initial_block}

        def latestOffset(self, start_offset=None, read_limit=None) -> dict:
            """
            Return a capped offset respecting admission control limits.
            """
            # Determine the current position
            if start_offset is None:
                start_block = self.initial_block
            else:
                start_block = start_offset["block"]

            # Get the latest available block from the blockchain
            latest_available = self.get_chain_head()  # e.g., returns 5000

            # Apply admission control if configured
            if read_limit and read_limit.get("type") == "maxRows":
                max_blocks = read_limit["maxRows"]
                # Cap the end block to respect the batch size limit
                end_block = min(start_block + max_blocks, latest_available)
            else:
                # No limit - process all available blocks
                end_block = latest_available

            return {"block": end_block}

        def reportLatestOffset(self) -> dict:
            """
            Report the true latest offset for monitoring (without limits applied).
            """
            return {"block": self.get_chain_head()}

        def partitions(self, start: dict, end: dict):
            start_block = start["block"]
            end_block = end["block"]
            # Create a partition for the block range
            return [InputPartition(f"{start_block}:{end_block}".encode())]

        def read(self, partition):
            # Fetch and yield block data for the range
            range_str = partition.value.decode()
            start_block, end_block = map(int, range_str.split(":"))
            for block_num in range(start_block, end_block):
                yield self.fetch_block(block_num)  # Returns a block tuple

**Configuring Admission Control**

Use the ``maxRecordsPerBatch`` option when reading from the stream:

.. code-block:: python

    # Process at most 50 blocks per microbatch
    df = spark.readStream \
        .format("blockchain") \
        .option("maxRecordsPerBatch", "50") \
        .load()

    query = df.writeStream \
        .format("console") \
        .start()

**Read Limit Types**

The ``read_limit`` parameter supports several limit types:

.. code-block:: python

    # Maximum rows/records
    {"type": "maxRows", "maxRows": 1000}

    # Maximum files (for file-based sources)
    {"type": "maxFiles", "maxFiles": 10}

    # Maximum data size in bytes
    {"type": "maxBytes", "maxBytes": 10485760}

    # Minimum rows with timeout (for low-throughput sources)
    {"type": "minRows", "minRows": 100, "maxTriggerDelayMs": 30000}

    # Process all available data
    {"type": "allAvailable"}

    # Composite limits (multiple constraints)
    {"type": "composite", "limits": [
        {"type": "minRows", "minRows": 100, "maxTriggerDelayMs": 30000},
        {"type": "maxRows", "maxRows": 1000}
    ]}

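A reader should only act on the limit types it actually supports (see the best practices below). As an illustrative sketch of that validation pattern for the blockchain reader above (the fallback policy for unsupported types is just one possible choice), ``latestOffset`` could dispatch on the ``type`` field:

.. code-block:: python

    def latestOffset(self, start_offset=None, read_limit=None) -> dict:
        start_block = self.initial_block if start_offset is None else start_offset["block"]
        latest_available = self.get_chain_head()

        limit_type = read_limit.get("type") if read_limit else None
        if limit_type == "maxRows":
            # Row-based cap: never move past what is actually available.
            end_block = min(start_block + read_limit["maxRows"], latest_available)
        elif limit_type == "composite":
            # Honor the tightest row-based constraint among the nested limits.
            row_caps = [lim["maxRows"] for lim in read_limit["limits"] if lim.get("type") == "maxRows"]
            end_block = min(start_block + min(row_caps), latest_available) if row_caps else latest_available
        else:
            # allAvailable, no limit, or a type this source does not support:
            # fall back to processing everything available.
            end_block = latest_available
        return {"block": end_block}
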
**Best Practices**

1. **Always handle None parameters**: For the first batch, ``start_offset`` will be None
2. **Use min() for capping**: Ensure you don't exceed the available data
3. **Implement reportLatestOffset()**: Helps with monitoring when rate limiting is active
4. **Validate limit types**: Check that the limit type is one your source supports
5. **Test both modes**: Verify your source works with and without limits (see the sketch after this list)

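Because ``latestOffset`` is a plain Python method operating on dictionaries, both modes can be exercised directly in a unit test. A minimal sketch, assuming the ``BlockchainStreamReader`` from the example above with ``get_chain_head()`` stubbed to a fixed value:

.. code-block:: python

    class FakeChainReader(BlockchainStreamReader):
        # Test double: a fixed chain head makes the capping logic deterministic.
        def get_chain_head(self):
            return 500

    reader = FakeChainReader(initial_block=0)

    # Without a limit, the reader advances to everything available.
    assert reader.latestOffset({"block": 0}, None) == {"block": 500}

    # With a maxRows limit, the end offset is capped.
    limited = reader.latestOffset({"block": 0}, {"type": "maxRows", "maxRows": 50})
    assert limited == {"block": 50}
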
**Backward Compatibility**

For backward compatibility, the parameters are optional. Existing sources that don't implement
admission control can use the old signature and will continue to work. The framework automatically
detects which signature is implemented:

.. code-block:: python

    def latestOffset(self):
        # Old signature still works - the framework detects this automatically
        return {"offset": self.get_latest()}

New sources can opt in to admission control by accepting the parameters:

.. code-block:: python

    def latestOffset(self, start_offset=None, read_limit=None):
        # New signature with admission control
        if read_limit and read_limit.get("type") == "maxRows":
            # Apply the limit
            ...
        return {"offset": ...}

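As an aside, signature detection of this kind can be done with :mod:`inspect`; the sketch below is purely illustrative and not necessarily how Spark implements it:

.. code-block:: python

    import inspect

    def accepts_admission_control(reader):
        # True if latestOffset() declares the start_offset / read_limit parameters.
        params = inspect.signature(reader.latestOffset).parameters
        return {"start_offset", "read_limit"} <= set(params)
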
Note: ``SimpleDataSourceStreamReader`` does not support admission control. If you need
admission control, use ``DataSourceStreamReader`` instead.

Alternative: Implement a Simple Streaming Reader
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
