Skip to content

Commit

Permalink
Merge branch 'enh/codeocean-sdk'
Browse files Browse the repository at this point in the history
  • Loading branch information
bjhardcastle committed Aug 14, 2024
1 parent c0bb681 commit b2e30d2
Show file tree
Hide file tree
Showing 13 changed files with 1,463 additions and 1,241 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,6 @@ cython_debug/
.misc
.ruff_cache

.sandbox*
.sandbox*
# pyenv
.python-version
1,875 changes: 995 additions & 880 deletions pdm.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ authors = [
dependencies = [
"npc-session>=0.1.36",
"redis>=4.1.4",
"aind-codeocean-api>=0.4.0",
"pydbhub-bjh>=0.0.8",
"pyyaml>=6.0.1",
"pyopenssl>=23.2.0",
Expand All @@ -18,6 +17,7 @@ dependencies = [
"types-pyYAML>=6.0.12.12",
"types-requests>=2.31.0.6",
"npc-io>=0.1.24",
"codeocean>=0.1.5",
]
requires-python = ">=3.9"
readme = "README.md"
Expand Down
130 changes: 65 additions & 65 deletions src/npc_lims/jobs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@
from typing import Union

import npc_session
from aind_codeocean_api.models.computations_requests import ComputationDataAsset
from codeocean.computation import (
Computation,
ComputationState,
)
from codeocean.data_asset import (
DataAssetParams, ComputationSource, Source
)

from typing_extensions import TypeAlias

import npc_lims
import npc_lims.metadata.codeocean as codeocean
import npc_lims.metadata.codeocean_utils as codeocean_utils
import npc_lims.paths.s3 as s3

logger = logging.getLogger()
Expand All @@ -25,7 +32,7 @@
VIDEO_MODELS = ("dlc_eye", "dlc_side", "dlc_face", "facemap", "LPFaceParts")


def read_json(process_name: str) -> dict[str, npc_lims.CapsuleComputationAPI]:
def read_json(process_name: str) -> dict[str, Computation | None]:
"""
>>> dlc_eye_queue = read_json('dlc_eye')
>>> len(dlc_eye_queue) > 0
Expand All @@ -40,8 +47,8 @@ def read_json(process_name: str) -> dict[str, npc_lims.CapsuleComputationAPI]:
>>> len(facemap_queue) > 0
True
"""
with (s3.S3_SCRATCH_ROOT / f"{process_name}.json").open() as f:
return json.load(f)
return codeocean_utils.read_computation_queue(
s3.S3_SCRATCH_ROOT / f"{process_name}.json")


def is_session_in_queue(session: SessionID, process_name: str) -> bool:
Expand All @@ -56,20 +63,14 @@ def is_session_in_queue(session: SessionID, process_name: str) -> bool:


def add_to_json(
session_id: SessionID, process_name: str, response: npc_lims.CapsuleComputationAPI
session_id: SessionID,
process_name: str,
computation: Computation | None,
) -> None:
if not (s3.S3_SCRATCH_ROOT / f"{process_name}.json").exists():
current = {}
else:
current = read_json(process_name)

is_new = session_id not in current
current.update({session_id: response})
(s3.S3_SCRATCH_ROOT / f"{process_name}.json").write_text(
json.dumps(current, indent=4)
)
logger.info(
f"{'Added' if is_new else 'Updated'} {session_id} {'to' if is_new else 'in'} json"
return codeocean_utils.add_to_computation_queue(
(s3.S3_SCRATCH_ROOT / f"{process_name}.json"),
session_id,
computation
)


Expand All @@ -79,45 +80,26 @@ def add_to_queue(
session = npc_session.SessionRecord(session_id)

if not is_session_in_queue(session_id, process_name):
request_dict: npc_lims.CapsuleComputationAPI = {
"created": INITIAL_INT_VALUE,
"end_status": INITIAL_VALUE,
"has_results": False,
"id": INITIAL_VALUE,
"name": INITIAL_VALUE,
"run_time": INITIAL_INT_VALUE,
"state": INITIAL_VALUE,
}
add_to_json(session, process_name, request_dict)
add_to_json(session, process_name, None)


def get_current_job_status(
job_or_session_id: str, process_name: str
) -> npc_lims.CapsuleComputationAPI:
) -> Computation | None:
"""
>>> status = get_current_job_status('676909_2023-12-13', 'dlc_eye')
>>> status.keys()
dict_keys(['created', 'data_assets', 'end_status', 'has_results', 'id', 'name', 'run_time', 'state'])
"""
try:
session_id = npc_session.SessionRecord(job_or_session_id).id
except ValueError:
job_id = job_or_session_id
else:
job_id = read_json(process_name)[session_id]["id"]

if job_id != INITIAL_VALUE:
job_status = npc_lims.get_job_status(job_id, check_files=True)
else:
job_status = read_json(process_name)[session_id]

return job_status
return codeocean_utils.get_current_queue_computation(
(s3.S3_SCRATCH_ROOT / f"{process_name}.json"),
job_or_session_id
)


def sync_json(process_name: str) -> None:
current = read_json(process_name)
for session_id in current:
current[session_id] = get_current_job_status(session_id, process_name)
current[session_id] = codeocean_utils.serialize_computation(
get_current_job_status(session_id, process_name))
logger.info(f"Updated {session_id} status")

(s3.S3_SCRATCH_ROOT / f"{process_name}.json").write_text(
Expand All @@ -127,11 +109,13 @@ def sync_json(process_name: str) -> None:


def get_data_asset_name(session_id: SessionID, process_name: str) -> str:
computation = get_current_job_status(session_id, process_name)
if computation is None:
raise ValueError(f"No computation found for {session_id}")

created_dt = (
npc_session.DatetimeRecord(
datetime.datetime.fromtimestamp(
get_current_job_status(session_id, process_name)["created"]
)
datetime.datetime.fromtimestamp(computation.created)
)
.replace(" ", "_")
.replace(":", "-")
Expand All @@ -141,17 +125,17 @@ def get_data_asset_name(session_id: SessionID, process_name: str) -> str:

def create_data_asset(session_id: SessionID, job_id: str, process_name: str) -> None:
data_asset_name = get_data_asset_name(session_id, process_name)
asset = codeocean.create_session_data_asset(session_id, job_id, data_asset_name)
asset = codeocean_utils.create_session_data_asset(
session_id, job_id, data_asset_name)

if asset is None:
logger.info(f"Failed to create data asset for {session_id}")
return

asset.raise_for_status()
while not asset_exists(session_id, process_name):
time.sleep(10)
logger.info(f"Created data asset for {session_id}")
npc_lims.set_asset_viewable_for_everyone(asset.json()["id"])
npc_lims.set_asset_viewable_for_everyone(asset.id)


def asset_exists(session_id: SessionID, process_name: str) -> bool:
Expand All @@ -169,21 +153,24 @@ def create_all_data_assets(process_name: str, overwrite_existing_assets: bool) -

for session_id in read_json(process_name):
job_status = get_current_job_status(session_id, process_name)
if job_status is None:
continue
if npc_lims.is_computation_errored(
job_status
) or not npc_lims.is_computation_finished(job_status):
continue
if asset_exists(session_id, process_name) and not overwrite_existing_assets:
continue
create_data_asset(session_id, job_status["id"], process_name)
create_data_asset(session_id, job_status.id, process_name)


def sync_and_get_num_running_jobs(process_name: str) -> int:
sync_json(process_name)
return sum(
1
for job in read_json(process_name).values()
if job["state"] in ("running", "initializing")
if job is not None and job.state in
(ComputationState.Running, ComputationState.Initializing)
)


Expand All @@ -192,10 +179,11 @@ def is_started_or_completed(session_id: SessionID, process_name: str) -> bool:
>>> is_started_or_completed(npc_session.SessionRecord('664851_2023-11-14'), 'dlc_side')
True
"""
return read_json(process_name)[session_id]["state"] in (
"running",
"initializing",
"completed",
computation = read_json(process_name)[session_id]
return computation is not None and computation.state in (
ComputationState.Running,
ComputationState.Initializing,
ComputationState.Completed,
)


Expand Down Expand Up @@ -224,19 +212,31 @@ def add_sessions_to_queue(


def start(
session_id: SessionID, capsule_pipeline_info: codeocean.CapsulePipelineInfo
session_id: SessionID,
capsule_pipeline_info: codeocean_utils.CapsulePipelineInfo
) -> None:
session_data_asset = npc_lims.get_session_raw_data_asset(session_id)
data_assets = [
ComputationDataAsset(
id=npc_lims.get_session_raw_data_asset(session_id)["id"],
mount=npc_lims.get_session_raw_data_asset(session_id)["name"],
DataAssetParams(
name=session_data_asset.name,
source=Source(
computation=ComputationSource(
id=session_data_asset.id,
)
),
mount=session_data_asset.mount,
),
]
response = npc_lims.run_capsule_or_pipeline(
data_assets, capsule_pipeline_info.id, capsule_pipeline_info.is_pipeline
computation = npc_lims.run_capsule_or_pipeline(
data_assets, capsule_pipeline_info.id,
capsule_pipeline_info.is_pipeline
)
logger.info(f"Started job for {session_id}")
add_to_json(session_id, capsule_pipeline_info.process_name, response)
add_to_json(
session_id,
capsule_pipeline_info.process_name,
computation,
)


def process_capsule_or_pipeline_queue(
Expand All @@ -253,7 +253,7 @@ def process_capsule_or_pipeline_queue(
adds jobs to queue for capsule/pipeline, then processes them
example: process_capsule_or_pipeline_queue('1f8f159a-7670-47a9-baf1-078905fc9c2e', 'sorted', is_pipeline=True)
"""
capsule_pipeline_info = codeocean.CapsulePipelineInfo(
capsule_pipeline_info = codeocean_utils.CapsulePipelineInfo(
capsule_or_pipeline_id, process_name, is_pipeline
)

Expand Down
2 changes: 1 addition & 1 deletion src/npc_lims/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from npc_lims.metadata.codeocean import *
from npc_lims.metadata.codeocean_utils import *
from npc_lims.metadata.spreadsheets import *
from npc_lims.metadata.targeting import *
from npc_lims.metadata.types import *
Loading

0 comments on commit b2e30d2

Please sign in to comment.