added output file querying, mutation via tables
nevoodoo committed Jan 31, 2024
1 parent 5fd3f60 commit f75d27d
Showing 11 changed files with 214 additions and 52 deletions.
3 changes: 3 additions & 0 deletions api/graphql/schema.py
@@ -33,6 +33,7 @@
AssayInternal,
AuditLogInternal,
FamilyInternal,
FileInternal,
ParticipantInternal,
Project,
SampleInternal,
@@ -239,6 +240,7 @@ class GraphQLAnalysis:
timestamp_completed: datetime.datetime | None = None
active: bool
meta: strawberry.scalars.JSON
output_files: strawberry.scalars.JSON

@staticmethod
def from_internal(internal: AnalysisInternal) -> 'GraphQLAnalysis':
@@ -250,6 +252,7 @@ def from_internal(internal: AnalysisInternal) -> 'GraphQLAnalysis':
timestamp_completed=internal.timestamp_completed,
active=internal.active,
meta=internal.meta,
output_files=FileInternal.reconstruct_json(internal.output_files),
)

@strawberry.field
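For context, a minimal sketch of how the new field can be requested once this schema change lands. The query shape below (a top-level analyses field) is an assumption for illustration only; strawberry exposes snake_case fields in camelCase by default, so output_files is queried as outputFiles:

# Hypothetical GraphQL query against the updated schema (nesting is assumed):
QUERY = """
query {
    analyses {
        id
        meta
        outputFiles
    }
}
"""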
11 changes: 10 additions & 1 deletion db/project.xml
@@ -1149,7 +1149,12 @@
nullable="false"
/>
</column>
<column name="secondary_files" type="VARCHAR(255)">
<column name="meta" type="VARCHAR(255)">
<constraints
nullable="true"
/>
</column>
<column name="json_structure" type="VARCHAR(255)">
<constraints
nullable="true"
/>
@@ -1171,6 +1176,10 @@
/>
</column>
</createTable>
<addPrimaryKey columnNames="analysis_id, file_id"
constraintName="PK_ANALYSIS_FILE"
tableName="analysis_file"
validate="true"/>
<sql>ALTER TABLE `file` ADD SYSTEM VERSIONING;</sql>
</changeSet>
</databaseChangeLog>
1 change: 1 addition & 0 deletions db/python/connect.py
@@ -26,6 +26,7 @@
'sequencing_group',
'assay',
'sequencing_group_assay',
'analysis_file',
'analysis_sequencing_group',
'analysis_sample',
'assay_external_id',
10 changes: 9 additions & 1 deletion db/python/layers/analysis.py
@@ -7,6 +7,7 @@
from db.python.layers.base import BaseLayer
from db.python.layers.sequencing_group import SequencingGroupLayer
from db.python.tables.analysis import AnalysisFilter, AnalysisTable
from db.python.tables.file import FileTable
from db.python.tables.sample import SampleTable
from db.python.tables.sequencing_group import SequencingGroupFilter
from db.python.utils import GenericFilter, get_logger
@@ -48,6 +49,7 @@ def __init__(self, connection: Connection):

self.sampt = SampleTable(connection)
self.at = AnalysisTable(connection)
self.ft = FileTable(connection)

# GETS

@@ -543,7 +545,7 @@ async def create_analysis(
project: ProjectId = None,
) -> int:
"""Create a new analysis"""
return await self.at.create_analysis(
new_analysis_id = await self.at.create_analysis(
analysis_type=analysis.type,
status=analysis.status,
sequencing_group_ids=analysis.sequencing_group_ids,
@@ -553,6 +555,10 @@ async def create_analysis(
project=project,
)

await self.ft.create_or_update_analysis_output_files_from_json(analysis_id=new_analysis_id, json_dict=analysis.output)

return new_analysis_id

async def add_sequencing_groups_to_analysis(
self, analysis_id: int, sequencing_group_ids: list[int], check_project_id=True
):
@@ -591,6 +597,8 @@ async def update_analysis(
output=output,
)

await self.ft.create_or_update_analysis_output_files_from_json(analysis_id=analysis_id, json_dict=output)

async def get_analysis_runner_log(
self,
project_ids: list[int] = None,
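Net effect of the layer changes: create_analysis and update_analysis now also hand the output JSON to FileTable, which records every file it references. A sketch of the resulting call path with placeholder values (the class name AnalysisLayer and the enum value used here are assumptions based on the surrounding code):

from db.python.layers.analysis import AnalysisLayer
from models.enums import AnalysisStatus
from models.models.analysis import AnalysisInternal

# inside an async context, with `connection` already established:
alayer = AnalysisLayer(connection)
analysis = AnalysisInternal(
    type='cram',
    status=AnalysisStatus.COMPLETED,
    sequencing_group_ids=[1, 2],
    output='{"cram": "gs://bucket/sample.cram"}',  # scanned for file paths
    meta={},
)
new_id = await alayer.create_analysis(analysis, project=project_id)
# -> at.create_analysis(...) inserts the analysis row
# -> ft.create_or_update_analysis_output_files_from_json(...) registers its files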
24 changes: 23 additions & 1 deletion db/python/tables/analysis.py
@@ -15,6 +15,7 @@
from models.enums import AnalysisStatus
from models.models.analysis import AnalysisInternal
from models.models.audit_log import AuditLogInternal
from models.models.file import FileInternal
from models.models.project import ProjectId


@@ -73,7 +74,7 @@ async def create_analysis(
('type', analysis_type),
('status', status.value),
('meta', to_db_json(meta or {})),
('output', output),
('output', output), # can keep for now but will be deprecated eventually as a column
('audit_log_id', await self.audit_log_id()),
('project', project or self.project),
('active', active if active is not None else True),
@@ -226,15 +227,36 @@ async def query(self, filter_: AnalysisFilter) -> List[AnalysisInternal]:

rows = await self.connection.fetch_all(_query, values)
retvals: Dict[int, AnalysisInternal] = {}
analysis_ids: list = []
for row in rows:
key = row['id']
analysis_ids.append(key)
if key in retvals:
retvals[key].sequencing_group_ids.append(row['sequencing_group_id'])
else:
retvals[key] = AnalysisInternal.from_db(**dict(row))

analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
for analysis_id, analysis in retvals.items():
retvals[analysis_id] = analysis.copy(update={'output_files': analysis_outputs_by_aid.get(analysis_id, [])})
return list(retvals.values())

async def get_file_outputs_by_analysis_ids(self, analysis_ids: list[int]) -> dict[int, list[FileInternal]]:
"""Fetches all output files for a list of analysis IDs"""

_query = """
SELECT af.analysis_id, f.*
FROM analysis_file af
INNER JOIN file f ON af.file_id = f.id
WHERE af.analysis_id IN :analysis_ids
"""
rows = await self.connection.fetch_all(_query, {'analysis_ids': analysis_ids})
analysis_files: dict[int, list[FileInternal]] = defaultdict(list)
for row in rows:
analysis_files[row['analysis_id']].append(FileInternal.from_db(**dict(row)))

return analysis_files

async def get_latest_complete_analysis_for_type(
self,
project: ProjectId,
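get_file_outputs_by_analysis_ids groups the joined rows per analysis ID, which is why query() can fall back to an empty list for analyses with no recorded files. A self-contained toy of the same defaultdict grouping, with plain dicts standing in for database rows:

from collections import defaultdict

rows = [
    {'analysis_id': 101, 'path': 'gs://bucket/a.cram'},
    {'analysis_id': 101, 'path': 'gs://bucket/a.cram.crai'},
    {'analysis_id': 102, 'path': 'gs://bucket/b.vcf.gz'},
]
files_by_analysis: dict[int, list[str]] = defaultdict(list)
for row in rows:
    files_by_analysis[row['analysis_id']].append(row['path'])

assert files_by_analysis[101] == ['gs://bucket/a.cram', 'gs://bucket/a.cram.crai']
assert files_by_analysis.get(999, []) == []  # .get avoids creating empty entries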
89 changes: 89 additions & 0 deletions db/python/tables/file.py
@@ -0,0 +1,89 @@
from textwrap import dedent
from typing import Optional, Union

from db.python.tables.base import DbBase
from models.models.file import FileInternal


class FileTable(DbBase):
"""
Capture File table operations and queries
"""

table_name = 'file'

async def create_analysis_output_file(
self,
path: str,
analysis_id: int,
json_structure: Optional[str]
) -> int:
"""Create a new file, and add it to an analysis via the join table"""

id_of_new_file = await self.create_or_update_output_file(path, json_structure)
await self.add_output_file_to_analysis(analysis_id=analysis_id, file_id=id_of_new_file)

return id_of_new_file

async def create_or_update_output_file(
self,
path: str,
json_structure: Optional[str] = None
) -> int:
"""
Create a new file, and add it to database
"""
kv_pairs = [
('path', path),
('basename', FileInternal.get_basename(path)),
('dirname', FileInternal.get_dirname(path)),
('nameroot', FileInternal.get_nameroot(path)),
('nameext', FileInternal.get_extension(path)),
('checksum', FileInternal.get_checksum(path)),
('size', FileInternal.get_size(path)),
('json_structure', json_structure),
]

kv_pairs = [(k, v) for k, v in kv_pairs if v is not None]
keys = [k for k, _ in kv_pairs]
cs_keys = ', '.join(keys)
cs_id_keys = ', '.join(f':{k}' for k in keys)
# non_pk_keys = [k for k in keys if k != 'path']
# update_clause = ', '.join([f"{k} = VALUES({k})" for k in non_pk_keys]) # ON DUPLICATE KEY UPDATE {update_clause}

_query = dedent(f"""INSERT INTO file ({cs_keys}) VALUES ({cs_id_keys}) RETURNING id""")

id_of_new_file = await self.connection.fetch_val(
_query,
dict(kv_pairs),
)

return id_of_new_file

async def add_output_file_to_analysis(self, analysis_id: int, file_id: int):
"""Add file to an analysis (through the join table)"""
_query = dedent("""
INSERT INTO analysis_file
(analysis_id, file_id)
VALUES (:analysis_id, :file_id)
ON DUPLICATE KEY UPDATE
analysis_id = VALUES(analysis_id),
file_id = VALUES(file_id)
""")
await self.connection.execute(
_query,
{'analysis_id': analysis_id, 'file_id': file_id}
)

async def create_or_update_analysis_output_files_from_json(
self,
analysis_id: int,
json_dict: Union[dict, str],
) -> None:
"""
Create analysis files from JSON
"""
async with self.connection.transaction():
for json_path, path in FileInternal.find_files_from_dict(json_dict=json_dict):
file_id = await self.create_or_update_output_file(path=path, json_structure='.'.join(json_path))
await self.add_output_file_to_analysis(analysis_id, file_id)
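The entry point here is create_or_update_analysis_output_files_from_json, which leans on FileInternal.find_files_from_dict to walk the output JSON and yield (json_path, path) pairs; the dotted json_path is what lands in the json_structure column. A sketch of the intended behaviour (the traversal semantics live in FileInternal and are assumed here):

from models.models.file import FileInternal

output = {
    'cram': 'gs://bucket/sample.cram',
    'qc': {'report': 'gs://bucket/sample.qc.html'},
}
for json_path, path in FileInternal.find_files_from_dict(json_dict=output):
    # mirrors the '.'.join(json_path) call above, e.g. 'qc.report' for nested keys
    print('.'.join(json_path), '->', path)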
3 changes: 3 additions & 0 deletions db/python/tables/project.py
@@ -442,6 +442,9 @@ async def delete_project_data(
INNER JOIN sample ON sample.id = sg.sample_id
WHERE sample.project = :project
);
DELETE FROM analysis_file WHERE analysis_id in (
SELECT id FROM analysis WHERE project = :project
);
DELETE FROM analysis_sample WHERE sample_id in (
SELECT s.id FROM sample s
WHERE s.project = :project
3 changes: 2 additions & 1 deletion models/models/__init__.py
@@ -27,7 +27,8 @@
PedRowInternal,
)
from models.models.file import (
File
File,
FileInternal
)
from models.models.participant import (
NestedParticipant,
3 changes: 2 additions & 1 deletion models/models/analysis.py
@@ -3,7 +3,7 @@
from datetime import date, datetime
from typing import Any

from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict

from models.base import SMBase
from models.enums import AnalysisStatus
@@ -15,6 +15,7 @@

class AnalysisInternal(SMBase):
"""Model for Analysis"""
model_config = ConfigDict(extra='allow')

id: int | None = None
type: str
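The extra='allow' config is what lets query() in tables/analysis.py bolt output_files onto an existing instance via .copy(update=...) without declaring a field for it. A minimal stand-alone sketch of that pydantic behaviour, using a simplified stand-in rather than the real model:

from pydantic import BaseModel, ConfigDict

class Analysis(BaseModel):  # simplified stand-in for AnalysisInternal
    model_config = ConfigDict(extra='allow')
    id: int | None = None

a = Analysis(id=1)
b = a.copy(update={'output_files': []})  # undeclared field accepted via extra='allow'
print(b.output_files)  # -> []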