Commit

Refactor analysis-runner into separate table (#699)
* AR improvements

* Intermediate push

* Intermediate pushes II

* Fix UI grid

* ar_guids -> ar_guid

* Fix bad timestamp test

* Add migration script

* Allow cwd to be nullable

* Make hail_version nullable for Cromwell runs

* Fix typo in error message

* Move migration into its own changelog

---------

Co-authored-by: Michael Franklin <[email protected]>
illusional authored Mar 14, 2024
1 parent dcc604e commit 7b1137f
Showing 20 changed files with 1,215 additions and 438 deletions.
203 changes: 158 additions & 45 deletions api/graphql/schema.py
@@ -18,10 +18,16 @@
from api.graphql.filters import GraphQLFilter, GraphQLMetaFilter
from api.graphql.loaders import LoaderKeys, get_context
from db.python import enum_tables
from db.python.layers import AnalysisLayer, SampleLayer, SequencingGroupLayer
from db.python.layers.assay import AssayLayer
from db.python.layers.family import FamilyLayer
from db.python.layers import (
AnalysisLayer,
AnalysisRunnerLayer,
AssayLayer,
FamilyLayer,
SampleLayer,
SequencingGroupLayer,
)
from db.python.tables.analysis import AnalysisFilter
from db.python.tables.analysis_runner import AnalysisRunnerFilter
from db.python.tables.assay import AssayFilter
from db.python.tables.project import ProjectPermissionsTable
from db.python.tables.sample import SampleFilter
@@ -38,6 +44,7 @@
SampleInternal,
SequencingGroupInternal,
)
from models.models.analysis_runner import AnalysisRunnerInternal
from models.models.sample import sample_id_transform_to_raw
from models.utils.sample_id_format import sample_id_format
from models.utils.sequencing_group_id_format import (
@@ -84,6 +91,30 @@ def from_internal(internal: Project) -> 'GraphQLProject':
meta=internal.meta,
)

@strawberry.field()
async def analysis_runner(
self,
info: Info,
root: 'Project',
ar_guid: GraphQLFilter[str] | None = None,
author: GraphQLFilter[str] | None = None,
repository: GraphQLFilter[str] | None = None,
access_level: GraphQLFilter[str] | None = None,
environment: GraphQLFilter[str] | None = None,
) -> list['GraphQLAnalysisRunner']:
connection = info.context['connection']
alayer = AnalysisRunnerLayer(connection)
filter_ = AnalysisRunnerFilter(
project=GenericFilter(eq=root.id),
ar_guid=ar_guid.to_internal_filter() if ar_guid else None,
submitting_user=author.to_internal_filter() if author else None,
repository=repository.to_internal_filter() if repository else None,
access_level=access_level.to_internal_filter() if access_level else None,
environment=environment.to_internal_filter() if environment else None,
)
analysis_runners = await alayer.query(filter_)
return [GraphQLAnalysisRunner.from_internal(ar) for ar in analysis_runners]

@strawberry.field()
async def pedigree(
self,
@@ -161,16 +192,20 @@ async def sequencing_groups(
) -> list['GraphQLSequencingGroup']:
loader = info.context[LoaderKeys.SEQUENCING_GROUPS_FOR_PROJECTS]
filter_ = SequencingGroupFilter(
id=id.to_internal_filter(sequencing_group_id_transform_to_raw)
if id
else None,
id=(
id.to_internal_filter(sequencing_group_id_transform_to_raw)
if id
else None
),
external_id=external_id.to_internal_filter() if external_id else None,
type=type.to_internal_filter() if type else None,
technology=technology.to_internal_filter() if technology else None,
platform=platform.to_internal_filter() if platform else None,
active_only=active_only.to_internal_filter()
if active_only
else GenericFilter(eq=True),
active_only=(
active_only.to_internal_filter()
if active_only
else GenericFilter(eq=True)
),
)
sequencing_groups = await loader.load({'id': root.id, 'filter': filter_})
return [GraphQLSequencingGroup.from_internal(sg) for sg in sequencing_groups]
@@ -191,15 +226,19 @@ async def analyses(
internal_analysis = await AnalysisLayer(connection).query(
AnalysisFilter(
type=type.to_internal_filter() if type else None,
status=status.to_internal_filter()
if status
else GenericFilter(eq=AnalysisStatus.COMPLETED),
status=(
status.to_internal_filter()
if status
else GenericFilter(eq=AnalysisStatus.COMPLETED)
),
active=active.to_internal_filter() if active else None,
project=GenericFilter(eq=root.id),
meta=meta,
timestamp_completed=timestamp_completed.to_internal_filter()
if timestamp_completed
else None,
timestamp_completed=(
timestamp_completed.to_internal_filter()
if timestamp_completed
else None
),
)
)
return [GraphQLAnalysis.from_internal(a) for a in internal_analysis]
@@ -464,16 +503,20 @@ async def sequencing_groups(
loader = info.context[LoaderKeys.SEQUENCING_GROUPS_FOR_SAMPLES]

_filter = SequencingGroupFilter(
id=id.to_internal_filter(sequencing_group_id_transform_to_raw)
if id
else None,
id=(
id.to_internal_filter(sequencing_group_id_transform_to_raw)
if id
else None
),
meta=meta,
type=type.to_internal_filter() if type else None,
technology=technology.to_internal_filter() if technology else None,
platform=platform.to_internal_filter() if platform else None,
active_only=active_only.to_internal_filter()
if active_only
else GenericFilter(eq=True),
active_only=(
active_only.to_internal_filter()
if active_only
else GenericFilter(eq=True)
),
)
obj = {'id': root.internal_id, 'filter': _filter}
sequencing_groups = await loader.load(obj)
@@ -544,12 +587,16 @@ async def analyses(
status=status.to_internal_filter() if status else None,
type=type.to_internal_filter() if type else None,
meta=meta,
active=active.to_internal_filter()
if active
else GenericFilter(eq=True),
project=project.to_internal_filter(lambda val: project_id_map[val])
if project
else None,
active=(
active.to_internal_filter()
if active
else GenericFilter(eq=True)
),
project=(
project.to_internal_filter(lambda val: project_id_map[val])
if project
else None
),
),
}
)
@@ -592,6 +639,62 @@ async def sample(self, info: Info, root: 'GraphQLAssay') -> GraphQLSample:
return GraphQLSample.from_internal(sample)


@strawberry.type
class GraphQLAnalysisRunner:
"""AnalysisRunner GraphQL model"""

ar_guid: str
output_path: str

timestamp: datetime.datetime
access_level: str
repository: str
commit: str
script: str
description: str
driver_image: str
config_path: str
cwd: str | None
environment: str
hail_version: str | None
batch_url: str
submitting_user: str
meta: strawberry.scalars.JSON

internal_project: strawberry.Private[int]

@staticmethod
def from_internal(internal: AnalysisRunnerInternal) -> 'GraphQLAnalysisRunner':
return GraphQLAnalysisRunner(
ar_guid=internal.ar_guid,
timestamp=internal.timestamp,
access_level=internal.access_level,
repository=internal.repository,
commit=internal.commit,
script=internal.script,
description=internal.description,
driver_image=internal.driver_image,
config_path=internal.config_path,
cwd=internal.cwd,
environment=internal.environment,
hail_version=internal.hail_version,
batch_url=internal.batch_url,
submitting_user=internal.submitting_user,
meta=internal.meta,
output_path=internal.output_path,
# internal
internal_project=internal.project,
)

@strawberry.field
async def project(
self, info: Info, root: 'GraphQLAnalysisRunner'
) -> GraphQLProject:
loader = info.context[LoaderKeys.PROJECTS_FOR_IDS]
project = await loader.load(root.internal_project)
return GraphQLProject.from_internal(project)


@strawberry.type
class Query:
"""GraphQL Queries"""
@@ -644,12 +747,14 @@ async def sample(
type=type.to_internal_filter() if type else None,
meta=meta,
external_id=external_id.to_internal_filter() if external_id else None,
participant_id=participant_id.to_internal_filter()
if participant_id
else None,
project=project.to_internal_filter(lambda pname: project_name_map[pname])
if project
else None,
participant_id=(
participant_id.to_internal_filter() if participant_id else None
),
project=(
project.to_internal_filter(lambda pname: project_name_map[pname])
if project
else None
),
active=active.to_internal_filter() if active else GenericFilter(eq=True),
)

@@ -684,21 +789,29 @@ async def sequencing_groups(
project_id_map = {p.name: p.id for p in projects}

filter_ = SequencingGroupFilter(
project=project.to_internal_filter(lambda val: project_id_map[val])
if project
else None,
sample_id=sample_id.to_internal_filter(sample_id_transform_to_raw)
if sample_id
else None,
id=id.to_internal_filter(sequencing_group_id_transform_to_raw)
if id
else None,
project=(
project.to_internal_filter(lambda val: project_id_map[val])
if project
else None
),
sample_id=(
sample_id.to_internal_filter(sample_id_transform_to_raw)
if sample_id
else None
),
id=(
id.to_internal_filter(sequencing_group_id_transform_to_raw)
if id
else None
),
type=type.to_internal_filter() if type else None,
technology=technology.to_internal_filter() if technology else None,
platform=platform.to_internal_filter() if platform else None,
active_only=active_only.to_internal_filter()
if active_only
else GenericFilter(eq=True),
active_only=(
active_only.to_internal_filter()
if active_only
else GenericFilter(eq=True)
),
)
sgs = await sglayer.query(filter_)
return [GraphQLSequencingGroup.from_internal(sg) for sg in sgs]
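
For reference, a client can reach the new per-project analysis-runner data through the GraphQL API. The snippet below is an illustrative sketch only: the endpoint URL is a placeholder, the top-level project(name: ...) query and the camelCase names (analysisRunner, arGuid) are assumed from Strawberry's default naming, and authentication is omitted.

import requests

# Placeholder endpoint -- the real deployment URL and auth headers will differ.
GRAPHQL_URL = 'http://localhost:8000/graphql'

# Strawberry exposes snake_case resolvers in camelCase by default, so the
# analysis_runner field on Project is assumed to appear as analysisRunner,
# taking GraphQLFilter arguments such as arGuid: {eq: "..."}.
QUERY = '''
query {
  project(name: "my-project") {
    analysisRunner(arGuid: {eq: "<some-ar-guid>"}) {
      arGuid
      repository
      commit
      submittingUser
      batchUrl
      outputPath
    }
  }
}
'''

response = requests.post(GRAPHQL_URL, json={'query': QUERY}, timeout=30)
response.raise_for_status()
print(response.json()['data']['project']['analysisRunner'])

Because GraphQLAnalysisRunner also resolves its parent project (via the project field and the PROJECTS_FOR_IDS loader), the relationship can be traversed in either direction from a query.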
1 change: 1 addition & 0 deletions api/routes/__init__.py
@@ -1,4 +1,5 @@
from api.routes.analysis import router as analysis_router
from api.routes.analysis_runner import router as analysis_runner_router
from api.routes.assay import router as assay_router
from api.routes.billing import router as billing_router
from api.routes.enum import router as enum_router
98 changes: 98 additions & 0 deletions api/routes/analysis_runner.py
@@ -0,0 +1,98 @@
import datetime

from fastapi import APIRouter

from api.utils.db import (
Connection,
get_project_readonly_connection,
get_project_write_connection,
)
from db.python.layers.analysis_runner import AnalysisRunnerLayer
from db.python.tables.analysis_runner import AnalysisRunnerFilter
from db.python.utils import GenericFilter
from models.models.analysis_runner import AnalysisRunner, AnalysisRunnerInternal

router = APIRouter(prefix='/analysis-runner', tags=['analysis-runner'])


@router.put('/{project}/', operation_id='createAnalysisRunnerLog')
async def create_analysis_runner_log( # pylint: disable=too-many-arguments
ar_guid: str,
access_level: str,
repository: str,
commit: str,
script: str,
description: str,
driver_image: str,
config_path: str,
cwd: str | None,
environment: str,
hail_version: str | None,
batch_url: str,
submitting_user: str,
meta: dict[str, str],
output_path: str,
connection: Connection = get_project_write_connection,
) -> str:
"""Create a new analysis runner log"""

alayer = AnalysisRunnerLayer(connection)

if not connection.project:
raise ValueError('Project not set')

analysis_id = await alayer.insert_analysis_runner_entry(
AnalysisRunnerInternal(
ar_guid=ar_guid,
timestamp=datetime.datetime.now(),
access_level=access_level,
repository=repository,
commit=commit,
script=script,
description=description,
driver_image=driver_image,
config_path=config_path,
cwd=cwd,
environment=environment,
hail_version=hail_version,
batch_url=batch_url,
submitting_user=submitting_user,
meta=meta,
project=connection.project,
audit_log_id=None,
output_path=output_path,
)
)

return analysis_id


@router.get('/{project}/', operation_id='getAnalysisRunnerLogs')
async def get_analysis_runner_logs(
project: str,
ar_guid: str | None = None,
submitting_user: str | None = None,
repository: str | None = None,
access_level: str | None = None,
environment: str | None = None,
connection: Connection = get_project_readonly_connection,
) -> list[AnalysisRunner]:
"""Get analysis runner logs"""

atable = AnalysisRunnerLayer(connection)

if not connection.project:
raise ValueError('Project not set')

filter_ = AnalysisRunnerFilter(
ar_guid=GenericFilter(eq=ar_guid),
submitting_user=GenericFilter(eq=submitting_user),
repository=GenericFilter(eq=repository),
access_level=GenericFilter(eq=access_level),
environment=GenericFilter(eq=environment),
project=GenericFilter(eq=connection.project),
)

logs = await atable.query(filter_)

return [log.to_external({connection.project: project}) for log in logs]
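
As a rough usage sketch (not part of this diff), the new REST route can also be called directly over HTTP. The base URL below is a placeholder, authentication is omitted, and in practice the generated client's getAnalysisRunnerLogs operation would normally be used instead; response field names follow the AnalysisRunner model.

import requests

# Placeholder values -- the real deployment URL and auth mechanism will differ.
BASE_URL = 'http://localhost:8000'
PROJECT = 'my-project'

# FastAPI treats the scalar parameters of getAnalysisRunnerLogs as query
# parameters, so filters like ar_guid, repository or access_level go in the
# query string.
response = requests.get(
    f'{BASE_URL}/analysis-runner/{PROJECT}/',
    params={'ar_guid': 'some-ar-guid'},  # hypothetical GUID for illustration
    timeout=30,
)
response.raise_for_status()

for log in response.json():
    print(log['ar_guid'], log['batch_url'])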
