[HZ-5278] Asyncio module soak tests #755
The PR adds map_soak_test_asyncio.py, an asyncio counterpart of the existing map soak test (268 added lines):

```python
import argparse
import asyncio
import logging
import os
import random
import sys
import time
import typing
from collections import defaultdict

# Make sure that the hazelcast directory is in sys.path, so that we can
# import modules from it when running this script from the command line.
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))

from hazelcast.asyncio import HazelcastClient, Map
from hazelcast.predicate import between
from hazelcast.serialization.api import IdentifiedDataSerializable

# Prefer the monotonic clock for deadline tracking; fall back to time.time
# on interpreters that do not provide it.
if hasattr(time, "monotonic"):
    get_current_timestamp = time.monotonic
else:
    get_current_timestamp = time.time


TASK_COUNT = 256
ENTRY_COUNT = 10000
OBSERVATION_INTERVAL = 10.0


class SoakTestCoordinator:
    def __init__(self, test_duration, address):
        self.address = address
        self._task_count_before = len(asyncio.all_tasks())
        # test_duration is given in hours; convert it to seconds.
        self._deadline = get_current_timestamp() + test_duration * 60 * 60
        self._lock = asyncio.Lock()
        # The following fields are protected by the lock above.
        self._reached_deadline = False
        self._tests_failed = False

    async def start_tests(self):
        client, test_map = await self._init_client_map()
        logging.info("Soak test operations are starting!")
        logging.info("* " * 20 + "\n")
        test_runners = [TestRunner(i, self) for i in range(TASK_COUNT)]
        observer = OperationCountObserver(self, test_runners)
        async with asyncio.TaskGroup() as tg:
            observer_task = tg.create_task(observer.run())
            for runner in test_runners:
                tg.create_task(runner.run(test_map))

        await observer_task

        logging.info("* " * 20)
        logging.info("Soak test has finished!")
        logging.info("-" * 40)

        if await self.tests_failed():
            logging.info("Soak test failed!")
            return

        hang_counts = await observer.hang_counts()
        if not hang_counts:
            logging.info("All threads worked without hanging")
        else:
            for runner, hang_count in hang_counts.items():
                logging.info("Thread %s hung %s times.", runner.id, hang_count)

        await client.shutdown()
        # Wait for the cancelled tasks to finish.
        await asyncio.sleep(1)
        # Compare the task counts before and after the test to detect leaks.
        task_count_after = len(asyncio.all_tasks())
        logging.info("Task count before: %s, after: %s", self._task_count_before, task_count_after)

    async def notify_error(self):
        async with self._lock:
            self._tests_failed = True

    async def should_continue_tests(self):
        async with self._lock:
            return not (self._reached_deadline or self._tests_failed)

    async def check_deadline(self):
        async with self._lock:
            now = get_current_timestamp()
            self._reached_deadline = now >= self._deadline

    async def tests_failed(self) -> bool:
        async with self._lock:
            return self._tests_failed

    async def _init_client_map(self) -> typing.Tuple[HazelcastClient, Map]:
        def no_op_listener(_):
            pass

        try:
            client = await HazelcastClient.create_and_start(
                cluster_members=[self.address],
                data_serializable_factories={
                    SimpleEntryProcessor.FACTORY_ID: {
                        SimpleEntryProcessor.CLASS_ID: SimpleEntryProcessor
                    }
                },
            )
            map = await client.get_map("test-map")
            await map.add_entry_listener(
                include_value=False,
                added_func=no_op_listener,
                removed_func=no_op_listener,
                updated_func=no_op_listener,
            )
            return client, map
        except Exception:
            logging.exception("Client failed to start")
            raise


class TestRunner:
    def __init__(self, id, coordinator: SoakTestCoordinator):
        self.id = id
        self.coordinator = coordinator
        self.counter = OperationCounter()

    async def run(self, test_map):
        coordinator = self.coordinator
        processor = SimpleEntryProcessor("test")
        while await coordinator.should_continue_tests():
            key = str(random.randint(0, ENTRY_COUNT))
            value = str(random.randint(0, ENTRY_COUNT))
            operation = random.randint(0, 100)
            try:
                # Operation mix: ~30% get, ~30% put, ~20% predicate
                # query, ~20% entry processor execution.
                if operation < 30:
                    await test_map.get(key)
                elif operation < 60:
                    await test_map.put(key, value)
                elif operation < 80:
                    await test_map.values(between("this", 0, 10))
                else:
                    await test_map.execute_on_key(key, processor)

                await self.counter.increment()
            except Exception:
                await coordinator.notify_error()
                logging.exception("Unexpected error occurred in thread %s", self)
                return


class OperationCountObserver:
    def __init__(self, coordinator, test_runners):
        self.coordinator = coordinator
        self.test_runners = test_runners
        self._lock = asyncio.Lock()
        # The lock above protects the field below.
        self._hang_counts = defaultdict(int)

    async def hang_counts(self):
        async with self._lock:
            return self._hang_counts

    async def _increment_hang_count(self, runner):
        async with self._lock:
            self._hang_counts[runner] += 1

    async def run(self):
        while True:
            await asyncio.sleep(OBSERVATION_INTERVAL)
            await self.coordinator.check_deadline()
            if not await self.coordinator.should_continue_tests():
                break

            logging.info("-" * 40)
            op_count = 0
            hanged_runners = []

            for test_runner in self.test_runners:
                op_count_per_runner = await test_runner.counter.get_and_reset()
                op_count += op_count_per_runner
                # A runner that completed no operations during the last
                # observation interval is considered hung.
                if op_count_per_runner == 0:
                    hanged_runners.append(test_runner)

            if not hanged_runners:
                logging.info("All threads worked without hanging")
            else:
                logging.info("%s threads hung: %s", len(hanged_runners), hanged_runners)
                for hanged_runner in hanged_runners:
                    await self._increment_hang_count(hanged_runner)

            logging.info("-" * 40)
            logging.info("Operations Per Second: %s\n", op_count / OBSERVATION_INTERVAL)


class OperationCounter:
    def __init__(self):
        self._count = 0
        self._lock = asyncio.Lock()

    async def get_and_reset(self):
        async with self._lock:
            total = self._count
            self._count = 0
            return total

    async def increment(self):
        async with self._lock:
            self._count += 1


class SimpleEntryProcessor(IdentifiedDataSerializable):
    CLASS_ID = 1
    FACTORY_ID = 66

    def __init__(self, value):
        self.value = value

    def read_data(self, object_data_input):
        pass

    def write_data(self, object_data_output):
        object_data_output.write_string(self.value)

    def get_class_id(self):
        return self.CLASS_ID

    def get_factory_id(self):
        return self.FACTORY_ID


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--duration",
        default=48.0,
        type=float,
        help="Duration of the test in hours",
    )
    parser.add_argument(
        "--address",
        required=True,
        type=str,
        help="host:port of one of the cluster members",
    )
    parser.add_argument(
        "--log-file",
        required=True,
        type=str,
        help="Name of the log file",
    )
    return parser.parse_args()


def setup_logging(log_file):
    logging.basicConfig(
        filename=log_file,
        filemode="w",
        format="%(asctime)s %(message)s",
        datefmt="%H:%M:%S",
        level=logging.INFO,
    )


async def amain():
    arguments = parse_arguments()
    setup_logging(arguments.log_file)
    coordinator = SoakTestCoordinator(arguments.duration, arguments.address)
    await coordinator.start_tests()


if __name__ == "__main__":
    asyncio.run(amain())
```
The PR also updates the soak test runner script so that, instead of ten clients of the same type, it launches five asyncio clients alongside five asyncore clients, and kills the background processes on exit:

```diff
@@ -5,10 +5,35 @@ DURATION=${2:-48.0}
 
 mkdir -p "client_logs"
 
-for i in {0..9}
-do
-    python map_soak_test.py \
-        --duration "$DURATION" \
-        --address "$ADDRESS" \
-        --log-file client_logs/client-"$i" &
-done
+declare -a pids
+
+cleanup () {
+    for pid in "${pids[@]}"; do
+        echo "Stopping $pid"
+        kill "$pid"
+    done
+}
+
+trap cleanup EXIT
+
+for i in {1..5}; do
+    python map_soak_test_asyncio.py \
+        --duration "$DURATION" \
+        --address "$ADDRESS" \
+        --log-file client_logs/client-asyncio-"$i" &
+    pid=$!
+    echo "$pid running"
+    pids+=("$pid")
+done
+
+for i in {1..5}; do
+    python map_soak_test.py \
+        --duration "$DURATION" \
+        --address "$ADDRESS" \
+        --log-file client_logs/client-"$i" &
+    pid=$!
+    echo "$pid running"
+    pids+=("$pid")
+done
+
+wait
```
Contributor: There used to be 10 clients, all of the same type. Now asyncio and non-asyncio usage is mixed, but that is not an expected usage pattern for users. I would keep 10 clients and have two separate tests, one for asyncio and one for asyncore. We need to make sure both clients work independently in their own cluster without a problem. Mixed usage is another use case, and one of less interest.

Author: Users can use both asyncio and asyncore clients together without problems, even in the same process. In the soak tests, the asyncio and asyncore clients run in different processes, so there is even better isolation.

Contributor: We do not run such mixed tests for the other clients either, although it would also be possible there. If we want to change the soak test in this way, we had better evaluate changing it for all clients. The original soak test was not meant to test this scenario.
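For context, the author's claim could be sketched roughly as below. This is a minimal, hypothetical example, not part of the PR: it assumes the asyncio API shown in this diff (`HazelcastClient.create_and_start`) next to the existing blocking client API, and the cluster address and map name are placeholders.

```python
import asyncio

import hazelcast
from hazelcast.asyncio import HazelcastClient as AsyncHazelcastClient


async def main():
    # Blocking (asyncore-based) client. Creating and using it inside a
    # coroutine stalls the event loop, which is acceptable only for a demo.
    sync_client = hazelcast.HazelcastClient(cluster_members=["127.0.0.1:5701"])

    # Asyncio client from this PR, started in the same process.
    async_client = await AsyncHazelcastClient.create_and_start(
        cluster_members=["127.0.0.1:5701"]
    )

    # Write through the asyncio client, read back through the blocking one.
    async_map = await async_client.get_map("demo-map")
    await async_map.put("key", "value")

    sync_map = sync_client.get_map("demo-map").blocking()
    print(sync_map.get("key"))  # -> value

    await async_client.shutdown()
    sync_client.shutdown()


asyncio.run(main())
```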
In a separate review thread:

Contributor: How come this was not caught before? Don't we have a test to verify all supported runtimes?

Author: We can't test for these. These are just annotations.
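The point about annotations can be illustrated with a minimal, hypothetical snippet (not from the PR): CPython does not evaluate or enforce type annotations at runtime, so an ordinary test run cannot catch an annotation that is wrong or violated.

```python
def put_entry(key: int, value: int) -> None:
    # The annotations above are pure metadata; CPython never checks them
    # when the function is called.
    print(key, value)


# Runs without any error, even though the arguments violate the annotations.
put_entry("a", "b")

# The annotations are only stored for tools such as type checkers to read.
print(put_entry.__annotations__)
```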