V24.3.x co debugging #24616

Draft: wants to merge 10 commits into base v24.3.x
2 changes: 1 addition & 1 deletion src/v/cloud_storage_clients/util.cc
@@ -205,7 +205,7 @@ boost::property_tree::ptree iobuf_to_ptree(iobuf&& buf, ss::logger& logger) {
return res;
} catch (...) {
log_buffer_with_rate_limiting("unexpected reply", buf, logger);
-vlog(logger.error, "!!parsing error {}", std::current_exception());
+vlog(logger.warn, "!!parsing error {}", std::current_exception());
throw;
}
}
6 changes: 3 additions & 3 deletions src/v/config/configuration.cc
@@ -42,7 +42,7 @@ configuration::configuration()
.example = "2147483648",
.visibility = visibility::tunable},
128_MiB,
-{.min = 1_MiB})
+{.min = 1_KiB})
, log_segment_size_min(
*this,
"log_segment_size_min",
@@ -51,7 +51,7 @@ configuration::configuration()
{.needs_restart = needs_restart::no,
.example = "16777216",
.visibility = visibility::tunable},
-1_MiB)
+1_KiB)
, log_segment_size_max(
*this,
"log_segment_size_max",
@@ -78,7 +78,7 @@ configuration::configuration()
.example = "268435456",
.visibility = visibility::tunable},
256_MiB,
-{.min = 1_MiB})
+{.min = 1_KiB})
, readers_cache_eviction_timeout_ms(
*this,
"readers_cache_eviction_timeout_ms",
72 changes: 70 additions & 2 deletions tests/rptest/services/redpanda.py
@@ -623,7 +623,7 @@ def __init__(self,
self.skip_end_of_test_scrubbing = skip_end_of_test_scrubbing

if fast_uploads:
-self.cloud_storage_segment_max_upload_interval_sec = 10
+self.cloud_storage_segment_max_upload_interval_sec = 1
self.cloud_storage_manifest_max_upload_interval_sec = 1

self._expected_damage_types = set()
@@ -1321,6 +1321,15 @@ class RedpandaServiceBase(RedpandaServiceABC, Service):
KAFKA_KERBEROS_PORT = 9094
ADMIN_ALTERNATE_PORT = 9647

GLOBAL_USE_STRESS_FIBER = 'enable_stress_fiber'
GLOBAL_NUM_STRESS_FIBERS = 'num_stress_fibers'
GLOBAL_STRESS_FIBER_MIN_MS = 'stress_fiber_min_ms'
GLOBAL_STRESS_FIBER_MAX_MS = 'stress_fiber_max_ms'
DEFAULT_USE_STRESS_FIBER = 'OFF'
DEFAULT_NUM_STRESS_FIBERS = 1
DEFAULT_STRESS_FIBER_MIN_MS = 100
DEFAULT_STRESS_FIBER_MAX_MS = 200

CLUSTER_CONFIG_DEFAULTS = {
'join_retry_timeout_ms': 200,
'default_topic_partitions': 4,
@@ -1461,6 +1470,31 @@ def set_si_settings(self, si_settings: SISettings):
self._extra_rp_conf = self._si_settings.update_rp_conf(
self._extra_rp_conf)

def use_stress_fiber(self) -> bool:
"""Return true if the test should run with the stress fiber."""
use_stress_fiber = self._context.globals.get(
self.GLOBAL_USE_STRESS_FIBER, self.DEFAULT_USE_STRESS_FIBER)
if use_stress_fiber == "ON":
return True
elif use_stress_fiber == "OFF":
return False

self.logger.warn(
f"{self.GLOBAL_USE_STRESS_FIBER} should be 'ON' or 'OFF'")
return False

def get_stress_fiber_params(self) -> Tuple[int, int, int]:
"""Return (num_fibers, min_ms, max_ms) from the ducktape globals, falling back to the defaults above."""
fibers = int(
self._context.globals.get(self.GLOBAL_NUM_STRESS_FIBERS,
self.DEFAULT_NUM_STRESS_FIBERS))
min_ms = int(
self._context.globals.get(self.GLOBAL_STRESS_FIBER_MIN_MS,
self.DEFAULT_STRESS_FIBER_MIN_MS))
max_ms = int(
self._context.globals.get(self.GLOBAL_STRESS_FIBER_MAX_MS,
self.DEFAULT_STRESS_FIBER_MAX_MS))
return (fibers, min_ms, max_ms)
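For reference, a minimal sketch of the ducktape globals that drive the two helpers above, assuming the standard --globals JSON mechanism; every value shown is illustrative, not part of this change:

# Illustrative only: globals read by use_stress_fiber() and
# get_stress_fiber_params(). Keys mirror the GLOBAL_* constants above.
stress_globals = {
    "enable_stress_fiber": "ON",  # must be the string 'ON' or 'OFF'
    "num_stress_fibers": 2,       # default DEFAULT_NUM_STRESS_FIBERS = 1
    "stress_fiber_min_ms": 50,    # default DEFAULT_STRESS_FIBER_MIN_MS = 100
    "stress_fiber_max_ms": 150,   # default DEFAULT_STRESS_FIBER_MAX_MS = 200
}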

def add_extra_rp_conf(self, conf):
self._extra_rp_conf = {**self._extra_rp_conf, **conf}

@@ -2884,6 +2918,22 @@ def start_one(node):
request_timeout_ms=30000,
api_version_auto_timeout_ms=3000)

# Start stress fiber if requested
if self.use_stress_fiber():

def start_stress_fiber(node):
count, min_ms, max_ms = self.get_stress_fiber_params()
self.start_stress_fiber(node, count, min_ms, max_ms)

if first_start:
self.logger.info(
f"Starting stress fiber for {len(to_start)} nodes")
self.for_nodes(to_start, start_stress_fiber)
else:
self.logger.info(
f"Starting stress fiber for {len(self.nodes)} nodes")
self.for_nodes(self.nodes, start_stress_fiber)

def write_crl_file(self, node: ClusterNode, ca: tls.CertificateAuthority):
self.logger.info(
f"Writing Redpanda node tls ca CRL file: {RedpandaService.TLS_CA_CRL_FILE}"
@@ -3011,6 +3061,14 @@ def check_node(self, node):
# fall through
return True

def start_stress_fiber(self, node, count, min_ms, max_ms):
"""Start stress fiber"""
admin = Admin(self)
admin.stress_fiber_start(node=node,
num_fibers=count,
min_ms_per_scheduling_point=min_ms,
max_ms_per_scheduling_point=max_ms)
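As a usage note, a hedged sketch of the matching teardown, assuming the Admin wrapper also exposes a stress_fiber_stop counterpart (that call is an assumption; it is not used anywhere in this diff):

def stop_stress_fiber(self, node):
    """Stop stress fibers on a node. Sketch only: assumes
    Admin.stress_fiber_stop exists alongside stress_fiber_start."""
    admin = Admin(self)
    admin.stress_fiber_stop(node=node)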

def all_up(self):
def check_node(node):
pid = self.redpanda_pid(node)
@@ -3131,7 +3189,8 @@ def start_node(self,
omit_seeds_on_idx_one: bool = True,
skip_readiness_check: bool = False,
node_id_override: int | None = None,
-extra_cli: list[str] = []):
+extra_cli: list[str] = [],
+with_stress: bool = False):
"""
Start a single instance of redpanda. This function will not return until
redpanda appears to have started successfully. If redpanda does not
@@ -3191,6 +3250,15 @@ def start_rp():
if not expect_fail:
self._started.add(node)

if with_stress:
self.logger.info("Starting stress fiber for node")
count, min_ms, max_ms = self.get_stress_fiber_params()
self.start_stress_fiber(node, count, min_ms, max_ms)

def start_node_with_rpk(self, node, additional_args="", clean_node=True):
"""
Start a single instance of redpanda using rpk. similar to start_node,
52 changes: 33 additions & 19 deletions tests/rptest/tests/node_pool_migration_test.py
@@ -10,6 +10,7 @@
from concurrent.futures import ThreadPoolExecutor
import random
import re
from time import sleep

import requests
from rptest.clients.kafka_cat import KafkaCat
@@ -25,6 +26,7 @@
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings
from rptest.utils.mode_checks import cleanup_on_early_exit
from rptest.utils.node_operations import NodeDecommissionWaiter
from rptest.utils.node_operations import verify_offset_translator_state_consistent
from enum import Enum

TS_LOG_ALLOW_LIST = [
Expand Down Expand Up @@ -54,7 +56,7 @@ def __init__(self, test_context):

super(NodePoolMigrationTest, self).__init__(
test_context=test_context,
-num_brokers=10,
+num_brokers=8,
node_prealloc_count=1,
si_settings=SISettings(test_context,
cloud_storage_enable_remote_read=True,
@@ -185,7 +187,7 @@ def decommissioned():

@property
def msg_size(self):
-return 4096
+return 1024

@property
def msg_count(self):
@@ -198,7 +200,7 @@ def producer_throughput(self):

@property
def segment_size(self):
-return 1024 * 1024
+return 4096

@property
def local_retention_bytes(self):
@@ -214,7 +216,7 @@ def start_producer(self):
self._topic,
self.msg_size,
self.msg_count,
-key_set_cardinality=10000,
+key_set_cardinality=100,
rate_limit_bps=self.producer_throughput,
custom_node=self.preallocated_nodes,
debug_logs=True)
@@ -231,7 +233,8 @@ def start_consumer(self):
self.redpanda,
self._topic,
self.msg_size,
-readers=1,
+readers=10,
+continuous=True,
nodes=self.preallocated_nodes)
self.consumer.start(clean=False)

@@ -269,14 +272,11 @@ def _replicas_per_node(self):

return node_replicas

-@cluster(num_nodes=11,
+@cluster(num_nodes=9,
log_allow_list=RESTART_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
@matrix(balancing_mode=["off", 'node_add'],
test_mode=[
TestMode.NO_TIRED_STORAGE, TestMode.TIRED_STORAGE,
TestMode.FAST_MOVES
],
cleanup_policy=["compact", "compact,delete"])
@matrix(balancing_mode=['continuous'],
test_mode=[TestMode.FAST_MOVES],
cleanup_policy=["delete"])
def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode,
test_mode: TestMode,
cleanup_policy):
@@ -291,8 +291,8 @@ def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode,
cleanup_on_early_exit(self)
return

-initial_pool = self.redpanda.nodes[0:5]
-new_pool = self.redpanda.nodes[5:]
+initial_pool = self.redpanda.nodes[0:4]
+new_pool = self.redpanda.nodes[4:]

self.redpanda.set_seed_servers(initial_pool)

@@ -301,12 +301,16 @@ def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode,
auto_assign_node_id=True,
omit_seeds_on_idx_one=False)

cfg = {"partition_autobalancing_mode": balancing_mode}
cfg = {
"partition_autobalancing_mode": balancing_mode,
"log_compaction_interval_ms": 10,
"log_segment_size": 2048,
"compacted_log_segment_size": 2048,
"group_topic_partitions": 1,
}
if test_mode.has_tiered_storage:
cfg["cloud_storage_enable_remote_write"] = True
cfg["cloud_storage_enable_remote_read"] = True
-# we want data to be actually deleted
-cfg["retention_local_strict"] = True

if test_mode == TestMode.FAST_MOVES:
self.redpanda.set_cluster_config({
@@ -334,8 +338,8 @@ def test_migrating_redpanda_nodes_to_new_pool(self, balancing_mode,
backoff_sec=2)
# add new nodes to the cluster
self.redpanda.for_nodes(
-new_pool,
-lambda n: self.redpanda.start_node(n, auto_assign_node_id=True))
+new_pool, lambda n: self.redpanda.start_node(
+n, auto_assign_node_id=True, with_stress=True))

def all_nodes_present():
for n in self.redpanda.nodes:
@@ -359,6 +363,15 @@ def all_nodes_present():
]

for to_decommission_id in decommissioned_ids:
self.consumer.wait()
del self.consumer
# Allow some time for merge compaction, since new compactible
# segments will favor windowed compaction.
sleep(5)

# Continue consuming.
self.start_consumer()
sleep(10)

self.logger.info(f"decommissioning node: {to_decommission_id}", )
self._decommission(to_decommission_id,
@@ -397,5 +410,6 @@ def _quiescent_state():

for n in initial_pool:
self.redpanda.stop_node(n)
verify_offset_translator_state_consistent(self.redpanda)

self.verify()
50 changes: 50 additions & 0 deletions tests/rptest/tests/ot_consistency_test.py
@@ -0,0 +1,50 @@
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.node_operations import verify_offset_translator_state_consistent


class OffsetTranslatorConsistencyTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
self._ctx = test_ctx
super(OffsetTranslatorConsistencyTest, self).__init__(
test_ctx,
si_settings=SISettings(test_ctx,
log_segment_size=1024 * 1024,
fast_uploads=True),
*args,
**kwargs,
)

@cluster(num_nodes=3)
def test_offset_translator_state_consistent(self):
cli = KafkaCliTools(self.redpanda)
topic = TopicSpec(partition_count=3, replication_factor=3)

cli.create_topic(topic)
cli.produce(topic.name, 1000, 100)
verify_offset_translator_state_consistent(self.redpanda)
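For context, a hedged sketch of the kind of check verify_offset_translator_state_consistent performs; the real helper lives in rptest.utils.node_operations, and the get_partition_state admin call plus the exact fields compared are assumptions here, not the helper's actual implementation:

import json

from rptest.services.admin import Admin


def ot_state_consistent(redpanda, topic: str, partition: int) -> bool:
    """Sketch: every started node must report an identical view of the
    partition's state (and hence of its offset-translator state)."""
    admin = Admin(redpanda)
    states = set()
    for node in redpanda.started_nodes():
        # Assumed admin call: fetch this node's debug view of the partition.
        state = admin.get_partition_state("kafka", topic, partition, node=node)
        states.add(json.dumps(state, sort_keys=True))
    return len(states) == 1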