Skip to content

Commit

Permalink
fix: job caching using data hashes
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Feb 11, 2024
1 parent a380fe6 commit baa6eed
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
58 changes: 50 additions & 8 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import structlog
from bidict import bidict
from rich.console import Group

from kiara.exceptions import FailedJobException
from kiara.models.events import KiaraEvent
Expand Down Expand Up @@ -136,6 +137,13 @@ def find_existing_job(

matches = []

ignore_internal = True
if ignore_internal:

module = self._kiara.module_registry.create_module(inputs_manifest)
if module.characteristics.is_internal:
return None

for store_id, archive in self._kiara.job_registry.job_archives.items():

match = archive.retrieve_record_for_job_hash(
Expand All @@ -159,8 +167,6 @@ def find_existing_job(
inputs_data_cid, contains_invalid = inputs_manifest.calculate_inputs_data_cid(
data_registry=self._kiara.data_registry
)
# if not inputs_data_cid:
# return None

inputs_data_hash = str(inputs_data_cid)

Expand Down Expand Up @@ -438,6 +444,7 @@ def find_matching_job_record(
return None

job_record = self.job_matcher.find_existing_job(inputs_manifest=inputs_manifest)

if job_record is None:
return None

Expand All @@ -449,6 +456,7 @@ def find_matching_job_record(
job_hash=inputs_manifest.job_hash,
module_type=inputs_manifest.module_type,
)

return job_record.job_id

def prepare_job_config(
Expand Down Expand Up @@ -481,12 +489,21 @@ def execute_job(
job_metadata: Union[None, Any] = None,
) -> uuid.UUID:

log = logger.bind(
module_type=job_config.module_type,
module_config=job_config.module_config,
inputs={k: str(v) for k, v in job_config.inputs.items()},
job_hash=job_config.job_hash,
)
if job_config.module_type != "pipeline":
log = logger.bind(
module_type=job_config.module_type,
module_config=job_config.module_config,
inputs={k: str(v) for k, v in job_config.inputs.items()},
job_hash=job_config.job_hash,
)
else:
pipeline_name = job_config.module_config.get("pipeline_name", "n/a")
log = logger.bind(
module_type=job_config.module_type,
pipeline_name=pipeline_name,
inputs={k: str(v) for k, v in job_config.inputs.items()},
job_hash=job_config.job_hash,
)

stored_job = self.find_matching_job_record(inputs_manifest=job_config)
if stored_job is not None:
Expand All @@ -495,6 +512,31 @@ def execute_job(
job_id=str(stored_job),
module_type=job_config.module_type,
)
if is_develop():

module = self._kiara.module_registry.create_module(manifest=job_config)
if job_metadata and job_metadata.get("is_pipeline_step", True):
step_id = job_metadata.get("step_id", None)
title = f"Using cached pipeline step: {step_id}"
else:
title = f"Using cached job for: {module.module_type_name}"

from kiara.utils.debug import create_module_preparation_table
from kiara.utils.develop import log_dev_message

stored_job_record = self.get_job_record(stored_job)

table = create_module_preparation_table(
kiara=self._kiara,
job_config=job_config,
job_id=stored_job_record.job_id,
module=module,
)
include = ["job_hash", "inputs_id_hash", "input_ids_hash", "outputs"]
table_job_record = stored_job_record.create_renderable(include=include)
panel = Group(table, table_job_record)
log_dev_message(panel, title=title)

return stored_job

if job_metadata is None:
Expand Down
3 changes: 2 additions & 1 deletion src/kiara/registries/jobs/job_store/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,10 @@ def store_job_record(self, job_record: JobRecord):
job_record_json = job_record.model_dump_json()

sql = text(
"INSERT INTO job_records (job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)"
"INSERT OR IGNORE INTO job_records(job_id, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_id, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)"
)
params = {
"job_id": str(job_record.job_id),
"job_hash": job_hash,
"manifest_hash": manifest_hash,
"input_ids_hash": input_ids_hash,
Expand Down
5 changes: 5 additions & 0 deletions src/kiara/utils/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ def create_module_preparation_table(

module_details = dev_config.log.pre_run.module_info
if module_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
pipeline_name = job_config.module_config.get("pipeline_name", None)
if module_details in [DetailLevel.MINIMAL.value, DetailLevel.MINIMAL]:
table.add_row("module", job_config.module_type)
if pipeline_name:
table.add_row("pipeline name", pipeline_name)
doc = module.operation.doc
table.add_row(
"module desc",
Expand All @@ -59,6 +62,8 @@ def create_module_preparation_table(
)
elif module_details in [DetailLevel.FULL.value, DetailLevel.FULL]:
table.add_row("module", job_config.module_type)
if pipeline_name:
table.add_row("pipeline name", pipeline_name)
doc = module.operation.doc
table.add_row(
"module doc",
Expand Down

0 comments on commit baa6eed

Please sign in to comment.