From 6188ce2d57578e0f785820b2afc20e2cb8bd1b36 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Tue, 9 Dec 2025 22:55:26 +0000 Subject: [PATCH 1/4] [SPARK-54660] Add RTM trigger to python and tests --- dev/requirements.txt | 5 + .../sql/tests/streaming/KAFKA_TESTING.md | 487 ++++++++++++++++++ .../sql/tests/streaming/kafka_utils.py | 367 +++++++++++++ .../streaming/test_streaming_kafka_rtm.py | 177 +++++++ 4 files changed, 1036 insertions(+) create mode 100644 python/pyspark/sql/tests/streaming/KAFKA_TESTING.md create mode 100644 python/pyspark/sql/tests/streaming/kafka_utils.py create mode 100644 python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py diff --git a/dev/requirements.txt b/dev/requirements.txt index cde0957715bf..d7a836ad898c 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -21,6 +21,11 @@ openpyxl # PySpark test dependencies (optional) coverage +# Kafka streaming test dependencies (optional) +# Required for running Kafka integration tests with Docker test containers +testcontainers[kafka]>=3.7.0 +kafka-python>=2.0.2 + # Linter mypy==1.8.0 pytest-mypy-plugins==1.9.3 diff --git a/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md new file mode 100644 index 000000000000..b447ddb02065 --- /dev/null +++ b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md @@ -0,0 +1,487 @@ +# Kafka Testing with PySpark + +This document explains how to write PySpark unit tests that interact with a Kafka cluster using Docker containers. + +## Overview + +The `kafka_utils.py` module provides `KafkaUtils` class that launches a single-broker Kafka cluster via Docker using the `testcontainers-python` library. This enables end-to-end integration testing of PySpark Kafka streaming applications without requiring manual Kafka setup. + +## Prerequisites + +### 1. Docker + +Docker must be installed and running on your system. The Kafka test container requires Docker to launch the Kafka broker. + +- **Install Docker**: https://docs.docker.com/get-docker/ +- **Verify Docker is running**: `docker ps` + +### 2. 
Python Dependencies + +Install the required Python packages: + +```bash +pip install testcontainers[kafka] kafka-python +``` + +Or install all dev dependencies: + +```bash +cd $SPARK_HOME +pip install -r dev/requirements.txt +``` + +## Quick Start + +Here's a minimal example of writing a Kafka test: + +```python +import unittest +from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils +from pyspark.testing.sqlutils import ReusedSQLTestCase + +class MyKafkaTest(ReusedSQLTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.kafka_utils = KafkaUtils() + cls.kafka_utils.setup() + + @classmethod + def tearDownClass(cls): + cls.kafka_utils.teardown() + super().tearDownClass() + + def test_kafka_read_write(self): + # Create a topic + topic = "test-topic" + self.kafka_utils.create_topics([topic]) + + # Send test data + messages = [("key1", "value1"), ("key2", "value2")] + self.kafka_utils.send_messages(topic, messages) + + # Read with Spark + df = ( + self.spark.read + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + ) + + # Verify data + results = df.selectExpr( + "CAST(key AS STRING) as key", + "CAST(value AS STRING) as value" + ).collect() + + self.assertEqual(len(results), 2) +``` + +## KafkaUtils API Reference + +### Initialization and Lifecycle + +#### `__init__(kafka_version="7.4.0")` + +Create a new KafkaUtils instance. + +- **kafka_version**: Version of Confluent Kafka to use (default: "7.4.0" for stability) + +#### `setup()` + +Start the Kafka container and initialize clients. This must be called before using any other methods. + +**Raises:** +- `ImportError`: If required dependencies are not installed +- `RuntimeError`: If Kafka container fails to start + +**Note:** Container startup can take 10-30 seconds on first run while Docker pulls the image. + +#### `teardown()` + +Stop the Kafka container and clean up resources. Safe to call multiple times. + +### Topic Management + +#### `create_topics(topic_names, num_partitions=1, replication_factor=1)` + +Create one or more Kafka topics. + +**Parameters:** +- **topic_names** (List[str]): List of topic names to create +- **num_partitions** (int): Number of partitions per topic (default: 1) +- **replication_factor** (int): Replication factor (default: 1, max: 1 for single broker) + +**Example:** +```python +# Create single partition topics +kafka_utils.create_topics(["topic1", "topic2"]) + +# Create multi-partition topic +kafka_utils.create_topics(["multi-partition-topic"], num_partitions=3) +``` + +#### `delete_topics(topic_names)` + +Delete one or more Kafka topics. + +**Parameters:** +- **topic_names** (List[str]): List of topic names to delete + +### Producing Data + +#### `send_messages(topic, messages)` + +Send messages to a Kafka topic. + +**Parameters:** +- **topic** (str): Topic name +- **messages** (List[tuple]): List of (key, value) tuples + +**Example:** +```python +kafka_utils.send_messages("test-topic", [ + ("user1", "login"), + ("user2", "logout"), + ("user1", "purchase"), +]) +``` + +### Reading Data + +#### `get_all_records(spark, topic, key_deserializer="STRING", value_deserializer="STRING")` + +Read all records from a Kafka topic using Spark batch read. 
+ +**Parameters:** +- **spark**: SparkSession instance +- **topic** (str): Topic name +- **key_deserializer** (str): How to deserialize keys (default: "STRING") +- **value_deserializer** (str): How to deserialize values (default: "STRING") + +**Returns:** Dictionary mapping keys to values + +**Example:** +```python +records = kafka_utils.get_all_records(self.spark, "test-topic") +assert records == {"key1": "value1", "key2": "value2"} +``` + +### Testing Utilities + +#### `assert_eventually(result_func, expected, timeout=60, interval=1.0)` + +Assert that a condition becomes true within a timeout. Useful for testing streaming queries with eventually consistent results. + +**Parameters:** +- **result_func** (Callable): Function that returns the current result +- **expected**: Expected result value +- **timeout** (int): Maximum time to wait in seconds (default: 60) +- **interval** (float): Time between checks in seconds (default: 1.0) + +**Raises:** `AssertionError` if condition doesn't become true within timeout + +**Example:** +```python +kafka_utils.assert_eventually( + lambda: kafka_utils.get_all_records(self.spark, "sink-topic"), + {"key1": "processed-value1"}, + timeout=30 +) +``` + +#### `wait_for_query_alive(query, timeout=60, interval=1.0)` + +Wait for a streaming query to become active and ready to process data. + +**Parameters:** +- **query**: StreamingQuery instance +- **timeout** (int): Maximum time to wait in seconds (default: 60) +- **interval** (float): Time between checks in seconds (default: 1.0) + +**Raises:** `AssertionError` if query doesn't become active within timeout + +**Example:** +```python +query = df.writeStream.format("memory").start() +kafka_utils.wait_for_query_alive(query, timeout=30) +``` + +### Properties + +#### `broker` + +Get the Kafka bootstrap server address (e.g., "localhost:9093"). + +#### `producer` + +Get the underlying KafkaProducer instance for advanced usage. + +#### `admin_client` + +Get the underlying KafkaAdminClient instance for advanced usage. 
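**Example (advanced usage):** a minimal sketch of driving the underlying `kafka-python` clients directly, e.g. to inspect topics or attach record headers; it assumes the standard `KafkaAdminClient.list_topics()` and `KafkaProducer.send(..., headers=...)` APIs.

```python
# List the topics currently known to the broker
existing = kafka_utils.admin_client.list_topics()
assert "test-topic" in existing

# Send a record with custom headers via the raw producer.
# Header values are not run through the configured key/value
# serializers, so they must already be bytes.
future = kafka_utils.producer.send(
    "test-topic",
    key="key1",
    value="value1",
    headers=[("source", b"unit-test")],
)
future.get(timeout=10)  # wait for the broker to acknowledge the record
kafka_utils.producer.flush()
```

For most tests, prefer the higher-level `create_topics()` and `send_messages()` helpers; the raw clients are only needed when a test must exercise behavior those helpers do not cover.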
+ +## Common Testing Patterns + +### Pattern 1: Batch Read/Write + +Test basic Kafka read and write with Spark batch processing: + +```python +def test_kafka_batch(self): + topic = "test-topic" + self.kafka_utils.create_topics([topic]) + + # Write with Spark DataFrame + df = self.spark.createDataFrame([("key1", "value1")], ["key", "value"]) + ( + df.selectExpr("CAST(key AS BINARY)", "CAST(value AS BINARY)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .option("topic", topic) + .save() + ) + + # Read back + records = self.kafka_utils.get_all_records(self.spark, topic) + assert records == {"key1": "value1"} +``` + +### Pattern 2: Streaming Queries + +Test streaming queries with checkpoint management: + +```python +def test_kafka_streaming(self): + import tempfile + import os + + # Setup topics + source_topic = "source" + sink_topic = "sink" + self.kafka_utils.create_topics([source_topic, sink_topic]) + + # Produce initial data + self.kafka_utils.send_messages(source_topic, [("k1", "v1")]) + + # Start streaming query + df = ( + self.spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .option("subscribe", source_topic) + .option("startingOffsets", "earliest") + .load() + ) + + checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint") + query = ( + df.writeStream + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .option("topic", sink_topic) + .option("checkpointLocation", checkpoint_dir) + .start() + ) + + try: + self.kafka_utils.wait_for_query_alive(query) + self.kafka_utils.assert_eventually( + lambda: self.kafka_utils.get_all_records(self.spark, sink_topic), + {"k1": "v1"} + ) + finally: + query.stop() +``` + +### Pattern 3: Stateful Aggregations + +Test streaming aggregations: + +```python +def test_kafka_aggregation(self): + # Send data for aggregation + self.kafka_utils.send_messages("source", [ + ("user1", "1"), + ("user2", "1"), + ("user1", "1"), + ]) + + # Aggregate by key + df = ( + self.spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .option("subscribe", "source") + .load() + .groupBy(col("key")) + .count() + .selectExpr("CAST(key AS BINARY)", "CAST(count AS STRING) AS value") + ) + + query = df.writeStream.format("kafka") # ... 
start query + + # Verify aggregated results + self.kafka_utils.assert_eventually( + lambda: self.kafka_utils.get_all_records(self.spark, "sink"), + {"user1": "2", "user2": "1"} + ) +``` + +### Pattern 4: Multiple Topics + +Test writing to multiple topics based on data: + +```python +def test_multiple_topics(self): + topic1, topic2 = "topic1", "topic2" + self.kafka_utils.create_topics([topic1, topic2]) + + # Write with topic column + df = self.spark.createDataFrame([ + (topic1, "key1", "value1"), + (topic2, "key2", "value2"), + ], ["topic", "key", "value"]) + + ( + df.selectExpr("topic", "CAST(key AS BINARY)", "CAST(value AS BINARY)") + .write + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .save() + ) + + # Verify data in each topic + assert self.kafka_utils.get_all_records(self.spark, topic1) == {"key1": "value1"} + assert self.kafka_utils.get_all_records(self.spark, topic2) == {"key2": "value2"} +``` + +## Running Tests + +### Run All Kafka Tests + +```bash +cd $SPARK_HOME/python +python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka.py -v +``` + +### Run Specific Test + +```bash +python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka.py::StreamingKafkaTests::test_kafka_batch_read -v +``` + +### Run with unittest + +```bash +cd $SPARK_HOME/python +python -m unittest pyspark.sql.tests.streaming.test_streaming_kafka +``` + +## Troubleshooting + +### Docker Not Running + +**Error:** +``` +Cannot connect to the Docker daemon at unix:///var/run/docker.sock +``` + +**Solution:** Start Docker Desktop or Docker daemon + +### Container Startup Timeout + +**Error:** +``` +Kafka container failed to start within timeout +``` + +**Solutions:** +1. Increase timeout in test code +2. Check Docker resource allocation (CPU/memory) +3. Check Docker logs: `docker logs ` + +### Port Conflicts + +**Error:** +``` +Port 9093 already in use +``` + +**Solution:** testcontainers automatically allocates random ports. Ensure no manual port binding in tests. + +### Missing Dependencies + +**Error:** +``` +ImportError: testcontainers is required for Kafka tests +``` + +**Solution:** +```bash +pip install testcontainers[kafka] kafka-python +``` + +## Best Practices + +1. **Use Class-Level Setup**: Initialize KafkaUtils once per test class in `setUpClass()` to avoid repeated container startup overhead. + +2. **Unique Topic Names**: Use UUID or test-specific names to prevent interference between tests: + ```python + topic = f"test-{uuid.uuid4().hex}" + ``` + +3. **Clean Up Topics**: Delete topics in `tearDown()` to avoid state leakage between tests. + +4. **Timeouts**: Use reasonable timeouts for `assert_eventually()` and `wait_for_query_alive()` (typically 30-60 seconds). + +5. **Checkpoint Management**: Use unique checkpoint directories per test to avoid conflicts: + ```python + checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint") + ``` + +6. 
**Skip When Unavailable**: Mark tests as skippable when dependencies are missing: + ```python + @unittest.skipIf(not have_kafka_dependencies, "Kafka dependencies not available") + ``` + +## Architecture + +The implementation uses: + +- **testcontainers-python**: Manages Docker container lifecycle +- **kafka-python**: Provides Kafka admin and producer clients +- **Confluent Kafka Docker Image**: Runs single-broker Kafka cluster with embedded ZooKeeper +- **Dynamic Port Allocation**: testcontainers allocates random ports to avoid conflicts + +## Comparison with Scala KafkaTestUtils + +| Feature | Scala KafkaTestUtils | Python KafkaUtils | +|---------|---------------------|-------------------| +| Implementation | Embedded Kafka server | Docker container | +| Setup time | ~5-10 seconds | ~10-30 seconds (first run) | +| Docker required | No | Yes | +| Isolation | Process-level | Container-level | +| Port conflicts | Possible | Rare (dynamic ports) | +| SSL/Kerberos | Supported | Not yet implemented | + +## Future Enhancements + +Potential improvements: + +- SSL/TLS support for encrypted connections +- SASL/Kerberos authentication +- Multi-broker cluster support +- Schema Registry integration +- Custom Kafka configuration options +- Performance optimization for faster startup + +## See Also + +- [test_streaming_kafka.py](test_streaming_kafka.py) - Example test implementations +- [kafka_utils.py](kafka_utils.py) - Source code and API documentation +- [Spark Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) +- [testcontainers-python Documentation](https://testcontainers-python.readthedocs.io/) diff --git a/python/pyspark/sql/tests/streaming/kafka_utils.py b/python/pyspark/sql/tests/streaming/kafka_utils.py new file mode 100644 index 000000000000..7dc63f0d16d4 --- /dev/null +++ b/python/pyspark/sql/tests/streaming/kafka_utils.py @@ -0,0 +1,367 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Utilities for running PySpark tests against a Kafka cluster using Docker containers. + +This module provides KafkaUtils class that launches a single-broker Kafka cluster +via Docker using testcontainers-python library. It's designed to be used with +Python unittest-based PySpark tests. + +Example usage: + class MyKafkaTest(ReusedSQLTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.kafka_utils = KafkaUtils() + cls.kafka_utils.setup() + + @classmethod + def tearDownClass(cls): + cls.kafka_utils.teardown() + super().tearDownClass() + + def test_kafka_streaming(self): + topic = "test-topic" + self.kafka_utils.create_topics([topic]) + # ... 
use self.kafka_utils.broker for bootstrap.servers +""" + +import time +from typing import List, Dict, Optional, Callable, Any + + +class KafkaUtils: + """ + Utility class for managing a Kafka test cluster using Docker containers. + + This class provides methods to: + - Start/stop a single-broker Kafka cluster in a Docker container + - Create and delete topics + - Send messages to topics + - Query topic data using Spark + - Helper methods for testing (assert_eventually, wait_for_query_alive) + + Attributes: + broker (str): The bootstrap server address (e.g., "localhost:9093") + initialized (bool): Whether the Kafka cluster has been started + """ + + def __init__(self, kafka_version: str = "7.4.0"): + """ + Initialize KafkaUtils. + + Args: + kafka_version: Version of Confluent Kafka to use (default: 7.4.0 for stability) + """ + self.kafka_version = kafka_version + self.initialized = False + self._kafka_container = None + self._admin_client = None + self._producer = None + self.broker = None + + def setup(self) -> None: + """ + Start the Kafka container and initialize admin client and producer. + + This method: + 1. Starts a Kafka container using testcontainers + 2. Creates an admin client for topic management + 3. Creates a producer for sending test messages + + Raises: + ImportError: If required dependencies (testcontainers, kafka-python) are not installed + RuntimeError: If Kafka container fails to start + """ + if self.initialized: + return + + try: + from testcontainers.kafka import KafkaContainer + except ImportError as e: + raise ImportError( + "testcontainers is required for Kafka tests. " + "Install it with: pip install testcontainers[kafka]" + ) from e + + try: + from kafka import KafkaProducer + from kafka.admin import KafkaAdminClient + except ImportError as e: + raise ImportError( + "kafka-python is required for Kafka tests. " + "Install it with: pip install kafka-python" + ) from e + + # Start Kafka container with specific version for test stability + self._kafka_container = KafkaContainer(f"confluentinc/cp-kafka:{self.kafka_version}") + self._kafka_container.start() + + # Get bootstrap server address + self.broker = self._kafka_container.get_bootstrap_server() + + # Initialize admin client for topic management + self._admin_client = KafkaAdminClient( + bootstrap_servers=self.broker, + request_timeout_ms=10000, + api_version_auto_timeout_ms=10000 + ) + + # Initialize producer for sending test messages + self._producer = KafkaProducer( + bootstrap_servers=self.broker, + key_serializer=lambda k: str(k).encode("utf-8") if k is not None else None, + value_serializer=lambda v: str(v).encode("utf-8") if v is not None else None, + request_timeout_ms=10000, + max_block_ms=10000 + ) + + self.initialized = True + + def teardown(self) -> None: + """ + Stop the Kafka container and clean up resources. + + This method closes the admin client, producer, and stops the Kafka container. + It's safe to call multiple times. 
+ """ + if not self.initialized: + return + + # Close admin client + if self._admin_client is not None: + try: + self._admin_client.close() + except Exception: + pass + self._admin_client = None + + # Close producer + if self._producer is not None: + try: + self._producer.close(timeout=5) + except Exception: + pass + self._producer = None + + # Stop Kafka container + if self._kafka_container is not None: + try: + self._kafka_container.stop() + except Exception: + pass + self._kafka_container = None + + self.broker = None + self.initialized = False + + def _assert_initialized(self) -> None: + """Check if KafkaUtils has been initialized, raise error if not.""" + if not self.initialized: + raise RuntimeError( + "KafkaUtils has not been initialized. Call setup() first." + ) + + def create_topics(self, topic_names: List[str], num_partitions: int = 1, + replication_factor: int = 1) -> None: + """ + Create Kafka topics. + + Args: + topic_names: List of topic names to create + num_partitions: Number of partitions per topic (default: 1) + replication_factor: Replication factor (default: 1, max: 1 for single broker) + + Note: + If a topic already exists, it will be silently ignored. + """ + self._assert_initialized() + + from kafka.admin import NewTopic + from kafka.errors import TopicAlreadyExistsError + + topics = [ + NewTopic( + name=name, + num_partitions=num_partitions, + replication_factor=replication_factor + ) + for name in topic_names + ] + + try: + self._admin_client.create_topics(new_topics=topics, validate_only=False) + except TopicAlreadyExistsError: + # Topic already exists, ignore + pass + + def delete_topics(self, topic_names: List[str]) -> None: + """ + Delete Kafka topics. + + Args: + topic_names: List of topic names to delete + + Note: + If a topic doesn't exist, it will be silently ignored. + """ + self._assert_initialized() + + from kafka.errors import UnknownTopicOrPartitionError + + try: + self._admin_client.delete_topics(topics=topic_names) + except UnknownTopicOrPartitionError: + # Topic doesn't exist, ignore + pass + + def send_messages(self, topic: str, messages: List[tuple]) -> None: + """ + Send messages to a Kafka topic. + + Args: + topic: Topic name to send messages to + messages: List of (key, value) tuples to send + + Example: + kafka_utils.send_messages("test-topic", [ + ("key1", "value1"), + ("key2", "value2"), + ]) + """ + self._assert_initialized() + + for key, value in messages: + future = self._producer.send(topic, key=key, value=value) + future.get(timeout=10) # Wait for send to complete + + self._producer.flush() + + def get_all_records(self, spark, topic: str, + key_deserializer: str = "STRING", + value_deserializer: str = "STRING") -> Dict[str, str]: + """ + Read all records from a Kafka topic using Spark. 
+ + Args: + spark: SparkSession instance + topic: Topic name to read from + key_deserializer: How to deserialize keys (default: "STRING") + value_deserializer: How to deserialize values (default: "STRING") + + Returns: + Dictionary mapping keys to values + + Example: + records = kafka_utils.get_all_records(self.spark, "test-topic") + assert records == {"key1": "value1", "key2": "value2"} + """ + self._assert_initialized() + + df = ( + spark.read + .format("kafka") + .option("kafka.bootstrap.servers", self.broker) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .option("endingOffsets", "latest") + .load() + ) + + df = df.selectExpr( + f"CAST(key AS {key_deserializer}) AS key_str", + f"CAST(value AS {value_deserializer}) AS value_str" + ) + + rows = df.collect() + return {row.key_str: row.value_str for row in rows} + + def assert_eventually(self, result_func: Callable[[], Any], expected: Any, + timeout: int = 60, interval: float = 1.0) -> None: + """ + Assert that a condition becomes true within a timeout. + + This is useful for testing streaming queries where results are eventually consistent. + + Args: + result_func: Function that returns the current result + expected: Expected result value + timeout: Maximum time to wait in seconds (default: 60) + interval: Time between checks in seconds (default: 1.0) + + Raises: + AssertionError: If the condition doesn't become true within timeout + + Example: + kafka_utils.assert_eventually( + lambda: kafka_utils.get_all_records(spark, topic), + {"key1": "value1"} + ) + """ + deadline = time.time() + timeout + last_result = None + + while time.time() < deadline: + last_result = result_func() + if last_result == expected: + return # Success! + time.sleep(interval) + + # Timeout reached, raise assertion error + raise AssertionError( + f"Condition not met within {timeout}s. " + f"Expected: {expected}, Got: {last_result}" + ) + + def wait_for_query_alive(self, query, timeout: int = 60, interval: float = 1.0) -> None: + """ + Wait for a streaming query to become active and ready to process data. + + Args: + query: StreamingQuery instance + timeout: Maximum time to wait in seconds (default: 60) + interval: Time between checks in seconds (default: 1.0) + + Raises: + AssertionError: If the query doesn't become active within timeout + + Example: + query = df.writeStream.format("memory").start() + kafka_utils.wait_for_query_alive(query) + """ + deadline = time.time() + timeout + + while time.time() < deadline: + status = query.status + if status["isDataAvailable"] or status["isTriggerActive"]: + return # Query is alive! + time.sleep(interval) + + raise AssertionError(f"Query did not become active within {timeout}s.") + + @property + def producer(self): + """Get the Kafka producer instance for advanced usage.""" + self._assert_initialized() + return self._producer + + @property + def admin_client(self): + """Get the Kafka admin client instance for advanced usage.""" + self._assert_initialized() + return self._admin_client diff --git a/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py new file mode 100644 index 000000000000..70239c1fd27f --- /dev/null +++ b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py @@ -0,0 +1,177 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +PySpark tests for Kafka streaming integration using Docker test containers. + +These tests demonstrate how to use KafkaUtils to test Spark streaming with Kafka. +Tests require Docker to be running and the following Python packages: +- testcontainers[kafka] +- kafka-python +""" + +import os +import tempfile +import unittest +import uuid + +from pyspark.testing.sqlutils import ReusedSQLTestCase, search_jar, read_classpath + +# Setup Kafka JAR on classpath before SparkSession is created +# This follows the same pattern as streamingutils.py for Kinesis +kafka_sql_jar = search_jar( + "connector/kafka-0-10-sql", + "spark-sql-kafka-0-10-", + "spark-sql-kafka-0-10_", +) + +if kafka_sql_jar is None: + raise RuntimeError( + "Kafka SQL connector JAR was not found. " + "To run these tests, you need to build Spark with " + "'build/mvn -Pkafka-0-10 package' or 'build/sbt kafka-0-10-sql/package' " + "before running this test." + ) + + +# Read the full classpath including all dependencies +kafka_classpath = read_classpath("connector/kafka-0-10-sql") + +# Combine the main JAR with its dependencies +all_jars = f"{kafka_sql_jar},{kafka_classpath}" + +# Add Kafka JAR to PYSPARK_SUBMIT_ARGS before SparkSession is created +existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell") +jars_args = "--jars %s" % all_jars + +os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args]) + +from pyspark.sql.functions import col +from pyspark.sql.tests.streaming.kafka_utils import KafkaUtils + + +# Check if required Python dependencies are available +try: + import testcontainers # noqa: F401 + import kafka # noqa: F401 +except ImportError as e: + raise ImportError( + "Kafka test dependencies not available. " + "Install with: pip install testcontainers[kafka] kafka-python" + ) from e + + +class StreamingKafkaTestsMixin: + """ + Base mixin for Kafka streaming tests that provides KafkaUtils setup/teardown + and topic management. 
+ """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + # Start Kafka container - this may take 10-30 seconds on first run + cls.kafka_utils = KafkaUtils() + cls.kafka_utils.setup() + + + @classmethod + def tearDownClass(cls): + # Stop Kafka container and clean up resources + if hasattr(cls, 'kafka_utils'): + cls.kafka_utils.teardown() + super().tearDownClass() + + def setUp(self): + super().setUp() + # Create unique topics for each test to avoid interference + self.source_topic = f"source-{uuid.uuid4().hex}" + self.sink_topic = f"sink-{uuid.uuid4().hex}" + self.kafka_utils.create_topics([self.source_topic, self.sink_topic]) + # Set log level to INFO for better visibility during testing + self.spark.sparkContext.setLogLevel("INFO") + + def tearDown(self): + # Clean up topics after each test + self.kafka_utils.delete_topics([self.source_topic, self.sink_topic]) + super().tearDown() + + +class StreamingKafkaTests(StreamingKafkaTestsMixin, ReusedSQLTestCase): + """ + Tests for Kafka streaming integration with PySpark. + """ + + def test_streaming_stateless(self): + """ + Test stateless rtm query with earliest offset. + """ + + # produce test data to source_topic + for i in range(10): + self.kafka_utils.producer.send( + self.source_topic, + key=i, + value=i, + ).get(timeout=10) + self.kafka_utils.producer.flush() + + # Build streaming query for Kafka to Kafka. + kafka_source = ( + self.spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .option("subscribe", self.source_topic) + .option("startingOffsets", "earliest") + .load() + ) + + checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint") + + query = ( + kafka_source.writeStream + .format("kafka") + .option("kafka.bootstrap.servers", self.kafka_utils.broker) + .option("topic", self.sink_topic) + .option("checkpointLocation", checkpoint_dir) + .outputMode("update") + .trigger(realTime="30 seconds") + .start() + ) + + # Wait for the streaming to process data + self.kafka_utils.wait_for_query_alive(query) + + # Validate results + expected = {k: k for k in sorted([str(i) for i in range(10)])} + try: + self.kafka_utils.assert_eventually( + result_func=lambda: self.kafka_utils.get_all_records( + self.spark, self.sink_topic + ), expected=expected + ) + finally: + query.stop() + +if __name__ == "__main__": + try: + import xmlrunner + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) From 29ed7fe1fd869aa347eeeec88d21f78a3a835319 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Tue, 9 Dec 2025 21:47:17 -0800 Subject: [PATCH 2/4] adding --- connector/kafka-0-10-sql/pom.xml | 20 ++++++++++ python/pyspark/sql/streaming/readwriter.py | 26 +++++++++++-- .../sql/tests/streaming/KAFKA_TESTING.md | 6 +-- python/pyspark/testing/sqlutils.py | 39 +++++++++++++++++++ 4 files changed, 85 insertions(+), 6 deletions(-) diff --git a/connector/kafka-0-10-sql/pom.xml b/connector/kafka-0-10-sql/pom.xml index fd0ca36d36df..383e63cce788 100644 --- a/connector/kafka-0-10-sql/pom.xml +++ b/connector/kafka-0-10-sql/pom.xml @@ -186,6 +186,26 @@ target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-dependency-plugin + 3.6.1 + + + write-classpath + + build-classpath + + generate-resources + + runtime + ${project.build.directory}/classpath.txt + + + + + diff --git a/python/pyspark/sql/streaming/readwriter.py 
b/python/pyspark/sql/streaming/readwriter.py index 8121dd609950..27223f531425 100644 --- a/python/pyspark/sql/streaming/readwriter.py +++ b/python/pyspark/sql/streaming/readwriter.py @@ -1243,6 +1243,7 @@ def trigger( once: Optional[bool] = None, continuous: Optional[str] = None, availableNow: Optional[bool] = None, + realTime: Optional[str] = None ) -> "DataStreamWriter": """Set the trigger for the stream query. If this is not set it will run the query as fast as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``. @@ -1268,6 +1269,10 @@ def trigger( availableNow : bool, optional if set to True, set a trigger that processes all available data in multiple batches then terminates the query. Only one trigger can be set. + realTime : str, optional + a batch duration as a string, e.g. '5 seconds', '1 minute'. + Set a trigger that runs a real time mode query with + batch at the specified duration. Only one trigger can be set. Notes ----- @@ -1291,15 +1296,20 @@ def trigger( >>> df.writeStream.trigger(availableNow=True) <...streaming.readwriter.DataStreamWriter object ...> + + Trigger the query for real time mode execution every 5 seconds. + + >>> df.writeStream.trigger(realTime='5 seconds') + <...streaming.readwriter.DataStreamWriter object ...> """ - params = [processingTime, once, continuous, availableNow] + params = [processingTime, once, continuous, availableNow, realTime] - if params.count(None) == 4: + if params.count(None) == 5: raise PySparkValueError( errorClass="ONLY_ALLOW_SINGLE_TRIGGER", messageParameters={}, ) - elif params.count(None) < 3: + elif params.count(None) < 4: raise PySparkValueError( errorClass="ONLY_ALLOW_SINGLE_TRIGGER", messageParameters={}, @@ -1342,6 +1352,16 @@ def trigger( jTrigger = getattr( self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger" ).Continuous(interval) + elif realTime is not None: + if type(realTime) != str or len(realTime.strip()) == 0: + raise PySparkValueError( + errorClass="VALUE_NOT_NON_EMPTY_STR", + messageParameters={"arg_name": "realTime", "arg_value": str(realTime)}, + ) + batch_duration = realTime.strip() + jTrigger = getattr( + self._spark._sc._jvm, "org.apache.spark.sql.streaming.Trigger" + ).RealTime(batch_duration) else: if availableNow is not True: raise PySparkValueError( diff --git a/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md index b447ddb02065..8a83fd577d48 100644 --- a/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md +++ b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md @@ -365,13 +365,13 @@ def test_multiple_topics(self): ```bash cd $SPARK_HOME/python -python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka.py -v +python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py -v ``` ### Run Specific Test ```bash -python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka.py::StreamingKafkaTests::test_kafka_batch_read -v +python -m pytest pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py::StreamingKafkaTests::test_kafka_batch_read -v ``` ### Run with unittest @@ -481,7 +481,7 @@ Potential improvements: ## See Also -- [test_streaming_kafka.py](test_streaming_kafka.py) - Example test implementations +- [test_streaming_kafka.py](test_streaming_kafka_rtm.py) - Example test implementations - [kafka_utils.py](kafka_utils.py) - Source code and API documentation - [Spark Structured Streaming + Kafka Integration 
Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) - [testcontainers-python Documentation](https://testcontainers-python.readthedocs.io/) diff --git a/python/pyspark/testing/sqlutils.py b/python/pyspark/testing/sqlutils.py index b63c98f96f4e..3c3d3bc50e80 100644 --- a/python/pyspark/testing/sqlutils.py +++ b/python/pyspark/testing/sqlutils.py @@ -63,6 +63,45 @@ def search_jar(project_relative_path, sbt_jar_name_prefix, mvn_jar_name_prefix): return jars[0] +def read_classpath(project_relative_path): + """ + Read the classpath file for a project and return it as a comma-separated string. + + The classpath file is typically generated by Maven and contains colon-separated paths. + This function reads the file and converts it to comma-separated format for use with + spark-submit's --jars option. + + Args: + project_relative_path: Relative path from SPARK_HOME to the project directory + + Returns: + Comma-separated string of JAR paths + + Raises: + RuntimeError: If the classpath.txt file does not exist + + Example: + >>> classpath = read_classpath("connector/kafka-0-10-sql") + >>> # Returns: "/path/to/jar1.jar,/path/to/jar2.jar,..." + """ + project_full_path = os.path.join(SPARK_HOME, project_relative_path) + classpath_file = os.path.join(project_full_path, "target", "classpath.txt") + + if not os.path.exists(classpath_file): + raise RuntimeError( + f"Classpath file not found at {classpath_file}. " + f"Please build the project with Maven: " + f"'build/mvn -pl {project_relative_path} dependency:build-classpath " + f"-Dmdep.outputFile=target/classpath.txt'" + ) + + with open(classpath_file, "r") as f: + classpath = f.read().strip() + + # Replace colon with comma for spark-submit --jars format + return classpath.replace(":", ",") + + test_not_compiled_message = None try: from pyspark.sql.utils import require_test_compiled From 5d19c8730f979bf52df14e6d6f36e47da3b3554a Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Wed, 10 Dec 2025 15:03:49 -0800 Subject: [PATCH 3/4] testing --- .../sql/connect/streaming/readwriter.py | 19 +++++- .../sql/tests/streaming/KAFKA_TESTING.md | 61 ------------------- .../streaming/test_streaming_kafka_rtm.py | 2 - 3 files changed, 16 insertions(+), 66 deletions(-) diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py index 21c513f88c0f..b7ca7dd1cbc0 100644 --- a/python/pyspark/sql/connect/streaming/readwriter.py +++ b/python/pyspark/sql/connect/streaming/readwriter.py @@ -493,6 +493,10 @@ def trigger(self, *, continuous: str) -> "DataStreamWriter": def trigger(self, *, availableNow: bool) -> "DataStreamWriter": ... + @overload + def trigger(self, *, realTime: str) -> "DataStreamWriter": + ... 
+ def trigger( self, *, @@ -500,15 +504,16 @@ def trigger( once: Optional[bool] = None, continuous: Optional[str] = None, availableNow: Optional[bool] = None, + realTime: Optional[str] = None, ) -> "DataStreamWriter": - params = [processingTime, once, continuous, availableNow] + params = [processingTime, once, continuous, availableNow, realTime] - if params.count(None) == 4: + if params.count(None) == 5: raise PySparkValueError( errorClass="ONLY_ALLOW_SINGLE_TRIGGER", messageParameters={}, ) - elif params.count(None) < 3: + elif params.count(None) < 4: raise PySparkValueError( errorClass="ONLY_ALLOW_SINGLE_TRIGGER", messageParameters={}, @@ -541,6 +546,14 @@ def trigger( ) self._write_proto.continuous_checkpoint_interval = continuous.strip() + elif realTime is not None: + if type(realTime) != str or len(realTime.strip()) == 0: + raise PySparkValueError( + errorClass="VALUE_NOT_NON_EMPTY_STR", + messageParameters={"arg_name": "realTime", "arg_value": str(realTime)}, + ) + self._write_proto.real_time_batch_duration = realTime.strip() + else: if availableNow is not True: raise PySparkValueError( diff --git a/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md index 8a83fd577d48..e5df122726fe 100644 --- a/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md +++ b/python/pyspark/sql/tests/streaming/KAFKA_TESTING.md @@ -424,64 +424,3 @@ ImportError: testcontainers is required for Kafka tests ```bash pip install testcontainers[kafka] kafka-python ``` - -## Best Practices - -1. **Use Class-Level Setup**: Initialize KafkaUtils once per test class in `setUpClass()` to avoid repeated container startup overhead. - -2. **Unique Topic Names**: Use UUID or test-specific names to prevent interference between tests: - ```python - topic = f"test-{uuid.uuid4().hex}" - ``` - -3. **Clean Up Topics**: Delete topics in `tearDown()` to avoid state leakage between tests. - -4. **Timeouts**: Use reasonable timeouts for `assert_eventually()` and `wait_for_query_alive()` (typically 30-60 seconds). - -5. **Checkpoint Management**: Use unique checkpoint directories per test to avoid conflicts: - ```python - checkpoint_dir = os.path.join(tempfile.mkdtemp(), "checkpoint") - ``` - -6. 
**Skip When Unavailable**: Mark tests as skippable when dependencies are missing: - ```python - @unittest.skipIf(not have_kafka_dependencies, "Kafka dependencies not available") - ``` - -## Architecture - -The implementation uses: - -- **testcontainers-python**: Manages Docker container lifecycle -- **kafka-python**: Provides Kafka admin and producer clients -- **Confluent Kafka Docker Image**: Runs single-broker Kafka cluster with embedded ZooKeeper -- **Dynamic Port Allocation**: testcontainers allocates random ports to avoid conflicts - -## Comparison with Scala KafkaTestUtils - -| Feature | Scala KafkaTestUtils | Python KafkaUtils | -|---------|---------------------|-------------------| -| Implementation | Embedded Kafka server | Docker container | -| Setup time | ~5-10 seconds | ~10-30 seconds (first run) | -| Docker required | No | Yes | -| Isolation | Process-level | Container-level | -| Port conflicts | Possible | Rare (dynamic ports) | -| SSL/Kerberos | Supported | Not yet implemented | - -## Future Enhancements - -Potential improvements: - -- SSL/TLS support for encrypted connections -- SASL/Kerberos authentication -- Multi-broker cluster support -- Schema Registry integration -- Custom Kafka configuration options -- Performance optimization for faster startup - -## See Also - -- [test_streaming_kafka.py](test_streaming_kafka_rtm.py) - Example test implementations -- [kafka_utils.py](kafka_utils.py) - Source code and API documentation -- [Spark Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) -- [testcontainers-python Documentation](https://testcontainers-python.readthedocs.io/) diff --git a/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py index 70239c1fd27f..af43a396debb 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_kafka_rtm.py @@ -102,8 +102,6 @@ def setUp(self): self.source_topic = f"source-{uuid.uuid4().hex}" self.sink_topic = f"sink-{uuid.uuid4().hex}" self.kafka_utils.create_topics([self.source_topic, self.sink_topic]) - # Set log level to INFO for better visibility during testing - self.spark.sparkContext.setLogLevel("INFO") def tearDown(self): # Clean up topics after each test From f36ded5eccd5eac0c4c09c61bf17bcccd7b22afb Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Wed, 10 Dec 2025 21:09:44 -0800 Subject: [PATCH 4/4] fixing --- dev/sparktestsupport/modules.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index ee2a953a7402..88c1dee3a833 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -659,6 +659,7 @@ def __hash__(self): "pyspark.sql.tests.streaming.test_streaming", "pyspark.sql.tests.streaming.test_streaming_foreach", "pyspark.sql.tests.streaming.test_streaming_foreach_batch", + "pyspark.sql.tests.streaming.test_streaming_kafka_rtm", "pyspark.sql.tests.streaming.test_streaming_listener", "pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state", "pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state",