Skip to content

Commit

Permalink
Merge pull request #24619 from vbotbuildovich/backport-pr-24607-v24.3…
Browse files Browse the repository at this point in the history
….x-492

[v24.3.x] Offset translator consistency validation
  • Loading branch information
piyushredpanda authored Dec 20, 2024
2 parents c4bbaec + e87bb19 commit 78e55c3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 1 deletion.
50 changes: 50 additions & 0 deletions tests/rptest/tests/ot_consistency_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import json
import os
import re
import sys
import time
import traceback
from collections import namedtuple, defaultdict
from typing import DefaultDict, List, Optional

from ducktape.mark import matrix
from ducktape.utils.util import wait_until

from rptest.clients.kafka_cat import KafkaCat
from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.redpanda import RedpandaService, SISettings, CloudStorageTypeAndUrlStyle, get_cloud_storage_type, get_cloud_storage_type_and_url_style
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.node_operations import verify_offset_translator_state_consistent


class OffsetTranslatorConsistencyTest(RedpandaTest):
    """Test that runs the offset translator consistency check on a cluster.

    The cluster is configured with ``SISettings`` built with
    ``log_segment_size=1024 * 1024`` and ``fast_uploads=True``.
    """
    def __init__(self, test_ctx, *args, **kwargs):
        """Store the test context and forward it, plus SI settings, to the base."""
        self._ctx = test_ctx
        si_settings = SISettings(
            test_ctx,
            log_segment_size=1024 * 1024,
            fast_uploads=True,
        )
        super().__init__(
            test_ctx,
            *args,
            si_settings=si_settings,
            **kwargs,
        )

    @cluster(num_nodes=3)
    def test_offset_translator_state_consistent(self):
        """Create a 3-partition, 3-replica topic, produce to it, then verify."""
        kafka_tools = KafkaCliTools(self.redpanda)
        spec = TopicSpec(partition_count=3, replication_factor=3)

        kafka_tools.create_topic(spec)
        kafka_tools.produce(spec.name, 1000, 100)

        # Delegates the actual cross-replica comparison to the shared helper.
        verify_offset_translator_state_consistent(self.redpanda)
5 changes: 4 additions & 1 deletion tests/rptest/tests/random_node_operations_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, CloudStorageType, PandaproxyConfig, SISettings, SchemaRegistryConfig
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode, skip_fips_mode
from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload
from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload, verify_offset_translator_state_consistent

from rptest.clients.offline_log_viewer import OfflineLogViewer

Expand Down Expand Up @@ -57,6 +57,8 @@ def __init__(self, test_context, *args, **kwargs):
# set disk timeout to value greater than max suspend time
# not to emit spurious errors
"raft_io_timeout_ms": 20000,
"compacted_log_segment_size": 1024 * 1024,
"log_segment_size": 2 * 1024 * 1024,
},
# 2 nodes for kgo producer/consumer workloads
node_prealloc_count=3,
Expand Down Expand Up @@ -577,6 +579,7 @@ def cluster_version_updated():
err_msg="Error waiting for cluster to report consistent version"
)

verify_offset_translator_state_consistent(self.redpanda)
# Validate that the controller log written during the test is readable by offline log viewer
log_viewer = OfflineLogViewer(self.redpanda)
for node in self.redpanda.started_nodes():
Expand Down
47 changes: 47 additions & 0 deletions tests/rptest/utils/node_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from collections import defaultdict
from enum import Enum
import random
import re
import threading
import time
import requests
Expand All @@ -20,6 +21,7 @@
from rptest.services.failure_injector import FailureInjector, FailureSpec
from rptest.services.redpanda import RedpandaService
from rptest.services.redpanda_installer import VERSION_RE, int_tuple
from rptest.util import wait_until_result


class OperationType(Enum):
Expand Down Expand Up @@ -79,6 +81,51 @@ def add(idx):
yield NodeOperation(op, idx, random.choice([True, False]))


def verify_offset_translator_state_consistent(redpanda: RedpandaService):
    """Assert that all replicas of every partition agree on the last delta.

    For each started node, the partitions reported by the admin API for that
    node are visited. For every partition the function polls the partition
    state (``timeout_sec=180``, ``backoff_sec=1``) until all replicas in the
    returned state report the same ``dirty_offset``, then parses the
    "last delta" out of each replica's ``offset_translator_state`` string.
    Replicas whose state string contains ``"empty"`` are skipped.

    Args:
        redpanda: the running service whose started nodes are inspected.

    Raises:
        AssertionError: if a replica's offset translator state string does
            not match the expected pattern, or if the replicas of a
            partition report more than one distinct last delta.
    """
    logger = redpanda.logger
    last_delta_pattern = re.compile('^\\{.*, last delta: (?P<delta>\\d+)\\}$')
    admin = Admin(redpanda)

    def _poll_partition_state(node, ns, topic, partition):
        # Yields (True, state) once every replica reports the same dirty
        # offset as the first replica; otherwise (False, None) so the caller
        # keeps polling.
        state = admin.get_partition_state(ns, topic, partition, node=node)
        replicas = state['replicas']
        reference_offset = replicas[0]['dirty_offset']
        for replica in replicas:
            if replica['dirty_offset'] != reference_offset:
                return False, None
        return True, state

    for n in redpanda.started_nodes():
        node_id = redpanda.node_id(n)

        for p in admin.get_partitions(node=n):
            namespace = p['ns']
            topic = p['topic']
            partition = p['partition_id']
            partition_name = f"{namespace}/{topic}/{partition}"

            # The lambda is invoked synchronously by wait_until_result, so it
            # always sees the current iteration's node and partition.
            state = wait_until_result(
                lambda: _poll_partition_state(n, namespace, topic, partition),
                timeout_sec=180,
                backoff_sec=1,
                err_msg="Error waiting for offsets to be consistent")

            logger.debug(
                f"debug state of {partition_name} replica on node {node_id}: {state}"
            )

            # Collect the distinct last-delta values across replicas; a
            # consistent partition yields at most one.
            last_deltas = set()
            for r_state in state['replicas']:
                ot_state = r_state['raft_state']['offset_translator_state']
                if "empty" not in ot_state:
                    m = last_delta_pattern.match(ot_state)
                    assert m, f"offset translator state {ot_state} does not match expected pattern"
                    last_deltas.add(m['delta'])

            assert len(
                last_deltas
            ) <= 1, f"partition {p} has inconsistent offset translation. Last deltas: {last_deltas}"


class NodeDecommissionWaiter():
def __init__(self,
redpanda,
Expand Down

0 comments on commit 78e55c3

Please sign in to comment.