Skip to content

Commit

Permalink
feat: Yield inside huge values migration serialization
Browse files Browse the repository at this point in the history
With #4144 we break huge values slot migration into multiple commands.
This PR now adds yield between those commands.
It also adds a test that verifies that modifying huge values while a
migration is in progress works correctly, and that RSS doesn't grow too much.

Fixes #4100
  • Loading branch information
chakaz committed Nov 26, 2024
1 parent f84e1ee commit 66307a1
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 16 deletions.
6 changes: 4 additions & 2 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {

void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
if (it.GetVersion() < snapshot_version_) {
FiberAtomicGuard fg;
it.SetVersion(snapshot_version_);
string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) {
Expand Down Expand Up @@ -318,7 +317,10 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req

// Serializes a single (possibly huge) entry as one or more RESTORE/container
// commands and streams them out. The scraped diff left both the old one-line
// serializer declaration and its replacement in place, producing a duplicate
// definition of `serializer`; only the new throttling version is kept here.
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                 uint64_t expire_ms) {
  // Throttle after every emitted command so that serializing a huge value
  // yields to other fibers instead of monopolizing the thread (see #4100).
  CmdSerializer serializer([&](std::string s) {
    Write(s);
    ThrottleIfNeeded();
  });
  serializer.SerializeEntry(key, pk, pv, expire_ms);
}

Expand Down
91 changes: 77 additions & 14 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1825,11 +1825,29 @@ async def node1size0():
assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")


@pytest.mark.parametrize(
"huge_values_threshold, seed_during_migration",
[
pytest.param(10, True),
pytest.param(1_000, True),
pytest.param(1_000_000, True),
pytest.param(10, False),
pytest.param(1_000, False),
pytest.param(1_000_000, False),
],
)
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.asyncio
async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
async def test_cluster_migration_huge_container(
df_factory: DflyInstanceFactory, huge_values_threshold: int, seed_during_migration: bool
):
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
serialization_max_chunk_size=huge_values_threshold,
)
for i in range(2)
]
df_factory.start_all(instances)

Expand All @@ -1840,16 +1858,45 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

logging.debug("Generating huge containers")
seeder = StaticSeeder(
key_target=10,
data_size=10_000_000,
collection_size=10_000,
variance=1,
samples=1,
types=["LIST", "HASH", "SET", "ZSET", "STRING"],
)
await seeder.run(nodes[0].client)
source_data = await StaticSeeder.capture(nodes[0].client)
# Insert data into containers with a gaussian key distribution: some will be small and others big
stop = False

async def insert_data(client):
    """Continuously push ever-growing values into list/set/hash/zset keys
    until the enclosing test flips the shared `stop` flag."""
    nonlocal stop
    # itertools.count is unbounded; the only exit is the `stop` flag below.
    for i in itertools.count(start=1):
        if stop:
            return
        # Gaussian key distribution around 0: a few keys are hit repeatedly
        # and become huge containers, while most stay small.
        key = int(random.gauss(mu=0, sigma=100))
        val = "#" * i  # Create large entries for RSS to grow
        # The same value is written under four container types so the
        # post-migration check can compare them against each other.
        await client.rpush(f"l:{key}", val)
        await client.sadd(f"s:{key}", val)
        await client.hset(f"h:{key}", val, val)
        await client.zadd(f"z:{key}", {val: i})
    # NOTE(review): indentation was lost in this view; at function level this
    # line is unreachable (the loop only exits via `return`) — confirm its
    # intended placement against the original file.
    logging.debug("Stopped feeding data")

insert_task = asyncio.create_task(insert_data(instances[0].cluster_client()))

async def get_rss(client, field):
    """Fetch `field` from the server's INFO MEMORY section."""
    return (await client.info("memory"))[field]

rss = 0
while True:
rss = await get_rss(nodes[0].client, "used_memory_rss")
logging.debug(f"Current rss: {rss}")
if rss > 1_000_000_000:
break
await asyncio.sleep(1)

async def stop_seed():
    """Signal insert_data() to exit and wait for its background task to finish,
    so no writes are in flight when the caller proceeds."""
    nonlocal stop
    stop = True
    logging.debug("Waiting for task")
    # insert_task is the asyncio task running insert_data(); awaiting it
    # guarantees the seeder has fully drained before we continue.
    await insert_task
    logging.debug("Done waiting for task")

if not seed_during_migration:
await stop_seed()

nodes[0].migrations = [
MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
Expand All @@ -1860,8 +1907,24 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
logging.debug("Waiting for migration to finish")
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")

target_data = await StaticSeeder.capture(nodes[1].client)
assert source_data == target_data
if seed_during_migration:
await stop_seed()
else:
# Only verify memory growth if we haven't pushed new data during migration
new_rss = await get_rss(nodes[0].client, "used_memory_peak_rss")
logging.debug(f"new rss {new_rss}, previous rss {rss}")
assert new_rss < rss * 1.1

for i in range(-500, 500):
l = await nodes[1].client.lrange(f"l:{i}", 0, -1)
s = await nodes[1].client.smembers(f"s:{i}")
h = await nodes[1].client.hkeys(f"h:{i}")
z = await nodes[1].client.zrange(f"z:{i}", 0, -1)
assert set(l) == s
assert set(h) == s
assert set(z) == s

await instances[0].cluster_client().close()


def parse_lag(replication_info: str):
Expand Down

0 comments on commit 66307a1

Please sign in to comment.