Commit: fix KeyError

ervandagadzhanyan committed Sep 5, 2024
1 parent c515181 · commit 3168a6a
Showing 6 changed files with 66 additions and 34 deletions.
9 changes: 7 additions & 2 deletions jobs/jobs/airflow_utils.py
@@ -65,6 +65,7 @@ async def run(
     job_id: int,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
     configuration = get_configuration()
     with client.ApiClient(configuration) as api_client:
@@ -74,7 +75,10 @@ async def run(
             dag_run_id=dag_run_id,
             conf=dataclasses.asdict(
                 pipeline.PipelineRunArgs(
-                    job_id=job_id, tenant=current_tenant, files_data=files
+                    job_id=job_id,
+                    tenant=current_tenant,
+                    files_data=files,
+                    datasets=datasets,
                 )
             ),
         )
@@ -92,5 +96,6 @@ async def run(
     job_id: str,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
-    return await run(pipeline_id, job_id, files, current_tenant)
+    return await run(pipeline_id, job_id, files, current_tenant, datasets)
11 changes: 9 additions & 2 deletions jobs/jobs/databricks_utils.py
@@ -45,6 +45,7 @@ async def run(
     job_id: int,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
     logger.info(
         "Running pipeline %s, job_id %s, current_tenant: %s with arguments %s",
@@ -60,7 +61,10 @@ async def run(
             "badgerdoc_job_parameters": json.dumps(
                 dataclasses.asdict(
                     pipeline.PipelineRunArgs(
-                        job_id=job_id, tenant=current_tenant, files_data=files
+                        job_id=job_id,
+                        tenant=current_tenant,
+                        files_data=files,
+                        datasets=datasets,
                     )
                 )
             )
@@ -79,5 +83,8 @@ async def run(
     job_id: str,
     files: List[pipeline.PipelineFile],
     current_tenant: str,
+    datasets: List[pipeline.Dataset],
 ) -> None:
-    await run(pipeline_id, int(job_id), files, current_tenant)
+    await run(
+        pipeline_id, int(job_id), files, current_tenant, datasets=datasets
+    )
18 changes: 12 additions & 6 deletions jobs/jobs/main.py
@@ -80,12 +80,18 @@ async def create_job(
     )
     if not job_params.is_draft:
         logger.info("Running jobs")
-        await run_job_funcs.run_extraction_job(
-            db=db,
-            job_to_run=created_extraction_job,
-            current_tenant=current_tenant,
-            jw_token=jw_token,
-        )
+        try:
+            await run_job_funcs.run_extraction_job(
+                db=db,
+                job_to_run=created_extraction_job,
+                current_tenant=current_tenant,
+                jw_token=jw_token,
+            )
+        except KeyError:
+            raise HTTPException(
+                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+                detail="Invalid file data",
+            )
         await run_job_funcs.run_annotation_job(
             job_to_run=created_extraction_job,
             current_tenant=current_tenant,
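Note: with this change, a KeyError raised while preparing file data for the extraction job (for example the re-raised lookup failure in utils.files_data_to_pipeline_arg further down) reaches the API caller as HTTP 500 with detail "Invalid file data" instead of an unhandled server error. A minimal sketch of that behaviour, assuming a standard FastAPI setup — the stub route and client below are illustrative, not the actual jobs service:

# Illustrative stand-in route that fails the way create_job now does.
from fastapi import FastAPI, HTTPException, status
from fastapi.testclient import TestClient

app = FastAPI()


@app.post("/jobs/create_job")
async def create_job_stub():
    try:
        raise KeyError("datasets")  # simulate a file record missing required data
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Invalid file data",
        )


client = TestClient(app)
response = client.post("/jobs/create_job")
assert response.status_code == 500
assert response.json() == {"detail": "Invalid file data"}
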
9 changes: 8 additions & 1 deletion jobs/jobs/pipeline.py
@@ -8,12 +8,17 @@ class PipelineFileInput:
     job_id: int
 
 
+class Dataset(TypedDict, total=False):
+    id: int
+    name: str
+
+
 class PipelineFile(TypedDict, total=False):
     bucket: str
     input: PipelineFileInput
     input_path: str
     pages: List[int]
-    datasets: List[int]
+    datasets: List[Dataset]
     revision: Optional[str]
     output_path: Optional[str]
     signed_url: Optional[str]
@@ -25,6 +30,7 @@ class PipelineRunArgs:
     job_id: int
     tenant: str
     files_data: List[PipelineFile]
+    datasets: List[Dataset]
 
 
 @dataclass
@@ -44,5 +50,6 @@ async def run(
     job_id: str,
     files: List[PipelineFile],
     current_tenant: str,
+    datasets: List[Dataset],
 ) -> None:
     raise NotImplementedError()
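Note: Dataset entries now travel both per file (PipelineFile["datasets"]) and at the run level (PipelineRunArgs.datasets), and both runners serialize the same structure — dataclasses.asdict for the Airflow DAG-run conf, plus json.dumps for the Databricks job parameters. A rough sketch of that payload with made-up values, assuming the module above is importable as jobs.pipeline:

# Sketch only: illustrative values; the import path is assumed from the repo layout.
import dataclasses
import json

from jobs import pipeline

args = pipeline.PipelineRunArgs(
    job_id=42,
    tenant="example-tenant",
    files_data=[
        {
            "bucket": "example-tenant",
            "input": pipeline.PipelineFileInput(job_id=42),
            "input_path": "files/1/1.pdf",
            "pages": [1, 2],
            "datasets": [{"id": 7, "name": "invoices"}],
        }
    ],
    datasets=[{"id": 7, "name": "invoices"}],
)

# dataclasses.asdict recurses into the nested PipelineFileInput dataclass, so the
# result is plain, JSON-serializable data for the DAG conf / Databricks parameters.
print(json.dumps(dataclasses.asdict(args), indent=2))
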
18 changes: 10 additions & 8 deletions jobs/jobs/run_job_funcs.py
@@ -44,14 +44,16 @@ async def run_extraction_job(
         job_to_run.pipeline_engine,
     )
 
-    datasets_resp = await utils.search_datasets_by_ids(
-        datasets_ids=job_to_run.datasets,
-        current_tenant=current_tenant,
-        jw_token=jw_token,
-    )
-    datasets = [
-        {"id": d["id"], "name": d["name"]} for d in datasets_resp["data"]
-    ]
+    datasets = []
+    if job_to_run.datasets:
+        datasets_resp = await utils.search_datasets_by_ids(
+            datasets_ids=job_to_run.datasets,
+            current_tenant=current_tenant,
+            jw_token=jw_token,
+        )
+        datasets = [
+            {"id": d["id"], "name": d["name"]} for d in datasets_resp["data"]
+        ]
 
     await utils.execute_external_pipeline(
         pipeline_id=job_to_run.pipeline_id,
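Note: the dataset lookup now runs only when the job actually references datasets, so jobs without datasets no longer hit the search endpoint or fail on its response. A standalone sketch of the guarded pattern — fetch_datasets below is a hypothetical stand-in for utils.search_datasets_by_ids, but the {"data": [...]} response shape matches the code above:

# Sketch only: fetch_datasets is a hypothetical synchronous stand-in;
# the real utils.search_datasets_by_ids call is async and tenant-aware.
from typing import Any, Callable, Dict, List


def resolve_job_datasets(
    dataset_ids: List[int],
    fetch_datasets: Callable[[List[int]], Dict[str, Any]],
) -> List[Dict[str, Any]]:
    datasets: List[Dict[str, Any]] = []
    if dataset_ids:  # skip the request entirely when the job has no datasets
        resp = fetch_datasets(dataset_ids)
        datasets = [{"id": d["id"], "name": d["name"]} for d in resp["data"]]
    return datasets


# Usage with canned responses:
assert resolve_job_datasets([], lambda ids: {"data": []}) == []
assert resolve_job_datasets(
    [7], lambda ids: {"data": [{"id": 7, "name": "invoices", "count": 3}]}
) == [{"id": 7, "name": "invoices"}]
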
35 changes: 20 additions & 15 deletions jobs/jobs/utils.py
@@ -291,21 +291,26 @@ def files_data_to_pipeline_arg(
 ) -> Iterator[pipeline.PipelineFile]:
     data = previous_jobs_data if previous_jobs_data else files_data
     for file in data:
-        # todo: change me
-        _, job_id, file_id, *_ = file["output_path"].strip().split("/")
-
-        pipeline_file: pipeline.PipelineFile = {
-            "bucket": file["bucket"],
-            "input": pipeline.PipelineFileInput(job_id=job_id),
-            "input_path": file["file"],
-            "pages": file["pages"],
-            "file_id": file_id,
-            "datasets": file["datasets"],
-        }
-        rev = file.get("revision")
-        if rev:
-            pipeline_file["revision"] = rev
-        yield pipeline_file
+        try:
+            # todo: change me
+            _, job_id, file_id, *_ = file["output_path"].strip().split("/")
+
+            pipeline_file: pipeline.PipelineFile = {
+                "bucket": file["bucket"],
+                "input": pipeline.PipelineFileInput(job_id=job_id),
+                "input_path": file["file"],
+                "pages": file["pages"],
+                "file_id": file_id,
+                "datasets": file["datasets"],
+            }
+        except KeyError as err:
+            logger.exception(f"Unable to process file: {err}")
+            raise err
+        else:
+            rev = file.get("revision")
+            if rev:
+                pipeline_file["revision"] = rev
+            yield pipeline_file
 
 
 def fill_signed_url(files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
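Note: files_data_to_pipeline_arg is a generator, so the new try/except/else wraps only the dict lookups that can fail (a record missing one of the accessed keys, such as "datasets"); failures are logged and re-raised as KeyError, which create_job now converts into the 500 above, while the else branch keeps the successful yield path unchanged. A self-contained sketch of the same pattern with hypothetical record fields:

# Sketch only: hypothetical records mirroring the try/except/else-around-yield shape.
import logging
from typing import Any, Dict, Iterable, Iterator

logger = logging.getLogger(__name__)


def records_to_args(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    for record in records:
        try:
            arg = {"path": record["output_path"], "datasets": record["datasets"]}
        except KeyError as err:
            logger.exception(f"Unable to process record: {err}")
            raise err
        else:
            yield arg


good = {"output_path": "runs/42/1", "datasets": [{"id": 7, "name": "invoices"}]}
bad = {"output_path": "runs/42/2"}  # no "datasets" key -> KeyError("datasets")

print(list(records_to_args([good])))  # succeeds
# list(records_to_args([good, bad])) would raise KeyError, which create_job
# now turns into HTTP 500 "Invalid file data".
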
