extend workunit_definition module
leoschwarz committed Aug 13, 2024
1 parent b3c8c02 commit 1251268
Showing 3 changed files with 91 additions and 44 deletions.
5 changes: 3 additions & 2 deletions bfabric/entities/storage.py
@@ -17,6 +17,7 @@ def __init__(self, data_dict: dict[str, Any], client: Bfabric | None) -> None:
base_path = property(lambda self: Path(self.data_dict["basepath"]))

@cached_property
def scp_prefix(self) -> str:
def scp_prefix(self) -> str | None:
"""SCP prefix with storage base path included."""
return f"{self.data_dict['host']}:{self.data_dict['basepath']}"
protocol = self.data_dict["protocol"]
return f"{self.data_dict['host']}:{self.data_dict['basepath']}" if protocol == "scp" else None
92 changes: 70 additions & 22 deletions bfabric/experimental/app_interface/workunit_definition.py
@@ -3,50 +3,98 @@
from pathlib import Path
from typing import TYPE_CHECKING

from pydantic import BaseModel
import polars as pl
from pydantic import BaseModel, ConfigDict

from bfabric.entities import Workunit, Project, ExternalJob
from bfabric.entities import Workunit, Project, ExternalJob, Resource

if TYPE_CHECKING:
from bfabric.bfabric import Bfabric


class WorkunitDefinitionData(BaseModel):
"""Encapsulates all information necessary to execute a particular workunit."""
class InputResourceDefinition(BaseModel):
id: int
scp_address: str | None
app_id: int
app_name: str

@classmethod
def from_resource(cls, resource: Resource) -> InputResourceDefinition:
# TODO optimize: find generic mechanism to preload entities with an arena-like cache
scp_address = (
f"{resource.storage.scp_prefix}{resource['relativepath']}" if resource.storage.scp_prefix else None
)
data = {
"id": resource.id,
"scp_address": scp_address,
"app_id": resource.workunit.application.id,
"app_name": resource.workunit.application["name"],
}
return cls.model_validate(data)


class WorkunitExecutionDefinition(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

workunit_id: int
project_id: int
order_id: int | None
parameter_values: dict[str, str]
executable_path: Path
input_resource_ids: list[int]
input_dataset_ids: list[int]
input_dataset: pl.DataFrame | None
input_resources: list[InputResourceDefinition]

@classmethod
def from_workunit(cls, workunit: Workunit) -> WorkunitExecutionDefinition:
input_resources = []
for resource in workunit.input_resources:
input_resources.append(InputResourceDefinition.from_resource(resource))

class WorkunitDefinition:
def __init__(self, data: WorkunitDefinitionData) -> None:
self._data = data
data = {}
data["parameter_values"] = workunit.parameter_values
data["executable_path"] = Path(workunit.application.executable["program"])
data["input_dataset"] = workunit.input_dataset.to_polars() if workunit.input_dataset else None
data["input_resources"] = input_resources
return cls.model_validate(data)

workunit_id = property(lambda self: self._data.workunit_id)

class WorkunitRegistrationDefinition(BaseModel):
workunit_id: int
project_id: int
order_id: int | None

@classmethod
def from_workunit(cls, client: Bfabric, workunit: Workunit) -> WorkunitDefinition:
def from_workunit(cls, workunit: Workunit) -> WorkunitRegistrationDefinition:
data = {"workunit_id": workunit.id}
if isinstance(workunit.container, Project):
data["project_id"] = workunit.container.id
data["order_id"] = None
else:
data["project_id"] = workunit.container.project.id
data["order_id"] = workunit.container.id
data["parameter_values"] = workunit.parameter_values
data["executable_path"] = Path(workunit.application.executable["program"])
# TODO
data["input_resource_ids"] = []
data["input_dataset_ids"] = []
# TODO outputs... (But this part should be reconsidered)
return cls.model_validate(data)


validated = WorkunitDefinitionData.model_validate(data)
return cls(data=validated)
class WorkunitDefinition:
def __init__(self, execution: WorkunitExecutionDefinition, registration: WorkunitRegistrationDefinition) -> None:
self._execution = execution
self._registration = registration

execution = property(lambda self: self._execution)
registration = property(lambda self: self._registration)

# TODO keep these?
workunit_id = property(lambda self: self._registration.workunit_id)
project_id = property(lambda self: self._registration.project_id)
order_id = property(lambda self: self._registration.order_id)
parameter_values = property(lambda self: self._execution.parameter_values)
executable_path = property(lambda self: self._execution.executable_path)
input_resources = property(lambda self: self._execution.input_resources)
input_dataset = property(lambda self: self._execution.input_dataset)

@classmethod
def from_workunit(cls, client: Bfabric, workunit: Workunit) -> WorkunitDefinition:
return cls(
execution=WorkunitExecutionDefinition.from_workunit(workunit),
registration=WorkunitRegistrationDefinition.from_workunit(workunit),
)

@classmethod
def from_external_job_id(cls, client: Bfabric, external_job_id: int) -> WorkunitDefinition:
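As a rough usage sketch of the split definition classes: the workunit id and client setup below are invented for illustration, Bfabric.from_config() is assumed as the usual way to obtain a client, and Workunit.find follows the find() pattern used elsewhere in bfabric.entities.

from bfabric import Bfabric
from bfabric.entities import Workunit
from bfabric.experimental.app_interface.workunit_definition import WorkunitDefinition

client = Bfabric.from_config()  # assumes a locally configured B-Fabric client
workunit = Workunit.find(id=12345, client=client)  # hypothetical workunit id
definition = WorkunitDefinition.from_workunit(client=client, workunit=workunit)

# Registration data (container/project resolution) and execution data
# (parameters, executable, input resources) now live in separate models.
print(definition.registration.workunit_id, definition.registration.project_id)
print(definition.execution.parameter_values)
for resource in definition.execution.input_resources:
    print(resource.id, resource.app_name, resource.scp_address)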
38 changes: 18 additions & 20 deletions bfabric/wrapper_creator/bfabric_wrapper_creator.py
@@ -13,6 +13,7 @@
from bfabric import Bfabric
from bfabric.bfabric_legacy import bfabricEncoder
from bfabric.entities import Workunit, ExternalJob, Application, Resource, Storage, Order, Project
from bfabric.experimental.app_interface.workunit_definition import WorkunitDefinition
from bfabric.wrapper_creator.bfabric_external_job import BfabricExternalJob


@@ -21,6 +22,10 @@ def __init__(self, client: Bfabric, external_job_id: int) -> None:
self._client = client
self._external_job_id = external_job_id

@cached_property
def workunit_definition(self) -> WorkunitDefinition:
return WorkunitDefinition.from_external_job_id(client=self._client, external_job_id=self._external_job_id)

@cached_property
def _external_job(self) -> ExternalJob:
return ExternalJob.find(id=self._external_job_id, client=self._client)
@@ -73,7 +78,7 @@ def create_log_resource(self, variant: Literal["out", "err"], output_resource: R
"resource",
{
"name": f"slurm_std{variant}",
"workunitid": self._workunit.id,
"workunitid": self.workunit_definition.workunit_id,
"storageid": self._log_storage.id,
"relativepath": f"/workunitid-{self._workunit.id}_resourceid-{output_resource.id}.{variant}",
},
@@ -83,30 +88,23 @@
@cached_property
def _inputs_for_application_section(self):
inputs = defaultdict(list)
for resource in self._workunit.input_resources:
application_name = resource.workunit.application.data_dict["name"]
path = Path(resource.storage.data_dict["basepath"], resource.data_dict["relativepath"])
input_url = f"bfabric@{resource.storage.data_dict['host']}:{path}"
inputs[application_name].append(input_url)
for resource in self.workunit_definition.execution.input_resources:
inputs[resource.app_name].append(f"bfabric@{resource.scp_address}")
return dict(inputs)

@cached_property
def _inputs_for_configuration_section(self):
# NOTE: This is not even consistent within the yaml but for historic reasons we keep it...
inputs = defaultdict(list)
for resource in self._workunit.input_resources:
inputs[resource.workunit.application.data_dict["name"]].append(
{
"resource_id": resource.id,
"resource_url": resource.web_url,
}
)
for resource in self.workunit_definition.execution.input_resources:
web_url = Resource({"id": resource.id}, client=self._client).web_url
inputs[resource.app_name].append({"resource_id": resource.id, "resource_url": web_url})
return dict(inputs)

def get_application_section(self, output_resource: Resource) -> dict[str, Any]:
output_url = f"bfabric@{self._application.storage.data_dict['host']}:{self._application.storage.data_dict['basepath']}{output_resource.data_dict['relativepath']}"
return {
"parameters": self._workunit.parameter_values,
"parameters": self.workunit_definition.parameter_values,
"protocol": "scp",
"input": self._inputs_for_application_section,
"output": [output_url],
@@ -125,13 +123,13 @@ def get_job_configuration_section(
}

return {
"executable": self._application.executable.data_dict["program"],
"executable": str(self.workunit_definition.executable_path),
"external_job_id": self._external_job_id,
"fastasequence": self._fasta_sequence,
"input": self._inputs_for_configuration_section,
"inputdataset": None,
"order_id": self._order.id if self._order is not None else None,
"project_id": self._project.id if self._project is not None else None,
"order_id": self.workunit_definition.order_id,
"project_id": self.workunit_definition.project_id,
"output": {
"protocol": "scp",
"resource_id": output_resource.id,
@@ -140,7 +138,7 @@
"stderr": log_resource["stderr"],
"stdout": log_resource["stdout"],
"workunit_createdby": self._workunit.data_dict["createdby"],
"workunit_id": self._workunit.id,
"workunit_id": self.workunit_definition.workunit_id,
"workunit_url": self._workunit.web_url,
}

@@ -165,7 +163,7 @@ def write_results(self, config_serialized: str) -> None:
{
"name": "job configuration (executable) in YAML",
"context": "WORKUNIT",
"workunitid": self._workunit.id,
"workunitid": self.workunit_definition.workunit_id,
"description": "This is a job configuration as YAML base64 encoded. It is configured to be executed by the B-Fabric yaml submitter.",
"base64": base64.b64encode(config_serialized.encode()).decode(),
"version": "10",
@@ -174,7 +172,7 @@
yaml_workunit_externaljob = self._client.save(
"externaljob",
{
"workunitid": self._workunit.id,
"workunitid": self.workunit_definition.workunit_id,
"status": "new",
"executableid": yaml_workunit_executable["id"],
"action": "WORKUNIT",
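For illustration, a sketch of how the refactored creator might be driven end to end; the external job id is invented, and the class is assumed to be named BfabricWrapperCreator as in the legacy wrapper creator:

from bfabric import Bfabric
from bfabric.wrapper_creator.bfabric_wrapper_creator import BfabricWrapperCreator

client = Bfabric.from_config()  # assumes a locally configured B-Fabric client
creator = BfabricWrapperCreator(client=client, external_job_id=45678)  # invented id

# Workunit metadata is now resolved once through the cached workunit_definition
# property instead of repeated lookups on self._workunit.
definition = creator.workunit_definition
print(definition.workunit_id, definition.project_id, definition.order_id)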
