From 365838bbb9223111dbdd6ce3de1a26ac5d65a5e4 Mon Sep 17 00:00:00 2001 From: Vincent Potucek Date: Thu, 18 Dec 2025 11:43:09 +0100 Subject: [PATCH] MAJOR: Hotfix broken `spotless` config, adhere `SSOT` #21165 #21161 #21168 Signed-off-by: Vincent Potucek --- .github/scripts/checkstyle.py | 4 +- .github/scripts/develocity_reports.py | 352 +++++++++--------- .github/scripts/pr-format.py | 6 +- .github/workflows/build.yml | 5 +- build.gradle | 19 +- committer-tools/verify_license.py | 18 +- .../StaticInitializerThrowsRestExtension.java | 4 - .../converters/ByteArrayConverter.java | 3 - .../test/plugins/ThingTwo.java | 1 - .../test/plugins/ReadVersionFromResource.java | 1 - .../test/plugins/ReadVersionFromResource.java | 1 - .../test/plugins/SamplingConfigProvider.java | 4 - .../test/plugins/SamplingConnector.java | 1 - .../test/plugins/SamplingHeaderConverter.java | 1 - .../SubclassOfClasspathOverridePolicy.java | 1 - .../test/plugins/VersionedPredicate.java | 1 - .../test/plugins/VersionedTransformation.java | 1 - docker/docker_build_test.py | 2 +- .../extract_docker_official_image_artifact.py | 10 +- .../prepare_docker_official_image_source.py | 4 +- docker/test/docker_sanity_test.py | 28 +- gradle/spotless.gradle | 41 ++ release/git.py | 2 +- .../benchmarks/core/benchmark_test.py | 12 +- .../services/console_share_consumer.py | 4 +- tests/kafkatest/services/kafka/kafka.py | 10 +- .../services/log_compaction_tester.py | 2 +- .../services/security/security_config.py | 2 +- .../kafkatest/services/verifiable_consumer.py | 12 +- .../services/verifiable_share_consumer.py | 14 +- .../client/consumer_rolling_upgrade_test.py | 2 +- tests/kafkatest/tests/client/consumer_test.py | 12 +- .../tests/connect/connect_distributed_test.py | 22 +- .../tests/connect/connect_rest_test.py | 4 +- .../tests/core/network_degrade_test.py | 2 +- .../tests/core/replica_scale_test.py | 2 +- tests/kafkatest/tests/end_to_end.py | 6 +- .../streams/streams_broker_bounce_test.py | 14 +- .../tests/streams/streams_upgrade_test.py | 22 +- .../tests/verifiable_share_consumer_test.py | 6 +- tests/kafkatest/utils/util.py | 4 +- 41 files changed, 335 insertions(+), 327 deletions(-) create mode 100644 gradle/spotless.gradle diff --git a/.github/scripts/checkstyle.py b/.github/scripts/checkstyle.py index d6af54c438eda..44316696d2c89 100644 --- a/.github/scripts/checkstyle.py +++ b/.github/scripts/checkstyle.py @@ -42,7 +42,7 @@ def parse_report(workspace_path, fp) -> Tuple[int, int]: error_count = 0 for (event, elem) in xml.etree.ElementTree.iterparse(fp, events=["start", "end"]): if event == "start": - stack.append(elem) + stack.append(elem) if elem.tag == "file": file_count += 1 errors.clear() @@ -77,7 +77,7 @@ def parse_report(workspace_path, fp) -> Tuple[int, int]: if not os.getenv("GITHUB_WORKSPACE"): print("This script is intended to by run by GitHub Actions.") exit(1) - + reports = glob(pathname="**/checkstyle/*.xml", recursive=True) logger.debug(f"Found {len(reports)} checkstyle reports") total_file_count = 0 diff --git a/.github/scripts/develocity_reports.py b/.github/scripts/develocity_reports.py index 2d30cbffa5550..e9294c5d7d4d1 100644 --- a/.github/scripts/develocity_reports.py +++ b/.github/scripts/develocity_reports.py @@ -33,7 +33,7 @@ @dataclass class TestOutcome: passed: int - failed: int + failed: int skipped: int flaky: int not_selected: int = field(metadata={'name': 'notSelected'}) @@ -70,18 +70,18 @@ class TestContainerResult: class TestCaseResult(TestResult): """Extends TestResult to include container-specific 
information""" container_name: str = "" - + @dataclass class BuildCache: last_update: datetime builds: Dict[str, 'BuildInfo'] - + def to_dict(self): return { 'last_update': self.last_update.isoformat(), 'builds': {k: asdict(v) for k, v in self.builds.items()} } - + @classmethod def from_dict(cls, data: dict) -> 'BuildCache': return cls( @@ -93,7 +93,7 @@ class CacheProvider(ABC): @abstractmethod def get_cache(self) -> Optional[BuildCache]: pass - + @abstractmethod def save_cache(self, cache: BuildCache): pass @@ -104,7 +104,7 @@ def __init__(self, cache_dir: str = None): cache_dir = os.path.join(os.path.expanduser("~"), ".develocity_cache") self.cache_file = os.path.join(cache_dir, "build_cache.pkl") os.makedirs(cache_dir, exist_ok=True) - + def get_cache(self) -> Optional[BuildCache]: try: if os.path.exists(self.cache_file): @@ -113,7 +113,7 @@ def get_cache(self) -> Optional[BuildCache]: except Exception as e: logger.warning(f"Failed to load local cache: {e}") return None - + def save_cache(self, cache: BuildCache): try: with open(self.cache_file, 'wb') as f: @@ -124,16 +124,16 @@ def save_cache(self, cache: BuildCache): class GitHubActionsCacheProvider(CacheProvider): def __init__(self): self.cache_key = "develocity-build-cache" - + def get_cache(self) -> Optional[BuildCache]: try: # Check if running in GitHub Actions if not os.environ.get('GITHUB_ACTIONS'): return None - + cache_path = os.environ.get('GITHUB_WORKSPACE', '') cache_file = os.path.join(cache_path, self.cache_key + '.json') - + if os.path.exists(cache_file): with open(cache_file, 'r') as f: data = json.load(f) @@ -141,15 +141,15 @@ def get_cache(self) -> Optional[BuildCache]: except Exception as e: logger.warning(f"Failed to load GitHub Actions cache: {e}") return None - + def save_cache(self, cache: BuildCache): try: if not os.environ.get('GITHUB_ACTIONS'): return - + cache_path = os.environ.get('GITHUB_WORKSPACE', '') cache_file = os.path.join(cache_path, self.cache_key + '.json') - + with open(cache_file, 'w') as f: json.dump(cache.to_dict(), f) except Exception as e: @@ -165,7 +165,7 @@ def __init__(self, base_url: str, auth_token: str): self.default_chunk_size = timedelta(days=14) self.api_retry_delay = 2 # seconds self.max_api_retries = 3 - + # Initialize cache providers self.cache_providers = [ GitHubActionsCacheProvider(), @@ -173,7 +173,7 @@ def __init__(self, base_url: str, auth_token: str): ] self.build_cache = None self._load_cache() - + def _load_cache(self): """Load cache from the first available provider""" for provider in self.cache_providers: @@ -183,7 +183,7 @@ def _load_cache(self): logger.info(f"Loaded cache from {provider.__class__.__name__}") return logger.info("No existing cache found") - + def _save_cache(self): """Save cache to all providers""" if self.build_cache: @@ -200,13 +200,13 @@ def build_query( ) -> str: """ Constructs the query string to be used in both build info and test containers API calls. - + Args: project: The project name. chunk_start: The start datetime for the chunk. chunk_end: The end datetime for the chunk. test_tags: A list of tags to include. - + Returns: A formatted query string. 
""" @@ -222,7 +222,7 @@ def build_query( tags = " ".join(tags) return f"project:{project} buildStartTime:[{chunk_start.isoformat()} TO {chunk_end.isoformat()}] gradle.requestedTasks:test {tags}" - + def process_chunk( self, chunk_start: datetime, @@ -234,10 +234,10 @@ def process_chunk( ) -> Dict[str, BuildInfo]: """Helper method to process a single chunk of build information""" chunk_builds = {} - + # Use the helper method to build the query query = self.build_query(project, chunk_start, chunk_end, test_tags) - + # Initialize pagination for this chunk from_build = None continue_chunk = True @@ -251,10 +251,10 @@ def process_chunk( 'reverse': 'false', 'fromInstant': int(chunk_start.timestamp() * 1000) } - + if from_build: query_params['fromBuild'] = from_build - + for attempt in range(self.max_api_retries): try: response = requests.get( @@ -273,23 +273,23 @@ def process_chunk( raise response_json = response.json() - + if not response_json: break - + for build in response_json: build_id = build['id'] - + if 'models' in build and 'gradleAttributes' in build['models']: gradle_attrs = build['models']['gradleAttributes'] if 'model' in gradle_attrs: attrs = gradle_attrs['model'] build_timestamp = datetime.fromtimestamp(attrs['buildStartTime'] / 1000, pytz.UTC) - + if build_timestamp >= chunk_end: continue_chunk = False break - + if remaining_build_ids is None or build_id in remaining_build_ids: if 'problem' not in gradle_attrs: chunk_builds[build_id] = BuildInfo( @@ -300,14 +300,14 @@ def process_chunk( ) if remaining_build_ids is not None: remaining_build_ids.remove(build_id) - + if continue_chunk and response_json: from_build = response_json[-1]['id'] else: continue_chunk = False - + time.sleep(0.5) # Rate limiting between pagination requests - + return chunk_builds def get_build_info( @@ -323,15 +323,15 @@ def get_build_info( max_builds_per_request = 100 cutoff_date = datetime.now(pytz.UTC) - timedelta(days=query_days) current_time = datetime.now(pytz.UTC) - + if not fetch_all and not build_ids: raise ValueError(f"Either build_ids must be provided or fetch_all must be True: {build_ids} {fetch_all}") - + # Get builds from cache if available and bypass_cache is False if not bypass_cache and self.build_cache: cached_builds = self.build_cache.builds cached_cutoff = self.build_cache.last_update - timedelta(days=query_days) - + if fetch_all: # Use all cached builds within the time period for build_id, build in cached_builds.items(): @@ -344,19 +344,19 @@ def get_build_info( build = cached_builds[build_id] if build.timestamp >= cached_cutoff: builds[build_id] = build - + # Update cutoff date to only fetch new data cutoff_date = self.build_cache.last_update logger.info(f"Using cached data up to {cutoff_date.isoformat()}") - + if not fetch_all: # Remove already found builds from the search list build_ids = [bid for bid in build_ids if bid not in builds] - + if not build_ids: logger.info("All builds found in cache") return builds - + # Fetch remaining builds from API remaining_build_ids = set(build_ids) if not fetch_all else None chunk_size = self.default_chunk_size @@ -375,10 +375,10 @@ def get_build_info( with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: future_to_chunk = { executor.submit( - self.process_chunk, - chunk[0], - chunk[1], - project, + self.process_chunk, + chunk[0], + chunk[1], + project, test_tags, remaining_build_ids.copy() if remaining_build_ids else None, max_builds_per_request @@ -401,7 +401,7 @@ def get_build_info( f"\n Builds Retrieved: {len(builds)}" f"\n 
Builds Not Found: {len(remaining_build_ids) if remaining_build_ids else 0}" ) - + # Update cache with new data if not bypassing cache if builds and not bypass_cache: if not self.build_cache: @@ -409,7 +409,7 @@ def get_build_info( self.build_cache.builds.update(builds) self.build_cache.last_update = current_time self._save_cache() - + return builds def get_test_results( @@ -425,24 +425,24 @@ def get_test_results( outcomes = ["failed", "flaky"] logger.debug(f"Fetching test results for project {project}, last {threshold_days} days") - + end_time = datetime.now(pytz.UTC) start_time = end_time - timedelta(days=threshold_days) - + all_results = {} build_ids = set() test_container_results = defaultdict(list) - + chunk_size = self.default_chunk_size chunk_start = start_time - + while chunk_start < end_time: chunk_end = min(chunk_start + chunk_size, end_time) logger.debug(f"Processing chunk: {chunk_start} to {chunk_end}") - + # Use the helper method to build the query query = self.build_query(project, chunk_start, chunk_end, test_tags) - + query_params = { 'query': query, 'testOutcomes': outcomes, @@ -456,22 +456,22 @@ def get_test_results( params=query_params ) response.raise_for_status() - + for test in response.json()['content']: test_name = test['name'] logger.debug(f"Processing test: {test_name}") - + if test_name not in all_results: outcome_data = test['outcomeDistribution'] if 'notSelected' in outcome_data: outcome_data['not_selected'] = outcome_data.pop('notSelected') outcome = TestOutcome(**outcome_data) all_results[test_name] = TestResult(test_name, outcome, chunk_start) - + # Collect build IDs by outcome if 'buildScanIdsByOutcome' in test: scan_ids = test['buildScanIdsByOutcome'] - + for outcome, ids in scan_ids.items(): if ids: # Only process if we have IDs for build_id in ids: @@ -479,11 +479,11 @@ def get_test_results( test_container_results[test_name].append( TestContainerResult(build_id=build_id, outcome=outcome) ) - + chunk_start = chunk_end logger.debug(f"Total unique build IDs collected: {len(build_ids)}") - + # Fetch build information using the updated get_build_info method print(build_ids) print(list(build_ids)) @@ -507,7 +507,7 @@ def get_test_results( )) else: logger.warning(f"Build ID {container_result.build_id} not found in builds response") - + # Sort timeline by timestamp result.timeline = sorted(timeline, key=lambda x: x.timestamp) logger.debug(f"Final timeline entries for {test_name}: {len(result.timeline)}") @@ -516,7 +516,7 @@ def get_test_results( logger.debug("Timeline entries:") for entry in timeline: logger.debug(f"Build ID: {entry.build_id}, Timestamp: {entry.timestamp}, Outcome: {entry.outcome}") - + # Calculate recent failure rate recent_cutoff = datetime.now(pytz.UTC) - timedelta(days=30) recent_runs = [t for t in timeline if t.timestamp >= recent_cutoff] @@ -531,30 +531,30 @@ def get_defective_tests(self, results: List[TestResult]) -> Dict[str, TestResult Analyze test results to find defective tests (failed or flaky) """ defective_tests = {} - + for result in results: if result.outcome_distribution.failed > 0 or result.outcome_distribution.flaky > 0: defective_tests[result.name] = result - + return defective_tests def get_long_quarantined_tests(self, results: List[TestResult], quarantine_threshold_days: int = 60) -> Dict[str, TestResult]: """ Find tests that have been quarantined longer than the threshold. These are candidates for removal or rewriting. 
- + Args: results: List of test results quarantine_threshold_days: Number of days after which a quarantined test should be considered for removal/rewrite """ long_quarantined = {} current_time = datetime.now(pytz.UTC) - + for result in results: days_quarantined = (current_time - result.first_seen).days if days_quarantined >= quarantine_threshold_days: long_quarantined[result.name] = (result, days_quarantined) - + return long_quarantined def get_problematic_quarantined_tests( @@ -568,7 +568,7 @@ def get_problematic_quarantined_tests( problematic_tests = {} current_time = datetime.now(pytz.UTC) chunk_start = current_time - timedelta(days=7) # Last 7 days for test cases - + for result in results: days_quarantined = (current_time - result.first_seen).days if days_quarantined >= quarantine_threshold_days: @@ -576,18 +576,18 @@ def get_problematic_quarantined_tests( if total_runs > 0: problem_runs = result.outcome_distribution.failed + result.outcome_distribution.flaky failure_rate = problem_runs / total_runs - + if failure_rate >= min_failure_rate or result.recent_failure_rate >= recent_failure_threshold: # Get detailed test case information try: test_cases = self.get_test_case_details( - result.name, - "kafka", + result.name, + "kafka", chunk_start, current_time, test_tags=["+trunk", "+flaky"] ) - + problematic_tests[result.name] = { 'container_result': result, 'days_quarantined': days_quarantined, @@ -597,7 +597,7 @@ def get_problematic_quarantined_tests( } except Exception as e: logger.error(f"Error getting test case details for {result.name}: {str(e)}") - + return problematic_tests def get_test_case_details( @@ -610,7 +610,7 @@ def get_test_case_details( ) -> List[TestCaseResult]: """ Fetch detailed test case results for a specific container. - + Args: container_name: Name of the test container project: The project name @@ -620,7 +620,7 @@ def get_test_case_details( """ # Use the helper method to build the query, similar to get_test_results query = self.build_query(project, chunk_start, chunk_end, test_tags) - + query_params = { 'query': query, 'testOutcomes': ['failed', 'flaky'], @@ -636,33 +636,33 @@ def get_test_case_details( params=query_params ) response.raise_for_status() - + test_cases = [] content = response.json().get('content', []) - + # Collect all build IDs first build_ids = set() for test in content: if 'buildScanIdsByOutcome' in test: for outcome_type, ids in test['buildScanIdsByOutcome'].items(): build_ids.update(ids) - + # Get build info for all build IDs builds = self.get_build_info(list(build_ids), project, test_tags, 7) # 7 days for test cases - + for test in content: outcome_data = test['outcomeDistribution'] if 'notSelected' in outcome_data: outcome_data['not_selected'] = outcome_data.pop('notSelected') outcome = TestOutcome(**outcome_data) - + test_case = TestCaseResult( name=test['name'], outcome_distribution=outcome, first_seen=chunk_start, container_name=container_name ) - + # Add build information with proper timestamps if 'buildScanIdsByOutcome' in test: for outcome_type, build_ids in test['buildScanIdsByOutcome'].items(): @@ -678,22 +678,22 @@ def get_test_case_details( ) else: logger.warning(f"Build ID {build_id} not found for test case {test['name']}") - + # Sort timeline by timestamp test_case.timeline.sort(key=lambda x: x.timestamp) test_cases.append(test_case) - + return test_cases - + except requests.exceptions.RequestException as e: logger.error(f"Error fetching test case details for {container_name}: {str(e)}") raise - def get_flaky_test_regressions(self, 
project: str, results: List[TestResult], + def get_flaky_test_regressions(self, project: str, results: List[TestResult], recent_days: int = 7, min_flaky_rate: float = 0.2) -> Dict[str, Dict]: """ Identify tests that have recently started showing flaky behavior. - + Args: project: The project name results: List of test results @@ -703,28 +703,28 @@ def get_flaky_test_regressions(self, project: str, results: List[TestResult], flaky_regressions = {} current_time = datetime.now(pytz.UTC) recent_cutoff = current_time - timedelta(days=recent_days) - + for result in results: # Skip tests with no timeline data if not result.timeline: continue - + # Split timeline into recent and historical periods recent_entries = [t for t in result.timeline if t.timestamp >= recent_cutoff] historical_entries = [t for t in result.timeline if t.timestamp < recent_cutoff] - + if not recent_entries or not historical_entries: continue - + # Calculate flaky rates recent_flaky = sum(1 for t in recent_entries if t.outcome == 'flaky') recent_total = len(recent_entries) recent_flaky_rate = recent_flaky / recent_total if recent_total > 0 else 0 - + historical_flaky = sum(1 for t in historical_entries if t.outcome == 'flaky') historical_total = len(historical_entries) historical_flaky_rate = historical_flaky / historical_total if historical_total > 0 else 0 - + # Check if there's a significant increase in flakiness if recent_flaky_rate >= min_flaky_rate and recent_flaky_rate > historical_flaky_rate * 1.5: flaky_regressions[result.name] = { @@ -734,14 +734,14 @@ def get_flaky_test_regressions(self, project: str, results: List[TestResult], 'recent_executions': recent_entries, 'historical_executions': historical_entries } - + return flaky_regressions - def get_cleared_tests(self, project: str, results: List[TestResult], + def get_cleared_tests(self, project: str, results: List[TestResult], success_threshold: float = 0.7, min_executions: int = 5) -> Dict[str, Dict]: """ Identify quarantined tests that are consistently passing and could be cleared. 
- + Args: project: The project name results: List of test results @@ -751,24 +751,24 @@ def get_cleared_tests(self, project: str, results: List[TestResult], cleared_tests = {} current_time = datetime.now(pytz.UTC) chunk_start = current_time - timedelta(days=7) # Last 7 days for test cases - + for result in results: # Only consider tests with sufficient recent executions recent_executions = result.timeline if len(recent_executions) < min_executions: continue - + # Calculate success rate at class level - successful_runs = sum(1 for t in recent_executions + successful_runs = sum(1 for t in recent_executions if t.outcome == 'passed') success_rate = successful_runs / len(recent_executions) - + # Check if the test meets clearing criteria at class level if success_rate >= success_threshold: # Verify no recent failures or flaky behavior - has_recent_issues = any(t.outcome in ['failed', 'flaky'] + has_recent_issues = any(t.outcome in ['failed', 'flaky'] for t in recent_executions[-min_executions:]) - + if not has_recent_issues: try: # Get test case details @@ -779,32 +779,32 @@ def get_cleared_tests(self, project: str, results: List[TestResult], current_time, test_tags=["+trunk", "+flaky"] ) - + # Only include if all test cases are also passing consistently all_cases_passing = True passing_test_cases = [] - + for test_case in test_cases: case_total = test_case.outcome_distribution.total if case_total >= min_executions: case_success_rate = test_case.outcome_distribution.passed / case_total - + # Check recent executions for the test case recent_case_issues = any(t.outcome in ['failed', 'flaky'] for t in test_case.timeline[-min_executions:]) - + if case_success_rate >= success_threshold and not recent_case_issues: passing_test_cases.append({ 'name': test_case.name, 'success_rate': case_success_rate, 'total_executions': case_total, - 'recent_executions': sorted(test_case.timeline, + 'recent_executions': sorted(test_case.timeline, key=lambda x: x.timestamp)[-min_executions:] }) else: all_cases_passing = False break - + if all_cases_passing and passing_test_cases: cleared_tests[result.name] = { 'result': result, @@ -814,35 +814,35 @@ def get_cleared_tests(self, project: str, results: List[TestResult], 'recent_executions': recent_executions[-min_executions:], 'test_cases': passing_test_cases } - + except Exception as e: logger.error(f"Error getting test case details for {result.name}: {str(e)}") - + return cleared_tests def update_cache(self, builds: Dict[str, BuildInfo]): """ Update the build cache with new build information. 
- + Args: builds: Dictionary of build IDs to BuildInfo objects """ current_time = datetime.now(pytz.UTC) - + # Initialize cache if it doesn't exist if not self.build_cache: self.build_cache = BuildCache(current_time, {}) - + # Update builds and last update time self.build_cache.builds.update(builds) self.build_cache.last_update = current_time - + # Save to all cache providers self._save_cache() - + logger.info(f"Updated cache with {len(builds)} builds") - def get_persistent_failing_tests(self, results: List[TestResult], + def get_persistent_failing_tests(self, results: List[TestResult], min_failure_rate: float = 0.2, min_executions: int = 5) -> Dict[str, Dict]: """ @@ -852,7 +852,7 @@ def get_persistent_failing_tests(self, results: List[TestResult], persistent_failures = {} current_time = datetime.now(pytz.UTC) chunk_start = current_time - timedelta(days=7) # Last 7 days for test cases - + # Group results by class class_groups = {} for result in results: @@ -860,18 +860,18 @@ def get_persistent_failing_tests(self, results: List[TestResult], if class_name not in class_groups: class_groups[class_name] = [] class_groups[class_name].append(result) - + # Analyze each class and its test cases for class_name, class_results in class_groups.items(): class_total = sum(r.outcome_distribution.total for r in class_results) - class_problems = sum(r.outcome_distribution.failed + r.outcome_distribution.flaky + class_problems = sum(r.outcome_distribution.failed + r.outcome_distribution.flaky for r in class_results) - + if class_total < min_executions: continue - + class_failure_rate = class_problems / class_total if class_total > 0 else 0 - + # Only include if class has significant failures if class_failure_rate >= min_failure_rate: try: @@ -883,15 +883,15 @@ def get_persistent_failing_tests(self, results: List[TestResult], current_time, test_tags=["+trunk", "-flaky"] ) - + failing_test_cases = {} for test_case in test_cases: total_runs = test_case.outcome_distribution.total if total_runs >= min_executions: - problem_runs = (test_case.outcome_distribution.failed + + problem_runs = (test_case.outcome_distribution.failed + test_case.outcome_distribution.flaky) failure_rate = problem_runs / total_runs if total_runs > 0 else 0 - + if failure_rate >= min_failure_rate: # Extract just the method name method_name = test_case.name.split('.')[-1] @@ -902,7 +902,7 @@ def get_persistent_failing_tests(self, results: List[TestResult], 'failed_executions': problem_runs, 'timeline': sorted(test_case.timeline, key=lambda x: x.timestamp) } - + if failing_test_cases: # Only include classes that have problematic test cases persistent_failures[class_name] = { 'failure_rate': class_failure_rate, @@ -910,16 +910,16 @@ def get_persistent_failing_tests(self, results: List[TestResult], 'failed_executions': class_problems, 'test_cases': failing_test_cases } - + except Exception as e: logger.error(f"Error getting test case details for {class_name}: {str(e)}") - + return persistent_failures def get_develocity_class_link(class_name: str, threshold_days: int) -> str: """ Generate Develocity link for a test class - + Args: class_name: Name of the test class threshold_days: Number of days to look back in search @@ -933,24 +933,24 @@ def get_develocity_class_link(class_name: str, threshold_days: int) -> str: "tests.container": class_name, "search.tasks": "test" } - + return f"{base_url}?{'&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items())}" def get_develocity_method_link(class_name: str, method_name: str, 
threshold_days: int) -> str: """ Generate Develocity link for a test method - + Args: class_name: Name of the test class method_name: Name of the test method threshold_days: Number of days to look back in search """ base_url = "https://develocity.apache.org/scans/tests" - + # Extract just the method name without the class prefix if '.' in method_name: method_name = method_name.split('.')[-1] - + params = { "search.rootProjectNames": "kafka", "search.tags": "github,trunk", @@ -960,7 +960,7 @@ def get_develocity_method_link(class_name: str, method_name: str, threshold_days "tests.test": method_name, "search.tasks": "test" } - + return f"{base_url}?{'&'.join(f'{k}={requests.utils.quote(str(v))}' for k, v in params.items())}" def print_most_problematic_tests(problematic_tests: Dict[str, Dict], threshold_days: int): @@ -969,21 +969,21 @@ def print_most_problematic_tests(problematic_tests: Dict[str, Dict], threshold_d if not problematic_tests: print("No high-priority problematic tests found.") return - + print(f"Found {len(problematic_tests)} tests that have been quarantined for {threshold_days} days and are still failing frequently.") - + # Print table with class and method information print("\n") print("") - - for test_name, details in sorted(problematic_tests.items(), + + for test_name, details in sorted(problematic_tests.items(), key=lambda x: x[1]['failure_rate'], reverse=True): class_link = get_develocity_class_link(test_name, threshold_days) print(f"") - + for test_case in sorted(details['test_cases'], - key=lambda x: (x.outcome_distribution.failed + x.outcome_distribution.flaky) / x.outcome_distribution.total + key=lambda x: (x.outcome_distribution.failed + x.outcome_distribution.flaky) / x.outcome_distribution.total if x.outcome_distribution.total > 0 else 0, reverse=True): method_name = test_case.name.split('.')[-1] @@ -995,11 +995,11 @@ def print_most_problematic_tests(problematic_tests: Dict[str, Dict], threshold_d f"" f"") print("
Class | Test Case | Failure Rate | Build Scans | Link
{test_name} | ↗️
{failure_rate:.2%} | {total_runs} | ↗️
") - + # Print detailed execution history print("\n
") print("Detailed Execution History\n") - + for test_name, details in sorted(problematic_tests.items(), key=lambda x: x[1]['failure_rate'], reverse=True): @@ -1010,9 +1010,9 @@ def print_most_problematic_tests(problematic_tests: Dict[str, Dict], threshold_d print(f"* Build Outcomes: Passed: {details['container_result'].outcome_distribution.passed} | " f"Failed: {details['container_result'].outcome_distribution.failed} | " f"Flaky: {details['container_result'].outcome_distribution.flaky}") - + for test_method in sorted(details['test_cases'], - key=lambda x: (x.outcome_distribution.failed + x.outcome_distribution.flaky) / x.outcome_distribution.total + key=lambda x: (x.outcome_distribution.failed + x.outcome_distribution.flaky) / x.outcome_distribution.total if x.outcome_distribution.total > 0 else 0, reverse=True): if test_method.timeline: @@ -1025,7 +1025,7 @@ def print_most_problematic_tests(problematic_tests: Dict[str, Dict], threshold_d date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") print("```") - + print("
") def print_flaky_regressions(flaky_regressions: Dict[str, Dict], threshold_days: int): @@ -1034,31 +1034,31 @@ def print_flaky_regressions(flaky_regressions: Dict[str, Dict], threshold_days: if not flaky_regressions: print("No flaky test regressions found.") return - + print(f"Found {len(flaky_regressions)} tests that have started showing increased flaky behavior recently.") - + # Print table with test details print("\n") print("") - + for test_name, details in flaky_regressions.items(): class_link = get_develocity_class_link(test_name, threshold_days) print(f"") print(f"" f"" f"") - + # Add recent execution details in sub-rows print("") for entry in sorted(details['recent_executions'], key=lambda x: x.timestamp, reverse=True)[:5]: date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') print(f"") print("
Test Class | Recent Flaky Rate | Historical Rate | Recent Executions | Link
{test_name} | ↗️
{details['recent_flaky_rate']:.2%} | {details['historical_flaky_rate']:.2%} | {len(details['recent_executions'])}
Recent Executions:
{date_str} - {entry.outcome}
") - + # Print detailed history print("\n
") print("Detailed Execution History\n") - + for test_name, details in sorted(flaky_regressions.items(), key=lambda x: x[1]['recent_flaky_rate'], reverse=True): @@ -1073,7 +1073,7 @@ def print_flaky_regressions(flaky_regressions: Dict[str, Dict], threshold_days: date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") print("```") - + print("
") def print_persistent_failing_tests(persistent_failures: Dict[str, Dict], threshold_days: int): @@ -1082,22 +1082,22 @@ def print_persistent_failing_tests(persistent_failures: Dict[str, Dict], thresho if not persistent_failures: print("No persistently failing tests found.") return - + print(f"Found {len(persistent_failures)} tests that have been consistently failing or flaky.") - + # Print table with test details print("\n") print("") - + for class_name, class_details in sorted(persistent_failures.items(), key=lambda x: x[1]['failure_rate'], reverse=True): class_link = get_develocity_class_link(class_name, threshold_days) - + # Print class row print(f"" f"") - + # Print test case rows for test_name, test_details in sorted(class_details['test_cases'].items(), key=lambda x: x[1]['failure_rate'], @@ -1110,11 +1110,11 @@ def print_persistent_failing_tests(persistent_failures: Dict[str, Dict], thresho f"" f"") print("
Test Class | Test Case | Failure Rate | Total Runs | Failed/Flaky | Link
{class_name} | ↗️
{test_details['failed_executions']} | ↗️
") - + # Print detailed history print("\n
") print("Detailed Execution History\n") - + for class_name, class_details in sorted(persistent_failures.items(), key=lambda x: x[1]['failure_rate'], reverse=True): @@ -1122,7 +1122,7 @@ def print_persistent_failing_tests(persistent_failures: Dict[str, Dict], thresho print(f"* Overall Failure Rate: {class_details['failure_rate']:.2%}") print(f"* Total Executions: {class_details['total_executions']}") print(f"* Failed/Flaky Executions: {class_details['failed_executions']}") - + for test_name, test_details in sorted(class_details['test_cases'].items(), key=lambda x: x[1]['failure_rate'], reverse=True): @@ -1134,7 +1134,7 @@ def print_persistent_failing_tests(persistent_failures: Dict[str, Dict], thresho date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") print("```") - + print("
") def print_cleared_tests(cleared_tests: Dict[str, Dict], threshold_days: int): @@ -1143,17 +1143,17 @@ def print_cleared_tests(cleared_tests: Dict[str, Dict], threshold_days: int): if not cleared_tests: print("No tests ready to be cleared from quarantine.") return - + # Calculate total number of test methods total_methods = sum(len(details['test_cases']) for details in cleared_tests.values()) - + print(f"Found {len(cleared_tests)} test classes with {total_methods} test methods that have been consistently passing. " f"These tests could be candidates for removing quarantine annotations at either class or method level.") - + # Print table with class and method information print("\n") print("") - + for test_name, details in sorted(cleared_tests.items(), key=lambda x: x[1]['success_rate'], reverse=True): @@ -1163,14 +1163,14 @@ def print_cleared_tests(cleared_tests: Dict[str, Dict], threshold_days: int): f"" f"" f"") - + for test_case in details['test_cases']: method_name = test_case['name'].split('.')[-1] method_link = get_develocity_method_link(test_name, test_case['name'], threshold_days) recent_status = "N/A" if test_case['recent_executions']: recent_status = test_case['recent_executions'][-1].outcome - + print(f"" f"" f"" @@ -1178,11 +1178,11 @@ def print_cleared_tests(cleared_tests: Dict[str, Dict], threshold_days: int): f"") print("") print("
Test Class | Test Method | Success Rate | Total Runs | Recent Status | Link
{details['success_rate']:.2%} | {details['total_executions']} | {details['successful_runs']} passed
{method_name} | {test_case['success_rate']:.2%} | {test_case['total_executions']} | ↗️
 
") - + # Print detailed history print("\n
") print("Detailed Test Method History\n") - + for test_name, details in sorted(cleared_tests.items(), key=lambda x: x[1]['success_rate'], reverse=True): @@ -1190,7 +1190,7 @@ def print_cleared_tests(cleared_tests: Dict[str, Dict], threshold_days: int): print(f"* Overall Success Rate: {details['success_rate']:.2%}") print(f"* Total Executions: {details['total_executions']}") print(f"* Consecutive Successful Runs: {details['successful_runs']}") - + for test_case in details['test_cases']: method_name = test_case['name'].split('.')[-1] print(f"\n#### {method_name}") @@ -1204,7 +1204,7 @@ def print_cleared_tests(cleared_tests: Dict[str, Dict], threshold_days: int): date_str = entry.timestamp.strftime('%Y-%m-%d %H:%M') print(f"{date_str:<17} {entry.outcome:<10} {entry.build_id}") print("```") - + print("
") def main(): @@ -1225,7 +1225,7 @@ def main(): MIN_FLAKY_RATE = 0.2 # For flaky regressions analyzer = TestAnalyzer(BASE_URL, token) - + try: quarantined_builds = analyzer.get_build_info([], PROJECT, "quarantinedTest", 7, bypass_cache=True, fetch_all=True) regular_builds = analyzer.get_build_info([], PROJECT, "test", 7, bypass_cache=True, fetch_all=True) @@ -1235,49 +1235,49 @@ def main(): # Get test results quarantined_results = analyzer.get_test_results( - PROJECT, + PROJECT, threshold_days=QUARANTINE_THRESHOLD_DAYS, test_tags=["+trunk", "+flaky", "-new"] ) - + regular_results = analyzer.get_test_results( PROJECT, threshold_days=7, # Last 7 days for regular tests test_tags=["+trunk", "-flaky", "-new"] ) - + # Generate reports problematic_tests = analyzer.get_problematic_quarantined_tests( - quarantined_results, + quarantined_results, QUARANTINE_THRESHOLD_DAYS, MIN_FAILURE_RATE, RECENT_FAILURE_THRESHOLD ) - + flaky_regressions = analyzer.get_flaky_test_regressions( PROJECT, regular_results, recent_days=7, min_flaky_rate=MIN_FLAKY_RATE ) - + cleared_tests = analyzer.get_cleared_tests( PROJECT, quarantined_results, success_threshold=SUCCESS_THRESHOLD ) - + # Get persistent failing tests (add after getting regular_results) persistent_failures = analyzer.get_persistent_failing_tests( regular_results, min_failure_rate=0.2, # 20% failure rate threshold min_executions=5 ) - + # Print report header print(f"\n# Flaky Test Report for {datetime.now(pytz.UTC).strftime('%Y-%m-%d')}") print(f"This report was run on {datetime.now(pytz.UTC).strftime('%Y-%m-%d %H:%M:%S')} UTC") - + # Print each section print_most_problematic_tests(problematic_tests, QUARANTINE_THRESHOLD_DAYS) print_flaky_regressions(flaky_regressions, QUARANTINE_THRESHOLD_DAYS) diff --git a/.github/scripts/pr-format.py b/.github/scripts/pr-format.py index d2da5e3e5bff3..f51942eb42dad 100644 --- a/.github/scripts/pr-format.py +++ b/.github/scripts/pr-format.py @@ -107,16 +107,16 @@ def split_paragraphs(text: str): """ This script performs some basic linting of our PR titles and body. The PR number is read from the PR_NUMBER environment variable. Since this script expects to run on a GHA runner, it expects the "gh" tool to be installed. - + The STDOUT from this script is used as the status check message. It should not be too long. Use the logger for any necessary logging. - + Title checks: * Not too short (at least 15 characters) * Not too long (at most 120 characters) * Not truncated (ending with ...) * Starts with "KAFKA-", "MINOR", or "HOTFIX" - + Body checks: * Is not empty * Has "Reviewers:" trailer if the PR is approved diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0f6cea60a25b9..09a6b9d9dc0e7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -139,8 +139,9 @@ jobs: # --info: For now, we'll generate lots of logs while setting up the GH Actions # --scan: Publish the build scan. 
This will only work on PRs from apache/kafka and trunk # --no-scan: For public fork PRs, we won't attempt to publish the scan - run: | - ./gradlew --build-cache --info $SCAN_ARG check releaseTarGz -x test + run: ./gradlew --build-cache --info $SCAN_ARG check releaseTarGz -x test + - name: Sanity Check + run: ./gradlew --build-cache rewriteDryRun - name: Archive check reports if: always() uses: actions/upload-artifact@v4 diff --git a/build.gradle b/build.gradle index 5b6714f882294..7ff7c709c5aa7 100644 --- a/build.gradle +++ b/build.gradle @@ -184,6 +184,8 @@ allprojects { } } } + + apply from: "$rootDir/gradle/spotless.gradle" } configurations.all { @@ -866,14 +868,6 @@ subprojects { skipProjects = [ ":jmh-benchmarks", ":trogdor" ] skipConfigurations = [ "zinc" ] } - apply plugin: 'com.diffplug.spotless' - spotless { - java { - targetExclude('**/generated/**/*.java','**/generated-test/**/*.java') - importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 'java', 'javax', '', '\\#') - removeUnusedImports() - } - } tasks.register("checkJsonLicenseHeader") { group = "Verification" @@ -2882,15 +2876,6 @@ project(':streams:streams-scala') { jar { dependsOn 'copyDependantLibs' } - - apply plugin: 'com.diffplug.spotless' - spotless { - scala { - target '**/*.scala' - scalafmt("$versions.scalafmt").configFile('../../checkstyle/.scalafmt.conf').scalaMajorVersion(versions.baseScala) - licenseHeaderFile '../../checkstyle/java.header', 'package' - } - } } project(':streams:integration-tests') { diff --git a/committer-tools/verify_license.py b/committer-tools/verify_license.py index c8489008cae67..0e32fd421b4d1 100644 --- a/committer-tools/verify_license.py +++ b/committer-tools/verify_license.py @@ -44,7 +44,7 @@ def get_tarball_path(project_dir): if not os.path.isdir(distributions_dir): print("Error: Distributions directory not found:", distributions_dir) sys.exit(1) - + pattern = re.compile(r'^kafka_2\.13-(?!.*docs).+\.tgz$', re.IGNORECASE) candidates = [ os.path.join(distributions_dir, f) @@ -54,7 +54,7 @@ def get_tarball_path(project_dir): if not candidates: print("Error: No tarball matching 'kafka_2.13-*.tgz' found in:", distributions_dir) sys.exit(1) - + tarball_path = max(candidates, key=os.path.getmtime) return tarball_path @@ -91,7 +91,7 @@ def main(): run_gradlew(project_dir) tarball = get_tarball_path(project_dir) print("Tarball created at:", tarball) - + # Extract the tarball into a temporary directory. with tempfile.TemporaryDirectory() as tmp_dir: extract_tarball(tarball, tmp_dir) @@ -101,17 +101,17 @@ def main(): sys.exit(1) extracted = os.path.join(tmp_dir, extracted_dirs[0]) print("Tarball extracted to:", extracted) - + # Locate the LICENSE file and libs directory. license_path = os.path.join(extracted, "LICENSE") libs_dir = os.path.join(extracted, "libs") if not os.path.exists(license_path) or not os.path.exists(libs_dir): print("Error: LICENSE file or libs directory not found in the extracted project.") sys.exit(1) - + with open(license_path, "r", encoding="utf-8") as f: license_text = f.read() - + # Get dependency sets. libs = get_libs_set(libs_dir) license_deps = get_license_deps(license_text) @@ -119,11 +119,11 @@ def main(): print("\nDependencies from libs (extracted from jar names):") for dep in sorted(libs): print(" -", dep) - + print("\nDependencies extracted from LICENSE file:") for dep in sorted(license_deps): print(" -", dep) - + # Compare the sets. 
missing_in_license = libs - license_deps extra_in_license = license_deps - libs @@ -134,7 +134,7 @@ def main(): print(" -", dep) else: print("\nAll libs from ./libs are present in the LICENSE file.") - + if extra_in_license: print("\nThe following entries are in the LICENSE file but not present in ./libs. These should be removed from the LICENSE-binary file:") for dep in sorted(extra_in_license): diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java index 8ef0bdc10e97f..315d51a38966e 100644 --- a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/StaticInitializerThrowsRestExtension.java @@ -18,14 +18,10 @@ package test.plugins; import java.io.IOException; -import java.util.List; import java.util.Map; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; -import org.apache.kafka.connect.sink.SinkConnector; /** * Fake plugin class for testing classloading isolation. diff --git a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java index 699d71635a042..8b99ec14bca8f 100644 --- a/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java +++ b/connect/runtime/src/test/resources/test-plugins/classpath-converter/org/apache/kafka/connect/converters/ByteArrayConverter.java @@ -19,17 +19,14 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; -import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.DataException; -import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.HeaderConverter; import java.nio.ByteBuffer; -import java.util.List; import java.util.Map; public class ByteArrayConverter implements Converter, HeaderConverter, Versioned { diff --git a/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java index ac265ce982302..5e817da9e73c1 100644 --- a/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java +++ b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java @@ -21,7 +21,6 @@ import java.util.Map; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.storage.Converter; /** diff --git 
a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java index f68b4eb4e581a..2d52468c19dc8 100644 --- a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java +++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v1/test/plugins/ReadVersionFromResource.java @@ -23,7 +23,6 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.stream.Collectors; import java.util.ArrayList; import java.util.Enumeration; import java.net.URL; diff --git a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java index 863ed9fad97dc..e608e06666c7a 100644 --- a/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java +++ b/connect/runtime/src/test/resources/test-plugins/read-version-from-resource-v2/test/plugins/ReadVersionFromResource.java @@ -23,7 +23,6 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.stream.Collectors; import java.util.ArrayList; import java.util.Enumeration; import java.net.URL; diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java b/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java index b0a79a18ef20b..deb9707370712 100644 --- a/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java +++ b/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java @@ -27,11 +27,7 @@ import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.ConfigChangeCallback; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; -import org.apache.kafka.connect.storage.HeaderConverter; /** * Fake plugin class for testing classloading isolation. 
diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java b/connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java index 263b41ef50c88..1b60d6fa52610 100644 --- a/connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java +++ b/connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java @@ -17,7 +17,6 @@ package test.plugins; -import java.io.IOException; import java.util.Collections; import java.util.ArrayList; import java.util.List; diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java index 7f6df858f7758..91259a44dc767 100644 --- a/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java +++ b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; import org.apache.kafka.connect.storage.HeaderConverter; diff --git a/connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/test/plugins/SubclassOfClasspathOverridePolicy.java b/connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/test/plugins/SubclassOfClasspathOverridePolicy.java index 935aa7bee60b8..3378d06a29192 100644 --- a/connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/test/plugins/SubclassOfClasspathOverridePolicy.java +++ b/connect/runtime/src/test/resources/test-plugins/subclass-of-classpath/test/plugins/SubclassOfClasspathOverridePolicy.java @@ -18,7 +18,6 @@ package test.plugins; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; -import org.apache.kafka.connect.converters.ByteArrayConverter; /** * Fake plugin class for testing classloading isolation. 
diff --git a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java index 2e92c79c3517d..2b36647c90b8c 100644 --- a/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java +++ b/connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java @@ -18,7 +18,6 @@ package test.plugins; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.transforms.predicates.Predicate; diff --git a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java index 0422834d027cb..f5549f510e57d 100644 --- a/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java +++ b/connect/runtime/src/test/resources/test-plugins/versioned-transformation/test/plugins/VersionedTransformation.java @@ -18,7 +18,6 @@ package test.plugins; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.transforms.Transformation; diff --git a/docker/docker_build_test.py b/docker/docker_build_test.py index 9a986875fe320..632a9bdaf1960 100755 --- a/docker/docker_build_test.py +++ b/docker/docker_build_test.py @@ -80,6 +80,6 @@ def run_docker_tests(image, tag, kafka_url, image_type): build_docker_image_runner(f"docker build -f $DOCKER_FILE -t {args.image}:{args.tag} --build-arg kafka_url={args.kafka_url} --build-arg build_date={date.today()} --no-cache --progress=plain $DOCKER_DIR", args.image_type) elif args.kafka_archive: build_docker_image_runner(f"docker build -f $DOCKER_FILE -t {args.image}:{args.tag} --build-arg build_date={date.today()} --no-cache --progress=plain $DOCKER_DIR", args.image_type, args.kafka_archive) - + if args.test_only or not (args.build_only or args.test_only): run_docker_tests(args.image, args.tag, args.kafka_url, args.image_type) diff --git a/docker/extract_docker_official_image_artifact.py b/docker/extract_docker_official_image_artifact.py index 2d362eb50db94..31859f381cbd3 100644 --- a/docker/extract_docker_official_image_artifact.py +++ b/docker/extract_docker_official_image_artifact.py @@ -26,10 +26,10 @@ Example command:- extract_docker_official_image_artifact.py --path_to_downloaded_artifact - This command will build an extract the downloaded artifact, and copy the contents to the - docker_official_images directory. If the extracted artifact contents already exist in the + This command will build an extract the downloaded artifact, and copy the contents to the + docker_official_images directory. If the extracted artifact contents already exist in the docker_official_images directory , they will be overwritten, else they will be created. 
- + """ import os import argparse @@ -49,7 +49,7 @@ def extract_artifact(artifact_path): temp_dir = Path('temp_extracted') try: if temp_dir.exists(): - shutil.rmtree(temp_dir) + shutil.rmtree(temp_dir) temp_dir.mkdir() with zipfile.ZipFile(artifact_path, 'r') as zip_ref: zip_ref.extractall(temp_dir) @@ -62,7 +62,7 @@ def extract_artifact(artifact_path): for image_type_dir in artifact_version_dir.iterdir(): target_image_type_dir = Path(os.path.join(target_version_dir, image_type_dir.name)) if target_image_type_dir.exists(): - shutil.rmtree(target_image_type_dir) + shutil.rmtree(target_image_type_dir) shutil.copytree(image_type_dir, target_image_type_dir) set_executable_permissions(target_image_type_dir) finally: diff --git a/docker/prepare_docker_official_image_source.py b/docker/prepare_docker_official_image_source.py index bbc539b5c4c54..9c2a71f07c192 100644 --- a/docker/prepare_docker_official_image_source.py +++ b/docker/prepare_docker_official_image_source.py @@ -26,8 +26,8 @@ Example command:- prepare_docker_official_image_source.py --image-type --kafka-version - This command will build a directory with the name as housing the hardcoded static Dockerfile and scripts for - the docker official image, as image type (jvm by default), for the kafka version for which the + This command will build a directory with the name as housing the hardcoded static Dockerfile and scripts for + the docker official image, as image type (jvm by default), for the kafka version for which the image is being built. """ diff --git a/docker/test/docker_sanity_test.py b/docker/test/docker_sanity_test.py index d6e648cbe0bf1..e5ac929bb039b 100644 --- a/docker/test/docker_sanity_test.py +++ b/docker/test/docker_sanity_test.py @@ -24,7 +24,7 @@ class DockerSanityTest(unittest.TestCase): IMAGE="apache/kafka" FIXTURES_DIR="." 
- + def resume_container(self): subprocess.run(["docker", "start", constants.BROKER_CONTAINER]) @@ -42,7 +42,7 @@ def start_compose(self, filename) -> None: self.update_file(filename, "image: {$IMAGE}", f"image: {self.IMAGE}") self.update_file(f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", "{$DIR}", self.FIXTURES_DIR) subprocess.run(["docker-compose", "-f", filename, "up", "-d"]) - + def destroy_compose(self, filename) -> None: subprocess.run(["docker-compose", "-f", filename, "down"]) self.update_file(filename, f"image: {self.IMAGE}", "image: {$IMAGE}") @@ -58,24 +58,24 @@ def create_topic(self, topic, topic_config): if topic in output.decode("utf-8"): return True return False - + def produce_message(self, topic, producer_config, key, value): command = ["echo", f'"{key}:{value}"', "|", f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_PRODUCER}", "--topic", topic, "--reader-property", "'parse.key=true'", "--reader-property", "'key.separator=:'", "--timeout", f"{constants.CLIENT_TIMEOUT}"] command.extend(producer_config) subprocess.run(["bash", "-c", " ".join(command)]) - + def consume_message(self, topic, consumer_config): command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--formatter-property", "'print.key=true'", "--formatter-property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"] command.extend(consumer_config) message = subprocess.check_output(["bash", "-c", " ".join(command)]) return message.decode("utf-8").strip() - + def get_metrics(self, jmx_tool_config): command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_RUN_CLASS}", constants.JMX_TOOL] command.extend(jmx_tool_config) message = subprocess.check_output(["bash", "-c", " ".join(command)]) return message.decode("utf-8").strip().split() - + def broker_metrics_flow(self): print(f"Running {constants.BROKER_METRICS_TESTS}") errors = [] @@ -102,7 +102,7 @@ def broker_metrics_flow(self): except AssertionError as e: errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e)) return errors - + metrics_after_message = self.get_metrics(jmx_tool_config) try: self.assertEqual(len(metrics_before_message), 2) @@ -142,19 +142,19 @@ def ssl_flow(self, ssl_broker_port, test_name, test_error_prefix, topic): self.assertEqual(message, "key:message") except AssertionError as e: errors.append(test_error_prefix + str(e)) - + return errors - + def broker_restart_flow(self): print(f"Running {constants.BROKER_RESTART_TESTS}") errors = [] - + try: self.assertTrue(self.create_topic(constants.BROKER_RESTART_TEST_TOPIC, ["--bootstrap-server", "localhost:9092"])) except AssertionError as e: errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e)) return errors - + producer_config = ["--bootstrap-server", "localhost:9092", "--command-property", "client.id=host"] self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, producer_config, "key", "message") @@ -169,7 +169,7 @@ def broker_restart_flow(self): self.assertEqual(message, "key:message") except AssertionError as e: errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e)) - + return errors def execute(self): @@ -194,7 +194,7 @@ def execute(self): except Exception as e: print(constants.BROKER_RESTART_ERROR_PREFIX, str(e)) total_errors.append(str(e)) - + self.assertEqual(total_errors, []) class DockerSanityTestCombinedMode(DockerSanityTest): @@ -220,7 +220,7 @@ def run_tests(image, mode, fixtures_dir): test_classes_to_run = [] if mode == "jvm" or mode == "native": test_classes_to_run = 
[DockerSanityTestCombinedMode, DockerSanityTestIsolatedMode] - + loader = unittest.TestLoader() suites_list = [] for test_class in test_classes_to_run: diff --git a/gradle/spotless.gradle b/gradle/spotless.gradle new file mode 100644 index 0000000000000..29e68e0f6fad8 --- /dev/null +++ b/gradle/spotless.gradle @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'com.diffplug.spotless' // fixme - currently broken, as fixing 300+ files. Merge hotfix and activate afterwards in dedicated pull. + +spotless { + java { + target '**/*.java' + targetExclude '**/archetype-resources/**' + // endWithNewline() + removeUnusedImports() + // trimTrailingWhitespace() + } + scala { + // target '**/*.scala' + targetExclude '**/*.scala' // fixme! currently broken, as fixing 300+ files. Merge hotfix and activate afterwards in dedicated pull. + // endWithNewline() + licenseHeaderFile "$rootDir/checkstyle/java.header", 'package' + scalafmt("$versions.scalafmt").configFile("$rootDir/checkstyle/.scalafmt.conf").scalaMajorVersion(versions.baseScala) + // trimTrailingWhitespace() + } + python { + target '**/*.py' + // endWithNewline() + trimTrailingWhitespace() + } +} \ No newline at end of file diff --git a/release/git.py b/release/git.py index 3fe47d721a2f1..a634146884810 100644 --- a/release/git.py +++ b/release/git.py @@ -135,6 +135,6 @@ def push_ref(ref, remote=push_remote_name, **kwargs): cmd(f"Pushing ref {ref} to {remote}", f"git push {remote} {ref}") -def merge_ref(ref, **kwargs): +def merge_ref(ref, **kwargs): __defaults(kwargs) cmd(f"Merging ref {ref}", f"git merge {ref}") diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 139078931d13d..3bca25e27a67b 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -236,14 +236,14 @@ def test_producer_and_consumer(self, compression_type="none", security_protocol= str(data)] self.logger.info("\n".join(summary)) return data - + @cluster(num_nodes=8) @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True]) - @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True]) def test_producer_and_share_consumer(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None, - interbroker_security_protocol=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH), + 
interbroker_security_protocol=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft, use_share_groups=True): """ Setup: 3 node kafka cluster @@ -342,14 +342,14 @@ def test_consumer_throughput(self, compression_type="none", security_protocol="P self.consumer.group = "test-consumer-group" self.consumer.run() return compute_aggregate_throughput(self.consumer) - + @cluster(num_nodes=8) @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True]) - @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"], metadata_quorum=[quorum.isolated_kraft], use_share_groups=[True]) def test_share_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None, - interbroker_security_protocol=None, num_consumers=1, client_version=str(DEV_BRANCH), + interbroker_security_protocol=None, num_consumers=1, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft, use_share_groups=True): """ Consume 1e6 100-byte messages with 1 or more consumers from a topic with 6 partitions diff --git a/tests/kafkatest/services/console_share_consumer.py b/tests/kafkatest/services/console_share_consumer.py index 0761acddcc87a..ce70969ebd7a3 100644 --- a/tests/kafkatest/services/console_share_consumer.py +++ b/tests/kafkatest/services/console_share_consumer.py @@ -25,7 +25,7 @@ from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config_for_tools """ -The console share consumer is a tool that reads data from Kafka via a share consumer and outputs +The console share consumer is a tool that reads data from Kafka via a share consumer and outputs it to standard output. """ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService): @@ -61,7 +61,7 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-share-group" message_validator=None, share_consumer_timeout_ms=None, version=DEV_BRANCH, client_id="console-share-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None, enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, print_partition=False, - jaas_override_variables=None, kafka_opts_override="", client_prop_file_override="", + jaas_override_variables=None, kafka_opts_override="", client_prop_file_override="", share_consumer_properties={}, log_level="DEBUG"): """ Args: diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 41227716b9fb2..c6a3fadd8f9a3 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -294,7 +294,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI use_share_groups = context.injected_args.get(arg_name) if use_share_groups is None: use_share_groups = context.globals.get(arg_name) - + # Assign the determined value. 
self.use_transactions_v2 = use_transactions_v2 self.use_share_groups = use_share_groups @@ -470,7 +470,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI def node_id_as_isolated_controller(self, node): """ - Generates the node id for a controller-only node, starting from config_property.FIRST_CONTROLLER_ID so as not + Generates the node id for a controller-only node, starting from config_property.FIRST_CONTROLLER_ID so as not to overlap with broker id numbering. This method does not do any validation to check this node is actually part of an isolated controller quorum. """ @@ -1809,7 +1809,7 @@ def describe_consumer_group(self, group, node=None, command_config=None): output += line self.logger.debug(output) return output - + def describe_share_group(self, group, node=None, command_config=None): """ Describe a share group. """ @@ -1835,7 +1835,7 @@ def describe_share_group(self, group, node=None, command_config=None): output += line self.logger.debug(output) return output - + def describe_share_group_members(self, group, node=None, command_config=None): """ Describe members of a share group. """ @@ -1853,7 +1853,7 @@ def describe_share_group_members(self, group, node=None, command_config=None): (share_group_script, self.bootstrap_servers(self.security_protocol), command_config, group) - + cmd += " --members" output_lines = [] diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py index e932212864ff8..5fe615e2c9991 100644 --- a/tests/kafkatest/services/log_compaction_tester.py +++ b/tests/kafkatest/services/log_compaction_tester.py @@ -65,7 +65,7 @@ def start_cmd(self, node): cmd += self.path.script("kafka-run-class.sh", node) cmd += " %s" % self.java_class_name() cmd += " --bootstrap-server %s --messages 1000000 --sleep 20 --duplicates 10 --percent-deletes 10" % self.kafka.bootstrap_servers(self.security_protocol) - + if 'type' in self.compression_config: cmd += " --compression-type %s" % self.compression_config['type'] if 'level' in self.compression_config: diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index a4f5ffa5dea18..1c1cc92935e96 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -154,7 +154,7 @@ def __init__(self, context, security_protocol=None, interbroker_security_protoco kraft_tls=False): """ Initialize the security properties for the node and copy - keystore and truststore to the remote node if the transport protocol + keystore and truststore to the remote node if the transport protocol is SSL. If security_protocol is None, the protocol specified in the template properties file is used. If no protocol is specified in the template properties either, PLAINTEXT is used as default. 
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 6e9716761f604..a7aa8d77cb577 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -60,7 +60,7 @@ def handle_shutdown_complete(self, node=None, logger=None): self.state = ConsumerState.Dead self.assignment = [] self.position = {} - self.shutdown_complete = True + self.shutdown_complete = True if node is not None and logger is not None: logger.debug("Shut down %s" % node.account.hostname) @@ -155,7 +155,7 @@ def last_commit(self, tp): class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): def __init__(self, node, verify_offsets, idx, **kwargs): super().__init__(node, verify_offsets, idx, **kwargs) - + def handle_partitions_revoked(self, event, node, logger): self.revoked_count += 1 self.state = ConsumerState.Rebalancing @@ -207,10 +207,10 @@ def handle_partitions_revoked(self, event, node, logger): class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService): """This service wraps org.apache.kafka.tools.VerifiableConsumer for use in - system testing. - + system testing. + NOTE: this class should be treated as a PUBLIC API. Downstream users use - this service both directly and through class extension, so care must be + this service both directly and through class extension, so care must be taken to ensure compatibility. """ @@ -349,7 +349,7 @@ def _worker(self, idx, node): def is_eager(self): return self.group_protocol == consumer_group.classic_group_protocol and self.assignment_strategy != "org.apache.kafka.clients.consumer.CooperativeStickyAssignor" - + def _update_global_position(self, consumed_event, node): for consumed_partition in consumed_event["partitions"]: tp = _create_partition_from_dict(consumed_partition) diff --git a/tests/kafkatest/services/verifiable_share_consumer.py b/tests/kafkatest/services/verifiable_share_consumer.py index 016a8be809283..8aa4244d6bdbe 100644 --- a/tests/kafkatest/services/verifiable_share_consumer.py +++ b/tests/kafkatest/services/verifiable_share_consumer.py @@ -107,8 +107,8 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, group_id, max_messages=-1, - acknowledgement_mode="auto", version=DEV_BRANCH, stop_timeout_sec=60, + def __init__(self, context, num_nodes, kafka, topic, group_id, max_messages=-1, + acknowledgement_mode="auto", version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None, on_record_consumed=None): """ :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file @@ -268,7 +268,7 @@ def clean_node(self, node): def total_consumed(self): with self.lock: return self.total_records_consumed - + def total_unique_consumed(self): with self.lock: return len(self.consumed_records_offsets) @@ -280,11 +280,11 @@ def total_unique_acknowledged(self): def total_acknowledged(self): with self.lock: return self.total_records_acknowledged + self.total_records_acknowledged_failed - + def total_acknowledged_successfully(self): with self.lock: return self.total_records_acknowledged - + def total_failed_acknowledged(self): with self.lock: return self.total_records_acknowledged_failed @@ -296,11 +296,11 @@ def total_consumed_for_a_share_consumer(self, node): def total_acknowledged_for_a_share_consumer(self, node): with self.lock: return 
self.event_handlers[node].total_acknowledged_successfully + self.event_handlers[node].total_acknowledged_failed - + def total_acknowledged_sucessfully_for_a_share_consumer(self, node): with self.lock: return self.event_handlers[node].total_acknowledged_successfully - + def total_failed_acknowledged_for_a_share_consumer(self, node): with self.lock: return self.event_handlers[node].total_acknowledged_failed diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py index 19a3cdcde52bd..ba17152435ccc 100644 --- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py @@ -54,7 +54,7 @@ def _verify_roundrobin_assignment(self, consumer): def rolling_update_test(self, metadata_quorum=quorum.zk): """ Verify rolling updates of partition assignment strategies works correctly. In this - test, we use a rolling restart to change the group's assignment strategy from "range" + test, we use a rolling restart to change the group's assignment strategy from "range" to "roundrobin." We verify after every restart that all members are still in the group and that the correct assignment strategy was used. """ diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index c28cba1431b67..45aae58eb75f2 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -78,11 +78,11 @@ def setup_consumer(self, topic, **kwargs): def await_conflict_consumers_fenced(self, conflict_consumer): # Rely on explicit shutdown_complete events from the verifiable consumer to guarantee each conflict member # reached the fenced path rather than remaining in the default DEAD state prior to startup. - wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and + wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes), timeout_sec=60, err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing") - + @cluster(num_nodes=7) @matrix( metadata_quorum=[quorum.isolated_kraft], @@ -192,9 +192,9 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor ) def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, group_protocol=None): """ - Verify correct static consumer behavior when the consumers in the group are restarted. In order to make + Verify correct static consumer behavior when the consumers in the group are restarted. In order to make sure the behavior of static members are different from dynamic ones, we take both static and dynamic - membership into this test suite. This test is based on the eager assignment strategy, where all dynamic consumers + membership into this test suite. This test is based on the eager assignment strategy, where all dynamic consumers revoke their partitions when a global rebalance takes place (even if they are not being bounced). The test relies on that eager behaviour when making sure that there is no global rebalance when static members are bounced. 
@@ -212,7 +212,7 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat producer.start() self.await_produced_messages(producer) - consumer = self.setup_consumer(self.TOPIC, static_membership=static_membership, group_protocol=group_protocol, + consumer = self.setup_consumer(self.TOPIC, static_membership=static_membership, group_protocol=group_protocol, assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor") consumer.start() @@ -228,7 +228,7 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 553ab59f254e9..b98a846490bf4 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -113,7 +113,7 @@ def _start_connector(self, config_file, extra_config={}): connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')]) connector_config.update(extra_config) self.cc.create_connector(connector_config) - + def _connector_status(self, connector, node=None): try: return self.cc.get_connector_status(connector, node) @@ -195,7 +195,7 @@ def test_restart_failed_connector(self, exactly_once_source, connect_protocol, m err_msg="Failed to see connector transition to the FAILED state") self.cc.restart_connector(self.connector.name) - + wait_until(lambda: self.connector_is_running(self.connector), timeout_sec=10, err_msg="Failed to see connector transition to the RUNNING state") @@ -218,7 +218,7 @@ def test_restart_failed_task(self, connector_type, connect_protocol, metadata_qu connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5, consumer_group_protocol=group_protocol) else: connector = MockSource(self.cc, mode='task-failure', delay_sec=5) - + connector.start() task_id = 0 @@ -226,7 +226,7 @@ def test_restart_failed_task(self, connector_type, connect_protocol, metadata_qu err_msg="Failed to see task transition to the FAILED state") self.cc.restart_task(connector.name, task_id) - + wait_until(lambda: self.task_is_running(connector, task_id), timeout_sec=10, err_msg="Failed to see task transition to the RUNNING state") @@ -306,7 +306,7 @@ def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, me wait_until(lambda: self.is_running(self.source), timeout_sec=30, err_msg="Failed to see connector transition to the RUNNING state") - + self.cc.pause_connector(self.source.name) # wait until all nodes report the paused transition @@ -358,7 +358,7 @@ def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, group_pr wait_until(lambda: self.is_running(self.sink), timeout_sec=30, err_msg="Failed to see connector transition to the RUNNING state") - + self.cc.pause_connector(self.sink.name) # wait until all nodes report the paused transition @@ -403,7 +403,7 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met wait_until(lambda: self.is_running(self.source), timeout_sec=30, err_msg="Failed to see connector 
transition to the RUNNING state") - + self.cc.pause_connector(self.source.name) self.cc.restart() @@ -604,7 +604,7 @@ def _wait_for_loggers(self, level, request_time, namespace, workers=None): @cluster(num_nodes=6) @matrix( security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], - exactly_once_source=[True, False], + exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=[quorum.isolated_kraft], group_protocol=consumer_group.all_group_protocols @@ -628,7 +628,7 @@ def test_file_source_and_sink(self, security_protocol, exactly_once_source, conn self._start_connector("connect-file-sink.properties", {"consumer.override.group.protocol" : group_protocol}) else: self._start_connector("connect-file-sink.properties") - + # Generating data on the source node should generate new records and create new output on the sink node. Timeouts # here need to be more generous than they are for standalone mode because a) it takes longer to write configs, # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile @@ -678,8 +678,8 @@ def test_bounce(self, clean, connect_protocol, metadata_quorum, group_protocol=N # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are # some cases where a restart can cause a rebalance to take the full length of the session timeout # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup). - # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to - # be shut down before they have any time to process data and we can end up with zero data making it + # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to + # be shut down before they have any time to process data and we can end up with zero data making it # through the test. 
time.sleep(15) diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 0e5c90df0f468..93da0c7314183 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -37,13 +37,13 @@ class ConnectRestApiTest(KafkaTest): 'topic', 'file', 'transforms', 'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms', 'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups', 'exactly.once.support', 'transaction.boundary', 'transaction.boundary.interval.ms', 'offsets.storage.topic', - 'tasks.max.enforce', 'connector.plugin.version', 'key.converter.plugin.version', 'value.converter.plugin.version', + 'tasks.max.enforce', 'connector.plugin.version', 'key.converter.plugin.version', 'value.converter.plugin.version', 'header.converter.plugin.version'} FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'header.converter', 'topics', 'file', 'transforms', 'topics.regex', 'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms', 'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'errors.deadletterqueue.topic.name', 'errors.deadletterqueue.topic.replication.factor', 'errors.deadletterqueue.context.headers.enable', 'predicates', - 'tasks.max.enforce', 'connector.plugin.version', 'key.converter.plugin.version', 'value.converter.plugin.version', + 'tasks.max.enforce', 'connector.plugin.version', 'key.converter.plugin.version', 'value.converter.plugin.version', 'header.converter.plugin.version'} INPUT_FILE = "/mnt/connect.input" diff --git a/tests/kafkatest/tests/core/network_degrade_test.py b/tests/kafkatest/tests/core/network_degrade_test.py index 1c55d9b7e061c..a7b4262758b7e 100644 --- a/tests/kafkatest/tests/core/network_degrade_test.py +++ b/tests/kafkatest/tests/core/network_degrade_test.py @@ -57,7 +57,7 @@ def test_latency(self, task_name, device_name, latency_ms, rate_limit_kbit, meta quorum0 = self.kafka.controller_quorum.nodes[0] quorum1 = self.kafka.controller_quorum.nodes[1] - + # Capture the ping times from the ping stdout # 64 bytes from ducker01 (172.24.0.2): icmp_seq=1 ttl=64 time=0.325 ms diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 025dc7867b433..778273fcf607e 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -49,7 +49,7 @@ def teardown(self): metadata_quorum=[quorum.isolated_kraft], group_protocol=consumer_group.all_group_protocols ) - def test_produce_consume(self, topic_count, partition_count, replication_factor, + def test_produce_consume(self, topic_count, partition_count, replication_factor, metadata_quorum, group_protocol=None): topics_create_start_time = time.time() for i in range(topic_count): diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py index ed2ecca87fdd3..9dcdc810da19f 100644 --- a/tests/kafkatest/tests/end_to_end.py +++ b/tests/kafkatest/tests/end_to_end.py @@ -41,7 +41,7 @@ def __init__(self, test_context, topic="test_topic", topic_config=DEFAULT_TOPIC_ self.topic_config = topic_config self.records_consumed = [] self.last_consumed_offsets = {} - + def create_zookeeper_if_necessary(self, num_nodes=1, **kwargs): self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs) if quorum.for_test(self.test_context) 
== quorum.zk else None @@ -71,7 +71,7 @@ def create_consumer(self, num_nodes=1, group_id="test_group", **kwargs): group_id=group_id, on_record_consumed=self.on_record_consumed, **kwargs) - + def create_producer(self, num_nodes=1, throughput=1000, **kwargs): self.producer = VerifiableProducer(self.test_context, @@ -138,7 +138,7 @@ def run_validation(self, min_records=5000, producer_timeout_sec=30, self.await_consumed_offsets(self.producer.last_acked_offsets, consumer_timeout_sec) self.consumer.stop() - + self.validate(enable_idempotence) except BaseException: self._collect_all_logs() diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 7b0ea9becd3ee..62caae23b0c17 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -38,7 +38,7 @@ def broker_node(test, topic, broker_type): def signal_node(test, node, sig): test.kafka.signal_node(node, sig) - + def clean_shutdown(test, topic, broker_type): """Discover broker node of requested type and shut it down cleanly. """ @@ -110,7 +110,7 @@ def bulk_hard_bounce(test, num_failures): "clean_bounce": bulk_clean_bounce, "hard_bounce": bulk_hard_bounce } - + class StreamsBrokerBounceTest(Test): """ Simple test of Kafka Streams with brokers failing @@ -170,7 +170,7 @@ def confirm_topics_on_all_brokers(self, expected_topic_set): return True - + def setup_system(self, start_processor=True, num_threads=3, group_protocol='classic'): # Setup phase use_streams_groups = True if group_protocol == 'streams' else False @@ -203,7 +203,7 @@ def collect_results(self, sleep_time_secs): self.processor1.stop() node = self.driver.node - + # Success is declared if streams does not crash when sleep time > 0 # It should give an exception when sleep time is 0 since we kill the brokers immediately # and the topic manager cannot create internal topics with the desired replication factor @@ -211,7 +211,7 @@ def collect_results(self, sleep_time_secs): output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-EXCEPTION %s" % self.processor1.STDOUT_FILE, allow_fail=False) else: output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) - + for line in output_streams: data["Client closed"] = line @@ -223,8 +223,8 @@ def collect_results(self, sleep_time_secs): output = node.account.ssh_capture("grep -E 'SUCCESS|FAILURE' %s" % self.driver.STDOUT_FILE, allow_fail=False) for line in output: data["Logic Success/Failure"] = line - - + + return data @cluster(num_nodes=7) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 8f2f885983670..572d60d5b2659 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -43,9 +43,9 @@ """ -After each release one should first check that the released version has been uploaded to -https://s3-us-west-2.amazonaws.com/kafka-packages/ which is the url used by system test to download jars; -anyone can verify that by calling +After each release one should first check that the released version has been uploaded to +https://s3-us-west-2.amazonaws.com/kafka-packages/ which is the url used by system test to download jars; +anyone can verify that by calling curl https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz 
to download the jar and if it is not uploaded yet, ping the dev@kafka mailing list to request it being uploaded. @@ -54,19 +54,19 @@ 1. Update all relevant versions in tests/kafkatest/version.py this will include adding a new version for the new release and bumping all relevant already released versions. - -2. Add the new version to the "kafkatest.version" import above and include the version in the - broker_upgrade_versions list above. You'll also need to add the new version to the + +2. Add the new version to the "kafkatest.version" import above and include the version in the + broker_upgrade_versions list above. You'll also need to add the new version to the "StreamsUpgradeTestJobRunnerService" on line 484 to make sure the correct arguments are passed during the system test run. - + 3. Update the vagrant/base.sh file to include all new versions, including the newly released version - and all point releases for existing releases. You only need to list the latest version in + and all point releases for existing releases. You only need to list the latest version in this file. - + 4. Then update all relevant versions in the tests/docker/Dockerfile -5. Add a new upgrade-system-tests-XXXX module under streams. You can probably just copy the +5. Add a new upgrade-system-tests-XXXX module under streams. You can probably just copy the latest system test module from the last release. Just make sure to update the systout print statement in StreamsUpgradeTest to the version for the release. After you add the new module you'll need to update settings.gradle file to include the name of the module you just created @@ -111,7 +111,7 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, metadata_quorum): """ This test verifies that the cluster successfully upgrades despite changes in the metadata and FK join protocols. 
- + Starts 3 KafkaStreams instances with version and upgrades one-by-one to """ to_version = str(DEV_VERSION) diff --git a/tests/kafkatest/tests/verifiable_share_consumer_test.py b/tests/kafkatest/tests/verifiable_share_consumer_test.py index 37e1c56d11bba..b5bcc05244af7 100644 --- a/tests/kafkatest/tests/verifiable_share_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_share_consumer_test.py @@ -51,7 +51,7 @@ def min_cluster_size(self): def setup_share_group(self, topic, acknowledgement_mode="auto", group_id="test_group_id", **kwargs): return VerifiableShareConsumer(self.test_context, self.num_consumers, self.kafka, - topic, group_id, acknowledgement_mode=acknowledgement_mode, + topic, group_id, acknowledgement_mode=acknowledgement_mode, log_level="TRACE", **kwargs) def setup_producer(self, topic, max_messages=-1, throughput=500): @@ -80,7 +80,7 @@ def await_consumed_messages_by_a_consumer(self, consumer, node, min_messages=1, wait_until(lambda: consumer.total_consumed_for_a_share_consumer(node) >= current_total + min_messages, timeout_sec=timeout_sec, err_msg="Timed out waiting for consumption") - + def await_unique_consumed_messages(self, consumer, min_messages=1, timeout_sec=10): wait_until(lambda: consumer.total_unique_consumed() >= min_messages, timeout_sec=timeout_sec, @@ -90,7 +90,7 @@ def await_acknowledged_messages(self, consumer, min_messages=1, timeout_sec=10): wait_until(lambda: consumer.total_acknowledged() >= min_messages, timeout_sec=timeout_sec, err_msg="Timed out waiting for consumption") - + def await_unique_acknowledged_messages(self, consumer, min_messages=1, timeout_sec=10): wait_until(lambda: consumer.total_unique_acknowledged() >= min_messages, timeout_sec=timeout_sec, diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index 4affe1975a6ca..ddbbbca5ef231 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -142,11 +142,11 @@ def validate_delivery(acked, consumed, idempotence_enabled=False, check_lost_dat # Correctness of the set difference operation depends on using equivalent # message_validators in producer and consumer missing = set(acked) - set(consumed) - + # Were all acked messages consumed? if len(missing) > 0: msg = annotate_missing_msgs(missing, acked, consumed, msg) - + # Did we miss anything due to data loss? if check_lost_data: max_truncate_count = 100 if may_truncate_acked_records else 0
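For reference, once the root build applies the new gradle/spotless.gradle (assumed here to be wired up via an `apply from:` in build.gradle, which is not shown in this hunk), the com.diffplug.spotless plugin registers its standard aggregate tasks, so the formats enabled above (Java and Python; Scala remains excluded until the follow-up pull) can be exercised locally. A minimal sketch, assuming a stock Gradle wrapper at the repository root:

    # report formatting violations without modifying sources (fails the build if any are found)
    ./gradlew spotlessCheck

    # rewrite sources in place for the enabled formats
    ./gradlew spotlessApply

Running spotlessCheck in CI and spotlessApply locally is the plugin's usual split; whether this build wires spotlessCheck into the check lifecycle task is an assumption not confirmed by this patch.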