cloud: Archive according to parameters #612

Merged 1 commit on Nov 18, 2024
6 changes: 5 additions & 1 deletion kcidb/cloud/scheduler.sh
@@ -95,7 +95,11 @@ function scheduler_deploy() {
scheduler_job_pubsub_deploy \
"$project" "${prefix}archive_trigger" \
"$archive_trigger_topic" '0 */6 * * *' \
'{}'
"{
\"data_min_age\": $((14*24*60*60)),
\"data_chunk_duration\": $((12*60*60)),
\"run_max_duration\": $((7*60))
}"
}

# Withdraw from the scheduler
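
For reference, a minimal sketch of what the cron job's message body works out to once the shell arithmetic above is expanded (the variable name is hypothetical; the keys and numbers follow from the hunk itself):

# Illustration only: keys from the payload above, values from evaluating its $(( )) arithmetic
archive_trigger_params = {
    "data_min_age": 14 * 24 * 60 * 60,      # 1209600 s, a two-week editing window
    "data_chunk_duration": 12 * 60 * 60,    # 43200 s, 12-hour chunks
    "run_max_duration": 7 * 60,             # 420 s, a 7-minute run budget
}
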
94 changes: 71 additions & 23 deletions main.py
@@ -426,15 +426,62 @@ def kcidb_archive(event, context):
"""
# It's OK, pylint: disable=too-many-locals
#
# Editing window
edit_window = datetime.timedelta(days=14)
# Maximum duration of the dump transferred in a single execution
# Describe the expected event data
params_schema = dict(
type="object",
properties=dict(
data_min_age=dict(
type="integer", minimum=0,
description="Minimum age of data, seconds"
),
data_max_duration=dict(
type="integer", minimum=0,
description="Maximum data duration, seconds. "
"No limit, if missing."
),
data_chunk_duration=dict(
type="integer", minimum=0,
description="Data chunk duration, seconds"
),
run_max_duration=dict(
type="integer", minimum=0,
description="Maximum runtime, seconds"
),
),
required=[
"data_min_age",
"data_chunk_duration",
"run_max_duration",
],
additionalProperties=False,
)

# Parse the input JSON
params_string = base64.b64decode(event["data"]).decode()
params = json.loads(params_string)
jsonschema.validate(
instance=params, schema=params_schema,
format_checker=jsonschema.Draft7Validator.FORMAT_CHECKER
)

# Minimum data age (editing window, to be enforced)
data_min_age = datetime.timedelta(
seconds=int(params["data_min_age"])
)
# Maximum duration of the data transferred in a single execution
# Operational database cannot have gaps of this or greater duration
max_duration = datetime.timedelta(days=7)
# Duration of each dump piece
piece_duration = datetime.timedelta(hours=12)
data_max_duration = (
datetime.timedelta(seconds=int(params["data_max_duration"]))
if "data_max_duration" in params else
None
)
# Duration of each data chunk
data_chunk_duration = datetime.timedelta(
seconds=int(params["data_chunk_duration"])
)

# Execution (monotonic) deadline
deadline_monotonic = time.monotonic() + 7 * 60
deadline_monotonic = time.monotonic() + int(params["run_max_duration"])

op_client = get_db_client(OPERATIONAL_DATABASE)
op_io_schema = op_client.get_schema()[1]
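
The parameter handling above follows the usual Pub/Sub shape: the scheduler's JSON body arrives base64-encoded in event["data"], gets decoded, and is validated against params_schema before use. A self-contained sketch of that round trip, assuming a standard Pub/Sub event dict and with the schema trimmed to its essentials (descriptions dropped):

import base64
import json

import jsonschema

# Trimmed copy of params_schema from the hunk above (descriptions omitted)
params_schema = {
    "type": "object",
    "properties": {
        "data_min_age": {"type": "integer", "minimum": 0},
        "data_max_duration": {"type": "integer", "minimum": 0},
        "data_chunk_duration": {"type": "integer", "minimum": 0},
        "run_max_duration": {"type": "integer", "minimum": 0},
    },
    "required": ["data_min_age", "data_chunk_duration", "run_max_duration"],
    "additionalProperties": False,
}

# Hypothetical payload, matching what the scheduler job publishes
params = {
    "data_min_age": 14 * 24 * 60 * 60,
    "data_chunk_duration": 12 * 60 * 60,
    "run_max_duration": 7 * 60,
}
# Pub/Sub delivers the message with a base64-encoded "data" field
event = {"data": base64.b64encode(json.dumps(params).encode()).decode()}

# The same decode-and-validate steps as in the function above
decoded = json.loads(base64.b64decode(event["data"]).decode())
jsonschema.validate(instance=decoded, schema=params_schema)

# A payload missing a required key, or carrying an unknown one, raises
# jsonschema.exceptions.ValidationError here instead of proceeding with
# missing or unexpected parameters
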
@@ -446,9 +493,6 @@ def kcidb_archive(event, context):
"aborting")
return

# Maximum timestamp of data to archive
max_until = op_now - edit_window

ar_client = get_db_client(ARCHIVE_DATABASE)
ar_last_modified = ar_client.get_last_modified()

@@ -461,13 +505,18 @@ def kcidb_archive(event, context):
) for n in op_obj_list_names
}
min_after = min(after.values())
if min_after >= max_until:
LOGGER.info("No data old enough to archive, aborting")
return

# Find the maximum timestamp of the data we need to fetch
# We try to align all tables on a single time boundary
until = min(min_after + max_duration, max_until)
until = min(
datetime.datetime.max if data_max_duration is None
else min_after + data_max_duration,
op_now - data_min_age
)

if min_after >= until:
LOGGER.info("No data old enough to archive, aborting")
return

# Transfer data in pieces which can hopefully fit in memory
# Split by time, down to microseconds, as it's our transfer atom
@@ -479,13 +528,13 @@ def kcidb_archive(event, context):
LOGGER.info("Ran out of time, stopping")
break
next_after = {
n: min(max(t, min_after + piece_duration), until)
n: min(max(t, min_after + data_chunk_duration), until)
for n, t in after.items()
}
next_min_after = min(next_after.values())
next_min_after_str = next_min_after.isoformat(timespec='microseconds')
# Transfer the data, preserving the timestamps
LOGGER.info("FETCHING operational database dump for (%s, %s] range",
LOGGER.info("FETCHING operational database data for (%s, %s] range",
min_after_str, next_min_after_str)
for obj_list_name in after:
LOGGER.debug(
Expand All @@ -494,23 +543,22 @@ def kcidb_archive(event, context):
after[obj_list_name].isoformat(timespec='microseconds'),
next_after[obj_list_name].isoformat(timespec='microseconds')
)
dump = op_client.dump(with_metadata=True,
data = op_client.dump(with_metadata=True,
after=after, until=next_after)
count = kcidb.io.SCHEMA.count(dump)
LOGGER.info("LOADING a dump of %u objects into archive database",
count)
ar_client.load(dump, with_metadata=True)
count = kcidb.io.SCHEMA.count(data)
LOGGER.info("LOADING %u objects into archive database", count)
ar_client.load(data, with_metadata=True)
LOGGER.info("ARCHIVED %u objects in (%s, %s] range",
count, min_after_str, next_min_after_str)
for obj_list_name in after:
LOGGER.debug("ARCHIVED %u %s",
len(dump.get(obj_list_name, [])), obj_list_name)
len(data.get(obj_list_name, [])), obj_list_name)
total_count += count
after = next_after
min_after = next_min_after
min_after_str = next_min_after_str
# Make sure we have enough memory for the next piece
dump = None
data = None
gc.collect()
else:
LOGGER.info("Completed, stopping")
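
The window arithmetic in the function above can be exercised in isolation: the upper bound "until" is the earlier of "oldest unarchived timestamp plus data_max_duration" and "now minus data_min_age", and each loop iteration advances every table's cursor to a common boundary at most one data_chunk_duration past the oldest one, never beyond "until". A standalone sketch with made-up timestamps and table names, purely for illustration:

import datetime

UTC = datetime.timezone.utc
# Illustrative inputs; in kcidb_archive these come from the databases and params
op_now = datetime.datetime(2024, 11, 18, tzinfo=UTC)
after = {  # hypothetical per-table timestamps of the last archived rows
    "checkouts": datetime.datetime(2024, 10, 1, tzinfo=UTC),
    "builds": datetime.datetime(2024, 10, 2, tzinfo=UTC),
}
data_min_age = datetime.timedelta(seconds=14 * 24 * 60 * 60)
data_max_duration = datetime.timedelta(seconds=7 * 24 * 60 * 60)
data_chunk_duration = datetime.timedelta(seconds=12 * 60 * 60)

min_after = min(after.values())
until = min(min_after + data_max_duration, op_now - data_min_age)
# until == 2024-10-08: capped by data_max_duration, not by the editing window

# One step of the transfer loop: align all tables on a boundary at most one
# chunk past the oldest table, and never past "until"
next_after = {n: min(max(t, min_after + data_chunk_duration), until)
              for n, t in after.items()}
# next_after == {"checkouts": 2024-10-01 12:00, "builds": 2024-10-02 00:00}
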
18 changes: 15 additions & 3 deletions test_main.py
@@ -406,18 +406,30 @@ def gen_data(id, ts):
data_3w = gen_data("archive:3w", ts_3w)
data_4w = gen_data("archive:4w", ts_4w)

# Archival parameters
params = dict(
# Edit window: two weeks
data_min_age=2 * 7 * 24 * 60 * 60,
# Transfer at most one week
data_max_duration=7 * 24 * 60 * 60,
# Transfer one week at a time (everything in one go)
data_chunk_duration=7 * 24 * 60 * 60,
# We gotta be at least faster than the time we wait (60s)
run_max_duration=45,
)

# Load data_now into the operational DB
op_client.load(data_now, with_metadata=True)
# Trigger and wait for archival (ignore possibility of actual trigger)
publisher.publish({})
publisher.publish(params)
time.sleep(60)
# Check data_now doesn't end up in the archive DB
assert ar_schema.count(ar_client.dump()) == 0

# Load data_3w and data_4w
op_client.load(op_schema.merge(data_3w, [data_4w]), with_metadata=True)
# Trigger and wait for archival (ignore possibility of actual trigger)
publisher.publish({})
publisher.publish(params)
time.sleep(60)
# Check data_4w is in the archive database
dump = ar_client.dump()
@@ -433,7 +445,7 @@ def gen_data(id, ts):
for obj_list_name in op_schema.id_fields
), "Some three-week old data in the archive"
# Trigger and wait for another archival (ignore chance of actual trigger)
publisher.publish({})
publisher.publish(params)
time.sleep(60)
# Check data_3w is now in the archive database
dump = ar_client.dump()
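
For reference, the same test parameters written out in seconds, alongside the constraint the run_max_duration comment points at (purely illustrative, not part of the test suite):

# Hypothetical restatement of the test parameters above, in plain seconds
params = dict(
    data_min_age=1209600,        # 2 * 7 * 24 * 60 * 60: the two-week editing window
    data_max_duration=604800,    # at most one week of data per run
    data_chunk_duration=604800,  # a single chunk spans the whole window
    run_max_duration=45,         # must stay below the test's 60-second sleep
)
assert params["run_max_duration"] < 60
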