diff --git a/connector/kafka-0-10-sql/pom.xml b/connector/kafka-0-10-sql/pom.xml
index fd0ca36d36df..383e63cce788 100644
--- a/connector/kafka-0-10-sql/pom.xml
+++ b/connector/kafka-0-10-sql/pom.xml
@@ -186,6 +186,26 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>3.6.1</version>
+        <executions>
+          <execution>
+            <id>write-classpath</id>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <phase>generate-resources</phase>
+            <configuration>
+              <includeScope>runtime</includeScope>
+              <outputFile>${project.build.directory}/classpath.txt</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
diff --git a/dev/requirements.txt b/dev/requirements.txt
index cde0957715bf..d7a836ad898c 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -21,6 +21,11 @@ openpyxl
# PySpark test dependencies (optional)
coverage
+# Kafka streaming test dependencies (optional)
+# Required for running Kafka integration tests with Docker test containers
+testcontainers[kafka]>=3.7.0
+kafka-python>=2.0.2
+
# Linter
mypy==1.8.0
pytest-mypy-plugins==1.9.3
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index ee2a953a7402..88c1dee3a833 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -659,6 +659,7 @@ def __hash__(self):
"pyspark.sql.tests.streaming.test_streaming",
"pyspark.sql.tests.streaming.test_streaming_foreach",
"pyspark.sql.tests.streaming.test_streaming_foreach_batch",
+ "pyspark.sql.tests.streaming.test_streaming_kafka_rtm",
"pyspark.sql.tests.streaming.test_streaming_listener",
"pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state",
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index 21c513f88c0f..b7ca7dd1cbc0 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -493,6 +493,10 @@ def trigger(self, *, continuous: str) -> "DataStreamWriter":
def trigger(self, *, availableNow: bool) -> "DataStreamWriter":
...
+ @overload
+ def trigger(self, *, realTime: str) -> "DataStreamWriter":
+ ...
+
def trigger(
self,
*,
@@ -500,15 +504,16 @@ def trigger(
once: Optional[bool] = None,
continuous: Optional[str] = None,
availableNow: Optional[bool] = None,
+ realTime: Optional[str] = None,
) -> "DataStreamWriter":
- params = [processingTime, once, continuous, availableNow]
+ params = [processingTime, once, continuous, availableNow, realTime]
- if params.count(None) == 4:
+ if params.count(None) == 5:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
)
- elif params.count(None) < 3:
+ elif params.count(None) < 4:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
@@ -541,6 +546,14 @@ def trigger(
)
self._write_proto.continuous_checkpoint_interval = continuous.strip()
+ elif realTime is not None:
+ if type(realTime) != str or len(realTime.strip()) == 0:
+ raise PySparkValueError(
+ errorClass="VALUE_NOT_NON_EMPTY_STR",
+ messageParameters={"arg_name": "realTime", "arg_value": str(realTime)},
+ )
+ self._write_proto.real_time_batch_duration = realTime.strip()
+
else:
if availableNow is not True:
raise PySparkValueError(
diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index 8121dd609950..27223f531425 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -1243,6 +1243,7 @@ def trigger(
once: Optional[bool] = None,
continuous: Optional[str] = None,
availableNow: Optional[bool] = None,
+        realTime: Optional[str] = None,
) -> "DataStreamWriter":
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
@@ -1268,6 +1269,10 @@ def trigger(
availableNow : bool, optional
if set to True, set a trigger that processes all available data in multiple
batches then terminates the query. Only one trigger can be set.
+        realTime : str, optional
+            a batch duration as a string, e.g. '5 seconds', '1 minute'.
+            Set a trigger that runs the query in real-time mode, producing
+            batches at the specified duration. Only one trigger can be set.
Notes
-----
@@ -1291,15 +1296,20 @@ def trigger(
>>> df.writeStream.trigger(availableNow=True)
<...streaming.readwriter.DataStreamWriter object ...>
+
+        Trigger the query for real-time mode execution with a 5-second batch duration.
+
+ >>> df.writeStream.trigger(realTime='5 seconds')
+ <...streaming.readwriter.DataStreamWriter object ...>
"""
- params = [processingTime, once, continuous, availableNow]
+ params = [processingTime, once, continuous, availableNow, realTime]
- if params.count(None) == 4:
+ if params.count(None) == 5:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
)
- elif params.count(None) < 3:
+ elif params.count(None) < 4:
raise PySparkValueError(
errorClass="ONLY_ALLOW_SINGLE_TRIGGER",
messageParameters={},
@@ -1342,6 +1352,16 @@ def trigger(
jTrigger = getattr(
self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger"
).Continuous(interval)
+ elif realTime is not None:
+ if type(realTime) != str or len(realTime.strip()) == 0:
+ raise PySparkValueError(
+ errorClass="VALUE_NOT_NON_EMPTY_STR",
+ messageParameters={"arg_name": "realTime", "arg_value": str(realTime)},
+ )
+ batch_duration = realTime.strip()
+ jTrigger = getattr(
+ self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger"
+ ).RealTime(batch_duration)
else:
if availableNow is not True:
raise PySparkValueError(
diff --git a/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md
new file mode 100644
index 000000000000..e5df122726fe
--- /dev/null
+++ b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md
@@ -0,0 +1,426 @@
+# Kafka Testing with PySpark
+
+This document explains how to write PySpark unit tests that interact with a Kafka cluster using Docker containers.
+
+## Overview
+
+The `kafka_utils.py` module provides a `KafkaUtils` class that launches a single-broker Kafka cluster in Docker via the `testcontainers-python` library. This enables end-to-end integration testing of PySpark Kafka streaming applications without requiring a manual Kafka setup.
+
+## Prerequisites
+
+### 1. Docker
+
+Docker must be installed and running on your system. The Kafka test container requires Docker to launch the Kafka broker.
+
+- **Install Docker**: https://docs.docker.com/get-docker/
+- **Verify Docker is running**: `docker ps`
+
+### 2. Python Dependencies
+
+Install the required Python packages:
+
+```bash
+pip install testcontainers[kafka] kafka-python
+```
+
+Or install all dev dependencies:
+
+```bash
+cd $SPARK_HOME
+pip install -r dev/requirements.txt
+```
+
+## Quick Start
+
+Here's a minimal example of writing a Kafka test:
+
+```python
+import unittest
+from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+
+class MyKafkaTest(ReusedSQLTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.kafka_utils = KafkaUtils()
+ cls.kafka_utils.setup()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.kafka_utils.teardown()
+ super().tearDownClass()
+
+ def test_kafka_read_write(self):
+ # Create a topic
+ topic = "test-topic"
+ self.kafka_utils.create_topics([topic])
+
+ # Send test data
+ messages = [("key1", "value1"), ("key2", "value2")]
+ self.kafka_utils.send_messages(topic, messages)
+
+ # Read with Spark
+ df = (
+ self.spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ .load()
+ )
+
+ # Verify data
+ results = df.selectExpr(
+ "CAST(key AS STRING) as key",
+ "CAST(value AS STRING) as value"
+ ).collect()
+
+ self.assertEqual(len(results), 2)
+```
+
+## KafkaUtils API Reference
+
+### Initialization and Lifecycle
+
+#### `__init__(kafka_version="7.4.0")`
+
+Create a new KafkaUtils instance.
+
+- **kafka_version**: Tag of the `confluentinc/cp-kafka` Docker image to use (default: "7.4.0", pinned for test stability)
+
+#### `setup()`
+
+Start the Kafka container and initialize clients. This must be called before using any other methods.
+
+**Raises:**
+- `ImportError`: If required dependencies are not installed
+- `RuntimeError`: If Kafka container fails to start
+
+**Note:** Container startup can take 10-30 seconds on first run while Docker pulls the image.
+
+#### `teardown()`
+
+Stop the Kafka container and clean up resources. Safe to call multiple times.
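+
+A typical standalone lifecycle is sketched below (the explicit `kafka_version` argument is only illustrative; the default is usually fine):
+
+```python
+from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils
+
+kafka_utils = KafkaUtils(kafka_version="7.4.0")
+kafka_utils.setup()  # starts the Docker container; may take 10-30 seconds on first run
+try:
+    print(kafka_utils.broker)  # bootstrap server address, e.g. "localhost:9093"
+finally:
+    kafka_utils.teardown()  # safe to call multiple times
+```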
+
+### Topic Management
+
+#### `create_topics(topic_names, num_partitions=1, replication_factor=1)`
+
+Create one or more Kafka topics.
+
+**Parameters:**
+- **topic_names** (List[str]): List of topic names to create
+- **num_partitions** (int): Number of partitions per topic (default: 1)
+- **replication_factor** (int): Replication factor (default: 1, max: 1 for single broker)
+
+**Example:**
+```python
+# Create single partition topics
+kafka_utils.create_topics(["topic1", "topic2"])
+
+# Create multi-partition topic
+kafka_utils.create_topics(["multi-partition-topic"], num_partitions=3)
+```
+
+#### `delete_topics(topic_names)`
+
+Delete one or more Kafka topics.
+
+**Parameters:**
+- **topic_names** (List[str]): List of topic names to delete
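+
+**Example** (a minimal sketch; the topic names are illustrative, and topics that do not exist are silently ignored):
+
+```python
+kafka_utils.delete_topics(["topic1", "topic2"])
+```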
+
+### Producing Data
+
+#### `send_messages(topic, messages)`
+
+Send messages to a Kafka topic.
+
+**Parameters:**
+- **topic** (str): Topic name
+- **messages** (List[tuple]): List of (key, value) tuples
+
+**Example:**
+```python
+kafka_utils.send_messages("test-topic", [
+ ("user1", "login"),
+ ("user2", "logout"),
+ ("user1", "purchase"),
+])
+```
+
+### Reading Data
+
+#### `get_all_records(spark, topic, key_deserializer="STRING", value_deserializer="STRING")`
+
+Read all records from a Kafka topic using Spark batch read.
+
+**Parameters:**
+- **spark**: SparkSession instance
+- **topic** (str): Topic name
+- **key_deserializer** (str): How to deserialize keys (default: "STRING")
+- **value_deserializer** (str): How to deserialize values (default: "STRING")
+
+**Returns:** Dictionary mapping keys to values
+
+**Example:**
+```python
+records = kafka_utils.get_all_records(self.spark, "test-topic")
+assert records == {"key1": "value1", "key2": "value2"}
+```
+
+### Testing Utilities
+
+#### `assert_eventually(result_func, expected, timeout=60, interval=1.0)`
+
+Repeatedly call `result_func` until it returns `expected`, failing if it does not within the timeout. Useful for testing streaming queries whose results are eventually consistent.
+
+**Parameters:**
+- **result_func** (Callable): Function that returns the current result
+- **expected**: Expected result value
+- **timeout** (int): Maximum time to wait in seconds (default: 60)
+- **interval** (float): Time between checks in seconds (default: 1.0)
+
+**Raises:** `AssertionError` if condition doesn't become true within timeout
+
+**Example:**
+```python
+kafka_utils.assert_eventually(
+ lambda: kafka_utils.get_all_records(self.spark, "sink-topic"),
+ {"key1": "processed-value1"},
+ timeout=30
+)
+```
+
+#### `wait_for_query_alive(query, timeout=60, interval=1.0)`
+
+Wait for a streaming query to become active and ready to process data.
+
+**Parameters:**
+- **query**: StreamingQuery instance
+- **timeout** (int): Maximum time to wait in seconds (default: 60)
+- **interval** (float): Time between checks in seconds (default: 1.0)
+
+**Raises:** `AssertionError` if query doesn't become active within timeout
+
+**Example:**
+```python
+query = df.writeStream.format("memory").start()
+kafka_utils.wait_for_query_alive(query, timeout=30)
+```
+
+### Properties
+
+#### `broker`
+
+Get the Kafka bootstrap server address (e.g., "localhost:9093").
+
+#### `producer`
+
+Get the underlying KafkaProducer instance for advanced usage.
+
+#### `admin_client`
+
+Get the underlying KafkaAdminClient instance for advanced usage.
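+
+For advanced scenarios the underlying `kafka-python` clients can be driven directly. A minimal sketch (uses only standard `kafka-python` APIs; the topic name is illustrative):
+
+```python
+# Send a message through the underlying KafkaProducer and wait for the broker ack
+kafka_utils.producer.send("test-topic", key="k1", value="v1").get(timeout=10)
+kafka_utils.producer.flush()
+
+# Inspect the cluster through the KafkaAdminClient
+print(kafka_utils.admin_client.list_topics())
+```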
+
+## Common Testing Patterns
+
+### Pattern 1: Batch Read/Write
+
+Test basic Kafka read and write with Spark batch processing:
+
+```python
+def test_kafka_batch(self):
+ topic = "test-topic"
+ self.kafka_utils.create_topics([topic])
+
+ # Write with Spark DataFrame
+ df = self.spark.createDataFrame([("key1", "value1")], ["key", "value"])
+ (
+ df.selectExpr("CAST(key AS BINARY)", "CAST(value AS BINARY)")
+ .write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .option("topic", topic)
+ .save()
+ )
+
+ # Read back
+ records = self.kafka_utils.get_all_records(self.spark, topic)
+ assert records == {"key1": "value1"}
+```
+
+### Pattern 2: Streaming Queries
+
+Test streaming queries with checkpoint management:
+
+```python
+def test_kafka_streaming(self):
+ import tempfile
+ import os
+
+ # Setup topics
+ source_topic = "source"
+ sink_topic = "sink"
+ self.kafka_utils.create_topics([source_topic, sink_topic])
+
+ # Produce initial data
+ self.kafka_utils.send_messages(source_topic, [("k1", "v1")])
+
+ # Start streaming query
+ df = (
+ self.spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .option("subscribe", source_topic)
+ .option("startingOffsets", "earliest")
+ .load()
+ )
+
+ checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint")
+ query = (
+ df.writeStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .option("topic", sink_topic)
+ .option("checkpointLocation", checkpoint_dir)
+ .start()
+ )
+
+ try:
+ self.kafka_utils.wait_for_query_alive(query)
+ self.kafka_utils.assert_eventually(
+ lambda: self.kafka_utils.get_all_records(self.spark, sink_topic),
+ {"k1": "v1"}
+ )
+ finally:
+ query.stop()
+```
+
+### Pattern 3: Stateful Aggregations
+
+Test streaming aggregations:
+
+```python
+def test_kafka_aggregation(self):
+    import os
+    import tempfile
+
+    from pyspark.sql.functions import col
+
+    # Create source and sink topics
+    self.kafka_utils.create_topics(["source", "sink"])
+
+    # Send data for aggregation
+    self.kafka_utils.send_messages("source", [
+        ("user1", "1"),
+        ("user2", "1"),
+        ("user1", "1"),
+    ])
+
+    # Aggregate by key
+    df = (
+        self.spark.readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+        .option("subscribe", "source")
+        .option("startingOffsets", "earliest")
+        .load()
+        .groupBy(col("key"))
+        .count()
+        .selectExpr("CAST(key AS BINARY)", "CAST(count AS STRING) AS value")
+    )
+
+    # Streaming aggregations to Kafka need a checkpoint and "update" output mode
+    checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint")
+    query = (
+        df.writeStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+        .option("topic", "sink")
+        .option("checkpointLocation", checkpoint_dir)
+        .outputMode("update")
+        .start()
+    )
+
+    try:
+        # Verify aggregated results
+        self.kafka_utils.assert_eventually(
+            lambda: self.kafka_utils.get_all_records(self.spark, "sink"),
+            {"user1": "2", "user2": "1"}
+        )
+    finally:
+        query.stop()
+```
+
+### Pattern 4: Multiple Topics
+
+Test writing to multiple topics based on data:
+
+```python
+def test_multiple_topics(self):
+ topic1, topic2 = "topic1", "topic2"
+ self.kafka_utils.create_topics([topic1, topic2])
+
+ # Write with topic column
+ df = self.spark.createDataFrame([
+ (topic1, "key1", "value1"),
+ (topic2, "key2", "value2"),
+ ], ["topic", "key", "value"])
+
+ (
+ df.selectExpr("topic", "CAST(key AS BINARY)", "CAST(value AS BINARY)")
+ .write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .save()
+ )
+
+ # Verify data in each topic
+ assert self.kafka_utils.get_all_records(self.spark, topic1) == {"key1": "value1"}
+ assert self.kafka_utils.get_all_records(self.spark, topic2) == {"key2": "value2"}
+```
+
+## Running Tests
+
+### Run All Kafka Tests
+
+```bash
+cd $SPARK_HOME/python
+python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py -v
+```
+
+### Run Specific Test
+
+```bash
+python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py::StreamingKafkaTests::test_kafka_batch_read -v
+```
+
+### Run with unittest
+
+```bash
+cd $SPARK_HOME/python
+python -m unittest pyspark.sql.tests.streaming.test_streaming_kafka_rtm
+```
+
+## Troubleshooting
+
+### Docker Not Running
+
+**Error:**
+```
+Cannot connect to the Docker daemon at unix:///var/run/docker.sock
+```
+
+**Solution:** Start Docker Desktop or Docker daemon
+
+### Container Startup Timeout
+
+**Error:**
+```
+Kafka container failed to start within timeout
+```
+
+**Solutions:**
+1. Increase timeout in test code
+2. Check Docker resource allocation (CPU/memory)
+3. Check Docker logs: `docker logs <container-id>`
+
+### Port Conflicts
+
+**Error:**
+```
+Port 9093 already in use
+```
+
+**Solution:** testcontainers automatically allocates random ports. Ensure no manual port binding in tests.
+
+### Missing Dependencies
+
+**Error:**
+```
+ImportError: testcontainers is required for Kafka tests
+```
+
+**Solution:**
+```bash
+pip install testcontainers[kafka] kafka-python
+```
diff --git a/python/pyspark/sql/tests/streaming/kafka_utils.py b/python/pyspark/sql/tests/streaming/kafka_utils.py
new file mode 100644
index 000000000000..7dc63f0d16d4
--- /dev/null
+++ b/python/pyspark/sql/tests/streaming/kafka_utils.py
@@ -0,0 +1,367 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Utilities for running PySpark tests against a Kafka cluster using Docker containers.
+
+This module provides KafkaUtils class that launches a single-broker Kafka cluster
+via Docker using testcontainers-python library. It's designed to be used with
+Python unittest-based PySpark tests.
+
+Example usage:
+ class MyKafkaTest(ReusedSQLTestCase):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.kafka_utils = KafkaUtils()
+ cls.kafka_utils.setup()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.kafka_utils.teardown()
+ super().tearDownClass()
+
+ def test_kafka_streaming(self):
+ topic = "test-topic"
+ self.kafka_utils.create_topics([topic])
+ # ... use self.kafka_utils.broker for bootstrap.servers
+"""
+
+import time
+from typing import List, Dict, Optional, Callable, Any
+
+
+class KafkaUtils:
+ """
+ Utility class for managing a Kafka test cluster using Docker containers.
+
+ This class provides methods to:
+ - Start/stop a single-broker Kafka cluster in a Docker container
+ - Create and delete topics
+ - Send messages to topics
+ - Query topic data using Spark
+ - Helper methods for testing (assert_eventually, wait_for_query_alive)
+
+ Attributes:
+ broker (str): The bootstrap server address (e.g., "localhost:9093")
+ initialized (bool): Whether the Kafka cluster has been started
+ """
+
+ def __init__(self, kafka_version: str = "7.4.0"):
+ """
+ Initialize KafkaUtils.
+
+ Args:
+ kafka_version: Version of Confluent Kafka to use (default: 7.4.0 for stability)
+ """
+ self.kafka_version = kafka_version
+ self.initialized = False
+ self._kafka_container = None
+ self._admin_client = None
+ self._producer = None
+ self.broker = None
+
+ def setup(self) -> None:
+ """
+ Start the Kafka container and initialize admin client and producer.
+
+ This method:
+ 1. Starts a Kafka container using testcontainers
+ 2. Creates an admin client for topic management
+ 3. Creates a producer for sending test messages
+
+ Raises:
+ ImportError: If required dependencies (testcontainers, kafka-python) are not installed
+ RuntimeError: If Kafka container fails to start
+ """
+ if self.initialized:
+ return
+
+ try:
+ from testcontainers.kafka import KafkaContainer
+ except ImportError as e:
+ raise ImportError(
+ "testcontainers is required for Kafka tests. "
+ "Install it with: pip install testcontainers[kafka]"
+ ) from e
+
+ try:
+ from kafka import KafkaProducer
+ from kafka.admin import KafkaAdminClient
+ except ImportError as e:
+ raise ImportError(
+ "kafka-python is required for Kafka tests. "
+ "Install it with: pip install kafka-python"
+ ) from e
+
+ # Start Kafka container with specific version for test stability
+ self._kafka_container = KafkaContainer(f"confluentinc/cp-kafka:{self.kafka_version}")
+ self._kafka_container.start()
+
+ # Get bootstrap server address
+ self.broker = self._kafka_container.get_bootstrap_server()
+
+ # Initialize admin client for topic management
+ self._admin_client = KafkaAdminClient(
+ bootstrap_servers=self.broker,
+ request_timeout_ms=10000,
+ api_version_auto_timeout_ms=10000
+ )
+
+ # Initialize producer for sending test messages
+ self._producer = KafkaProducer(
+ bootstrap_servers=self.broker,
+ key_serializer=lambda k: str(k).encode("utf-8") if k is not None else None,
+ value_serializer=lambda v: str(v).encode("utf-8") if v is not None else None,
+ request_timeout_ms=10000,
+ max_block_ms=10000
+ )
+
+ self.initialized = True
+
+ def teardown(self) -> None:
+ """
+ Stop the Kafka container and clean up resources.
+
+ This method closes the admin client, producer, and stops the Kafka container.
+ It's safe to call multiple times.
+ """
+ if not self.initialized:
+ return
+
+ # Close admin client
+ if self._admin_client is not None:
+ try:
+ self._admin_client.close()
+ except Exception:
+ pass
+ self._admin_client = None
+
+ # Close producer
+ if self._producer is not None:
+ try:
+ self._producer.close(timeout=5)
+ except Exception:
+ pass
+ self._producer = None
+
+ # Stop Kafka container
+ if self._kafka_container is not None:
+ try:
+ self._kafka_container.stop()
+ except Exception:
+ pass
+ self._kafka_container = None
+
+ self.broker = None
+ self.initialized = False
+
+ def _assert_initialized(self) -> None:
+ """Check if KafkaUtils has been initialized, raise error if not."""
+ if not self.initialized:
+ raise RuntimeError(
+ "KafkaUtils has not been initialized. Call setup() first."
+ )
+
+ def create_topics(self, topic_names: List[str], num_partitions: int = 1,
+ replication_factor: int = 1) -> None:
+ """
+ Create Kafka topics.
+
+ Args:
+ topic_names: List of topic names to create
+ num_partitions: Number of partitions per topic (default: 1)
+ replication_factor: Replication factor (default: 1, max: 1 for single broker)
+
+ Note:
+ If a topic already exists, it will be silently ignored.
+ """
+ self._assert_initialized()
+
+ from kafka.admin import NewTopic
+ from kafka.errors import TopicAlreadyExistsError
+
+ topics = [
+ NewTopic(
+ name=name,
+ num_partitions=num_partitions,
+ replication_factor=replication_factor
+ )
+ for name in topic_names
+ ]
+
+ try:
+ self._admin_client.create_topics(new_topics=topics, validate_only=False)
+ except TopicAlreadyExistsError:
+ # Topic already exists, ignore
+ pass
+
+ def delete_topics(self, topic_names: List[str]) -> None:
+ """
+ Delete Kafka topics.
+
+ Args:
+ topic_names: List of topic names to delete
+
+ Note:
+ If a topic doesn't exist, it will be silently ignored.
+ """
+ self._assert_initialized()
+
+ from kafka.errors import UnknownTopicOrPartitionError
+
+ try:
+ self._admin_client.delete_topics(topics=topic_names)
+ except UnknownTopicOrPartitionError:
+ # Topic doesn't exist, ignore
+ pass
+
+ def send_messages(self, topic: str, messages: List[tuple]) -> None:
+ """
+ Send messages to a Kafka topic.
+
+ Args:
+ topic: Topic name to send messages to
+ messages: List of (key, value) tuples to send
+
+ Example:
+ kafka_utils.send_messages("test-topic", [
+ ("key1", "value1"),
+ ("key2", "value2"),
+ ])
+ """
+ self._assert_initialized()
+
+ for key, value in messages:
+ future = self._producer.send(topic, key=key, value=value)
+ future.get(timeout=10) # Wait for send to complete
+
+ self._producer.flush()
+
+ def get_all_records(self, spark, topic: str,
+ key_deserializer: str = "STRING",
+ value_deserializer: str = "STRING") -> Dict[str, str]:
+ """
+ Read all records from a Kafka topic using Spark.
+
+ Args:
+ spark: SparkSession instance
+ topic: Topic name to read from
+ key_deserializer: How to deserialize keys (default: "STRING")
+ value_deserializer: How to deserialize values (default: "STRING")
+
+ Returns:
+ Dictionary mapping keys to values
+
+ Example:
+ records = kafka_utils.get_all_records(self.spark, "test-topic")
+ assert records == {"key1": "value1", "key2": "value2"}
+ """
+ self._assert_initialized()
+
+ df = (
+ spark.read
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.broker)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ .option("endingOffsets", "latest")
+ .load()
+ )
+
+ df = df.selectExpr(
+ f"CAST(key AS {key_deserializer}) AS key_str",
+ f"CAST(value AS {value_deserializer}) AS value_str"
+ )
+
+ rows = df.collect()
+ return {row.key_str: row.value_str for row in rows}
+
+ def assert_eventually(self, result_func: Callable[[], Any], expected: Any,
+ timeout: int = 60, interval: float = 1.0) -> None:
+ """
+ Assert that a condition becomes true within a timeout.
+
+ This is useful for testing streaming queries where results are eventually consistent.
+
+ Args:
+ result_func: Function that returns the current result
+ expected: Expected result value
+ timeout: Maximum time to wait in seconds (default: 60)
+ interval: Time between checks in seconds (default: 1.0)
+
+ Raises:
+ AssertionError: If the condition doesn't become true within timeout
+
+ Example:
+ kafka_utils.assert_eventually(
+ lambda: kafka_utils.get_all_records(spark, topic),
+ {"key1": "value1"}
+ )
+ """
+ deadline = time.time() + timeout
+ last_result = None
+
+ while time.time() < deadline:
+ last_result = result_func()
+ if last_result == expected:
+ return # Success!
+ time.sleep(interval)
+
+ # Timeout reached, raise assertion error
+ raise AssertionError(
+ f"Condition not met within {timeout}s. "
+ f"Expected: {expected}, Got: {last_result}"
+ )
+
+ def wait_for_query_alive(self, query, timeout: int = 60, interval: float = 1.0) -> None:
+ """
+ Wait for a streaming query to become active and ready to process data.
+
+ Args:
+ query: StreamingQuery instance
+ timeout: Maximum time to wait in seconds (default: 60)
+ interval: Time between checks in seconds (default: 1.0)
+
+ Raises:
+ AssertionError: If the query doesn't become active within timeout
+
+ Example:
+ query = df.writeStream.format("memory").start()
+ kafka_utils.wait_for_query_alive(query)
+ """
+ deadline = time.time() + timeout
+
+ while time.time() < deadline:
+ status = query.status
+ if status["isDataAvailable"] or status["isTriggerActive"]:
+ return # Query is alive!
+ time.sleep(interval)
+
+ raise AssertionError(f"Query did not become active within {timeout}s.")
+
+ @property
+ def producer(self):
+ """Get the Kafka producer instance for advanced usage."""
+ self._assert_initialized()
+ return self._producer
+
+ @property
+ def admin_client(self):
+ """Get the Kafka admin client instance for advanced usage."""
+ self._assert_initialized()
+ return self._admin_client
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py
new file mode 100644
index 000000000000..af43a396debb
--- /dev/null
+++ b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PySpark tests for Kafka streaming integration using Docker test containers.
+
+These tests demonstrate how to use KafkaUtils to test Spark streaming with Kafka.
+Tests require Docker to be running and the following Python packages:
+- testcontainers[kafka]
+- kafka-python
+"""
+
+import os
+import tempfile
+import unittest
+import uuid
+
+from pyspark.testing.sqlutils import ReusedSQLTestCase, search_jar, read_classpath
+
+# Setup Kafka JAR on classpath before SparkSession is created
+# This follows the same pattern as streamingutils.py for Kinesis
+kafka_sql_jar = search_jar(
+ "connector/kafka-0-10-sql",
+ "spark-sql-kafka-0-10-",
+ "spark-sql-kafka-0-10_",
+)
+
+if kafka_sql_jar is None:
+ raise RuntimeError(
+ "Kafka SQL connector JAR was not found. "
+ "To run these tests, you need to build Spark with "
+ "'build/mvn -Pkafka-0-10 package' or 'build/sbt kafka-0-10-sql/package' "
+ "before running this test."
+ )
+
+
+# Read the full classpath including all dependencies
+kafka_classpath = read_classpath("connector/kafka-0-10-sql")
+
+# Combine the main JAR with its dependencies
+all_jars = f"{kafka_sql_jar},{kafka_classpath}"
+
+# Add Kafka JAR to PYSPARK_SUBMIT_ARGS before SparkSession is created
+existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+jars_args = "--jars %s" % all_jars
+
+os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+from pyspark.sql.functions import col
+from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils
+
+
+# Check if required Python dependencies are available
+try:
+ import testcontainers # noqa: F401
+ import kafka # noqa: F401
+except ImportError as e:
+ raise ImportError(
+ "Kafka test dependencies not available. "
+ "Install with: pip install testcontainers[kafka] kafka-python"
+ ) from e
+
+
+class StreamingKafkaTestsMixin:
+ """
+ Base mixin for Kafka streaming tests that provides KafkaUtils setup/teardown
+ and topic management.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ # Start Kafka container - this may take 10-30 seconds on first run
+ cls.kafka_utils = KafkaUtils()
+ cls.kafka_utils.setup()
+
+ @classmethod
+ def tearDownClass(cls):
+ # Stop Kafka container and clean up resources
+        if hasattr(cls, "kafka_utils"):
+ cls.kafka_utils.teardown()
+ super().tearDownClass()
+
+ def setUp(self):
+ super().setUp()
+ # Create unique topics for each test to avoid interference
+ self.source_topic = f"source-{uuid.uuid4().hex}"
+ self.sink_topic = f"sink-{uuid.uuid4().hex}"
+ self.kafka_utils.create_topics([self.source_topic, self.sink_topic])
+
+ def tearDown(self):
+ # Clean up topics after each test
+ self.kafka_utils.delete_topics([self.source_topic, self.sink_topic])
+ super().tearDown()
+
+
+class StreamingKafkaTests(StreamingKafkaTestsMixin, ReusedSQLTestCase):
+ """
+ Tests for Kafka streaming integration with PySpark.
+ """
+
+ def test_streaming_stateless(self):
+ """
+        Test a stateless real-time mode (RTM) query reading from the earliest offset.
+ """
+
+ # produce test data to source_topic
+ for i in range(10):
+ self.kafka_utils.producer.send(
+ self.source_topic,
+ key=i,
+ value=i,
+ ).get(timeout=10)
+ self.kafka_utils.producer.flush()
+
+ # Build streaming query for Kafka to Kafka.
+ kafka_source = (
+ self.spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .option("subscribe", self.source_topic)
+ .option("startingOffsets", "earliest")
+ .load()
+ )
+
+ checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint")
+
+ query = (
+ kafka_source.writeStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", self.kafka_utils.broker)
+ .option("topic", self.sink_topic)
+ .option("checkpointLocation", checkpoint_dir)
+ .outputMode("update")
+ .trigger(realTime="30 seconds")
+ .start()
+ )
+
+ # Wait for the streaming to process data
+ self.kafka_utils.wait_for_query_alive(query)
+
+ # Validate results
+        expected = {str(i): str(i) for i in range(10)}
+ try:
+ self.kafka_utils.assert_eventually(
+ result_func=lambda: self.kafka_utils.get_all_records(
+ self.spark, self.sink_topic
+ ), expected=expected
+ )
+ finally:
+ query.stop()
+
+if __name__ == "__main__":
+ try:
+ import xmlrunner
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+ except ImportError:
+ testRunner = None
+ unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/testing/sqlutils.py b/python/pyspark/testing/sqlutils.py
index b63c98f96f4e..3c3d3bc50e80 100644
--- a/python/pyspark/testing/sqlutils.py
+++ b/python/pyspark/testing/sqlutils.py
@@ -63,6 +63,45 @@ def search_jar(project_relative_path, sbt_jar_name_prefix, mvn_jar_name_prefix):
return jars[0]
+def read_classpath(project_relative_path):
+ """
+ Read the classpath file for a project and return it as a comma-separated string.
+
+ The classpath file is typically generated by Maven and contains colon-separated paths.
+ This function reads the file and converts it to comma-separated format for use with
+ spark-submit's --jars option.
+
+ Args:
+ project_relative_path: Relative path from SPARK_HOME to the project directory
+
+ Returns:
+ Comma-separated string of JAR paths
+
+ Raises:
+ RuntimeError: If the classpath.txt file does not exist
+
+ Example:
+ >>> classpath = read_classpath("connector/kafka-0-10-sql")
+ >>> # Returns: "/path/to/jar1.jar,/path/to/jar2.jar,..."
+ """
+ project_full_path = os.path.join(SPARK_HOME, project_relative_path)
+ classpath_file = os.path.join(project_full_path, "target", "classpath.txt")
+
+ if not os.path.exists(classpath_file):
+ raise RuntimeError(
+ f"Classpath file not found at {classpath_file}. "
+ f"Please build the project with Maven: "
+ f"'build/mvn -pl {project_relative_path} dependency:build-classpath "
+ f"-Dmdep.outputFile=target/classpath.txt'"
+ )
+
+ with open(classpath_file, "r") as f:
+ classpath = f.read().strip()
+
+ # Replace colon with comma for spark-submit --jars format
+ return classpath.replace(":", ",")
+
+
test_not_compiled_message = None
try:
from pyspark.sql.utils import require_test_compiled