diff --git a/api/graphql/filters.py b/api/graphql/filters.py index 04174dd16..68ae3a3e5 100644 --- a/api/graphql/filters.py +++ b/api/graphql/filters.py @@ -14,6 +14,10 @@ class GraphQLFilter(Generic[T]): eq: T | None = None in_: list[T] | None = None nin: list[T] | None = None + gt: T | None = None + gte: T | None = None + lt: T | None = None + lte: T | None = None def all_values(self): """ @@ -26,6 +30,14 @@ def all_values(self): v.extend(self.in_) if self.nin: v.extend(self.nin) + if self.gt: + v.append(self.gt) + if self.gte: + v.append(self.gte) + if self.lt: + v.append(self.lt) + if self.lte: + v.append(self.lte) return v @@ -37,9 +49,21 @@ def to_internal_filter(self, f: Callable[[T], Any] = None): eq=f(self.eq) if self.eq else None, in_=list(map(f, self.in_)) if self.in_ else None, nin=list(map(f, self.nin)) if self.nin else None, + gt=f(self.gt) if self.gt else None, + gte=f(self.gte) if self.gte else None, + lt=f(self.lt) if self.lt else None, + lte=f(self.lte) if self.lte else None, ) - return GenericFilter(eq=self.eq, in_=self.in_, nin=self.nin) + return GenericFilter( + eq=self.eq, + in_=self.in_, + nin=self.nin, + gt=self.gt, + gte=self.gte, + lt=self.lt, + lte=self.lte, + ) GraphQLMetaFilter = strawberry.scalars.JSON diff --git a/api/graphql/schema.py b/api/graphql/schema.py index 7b9a030f8..f8d70dcbd 100644 --- a/api/graphql/schema.py +++ b/api/graphql/schema.py @@ -7,6 +7,7 @@ Note, we silence a lot of linting here because GraphQL looks at type annotations and defaults to decide the GraphQL schema, so it might not necessarily look correct. """ +import datetime from inspect import isclass import strawberry @@ -182,6 +183,7 @@ async def analyses( status: GraphQLFilter[strawberry.enum(AnalysisStatus)] | None = None, active: GraphQLFilter[bool] | None = None, meta: GraphQLMetaFilter | None = None, + timestamp_completed: GraphQLFilter[datetime.datetime] | None = None, ) -> list['GraphQLAnalysis']: connection = info.context['connection'] connection.project = root.id @@ -194,6 +196,9 @@ async def analyses( active=active.to_internal_filter() if active else None, project=GenericFilter(eq=root.id), meta=meta, + timestamp_completed=timestamp_completed.to_internal_filter() + if timestamp_completed + else None, ) ) return [GraphQLAnalysis.from_internal(a) for a in internal_analysis] @@ -207,7 +212,7 @@ class GraphQLAnalysis: type: str status: strawberry.enum(AnalysisStatus) output: str | None - timestamp_completed: str | None = None + timestamp_completed: datetime.datetime | None = None active: bool meta: strawberry.scalars.JSON author: str diff --git a/db/python/layers/analysis.py b/db/python/layers/analysis.py index 5575a9c13..e3bc1dd1d 100644 --- a/db/python/layers/analysis.py +++ b/db/python/layers/analysis.py @@ -1,7 +1,8 @@ from collections import defaultdict -from datetime import date, datetime +import datetime from typing import Any +from api.utils import group_by from db.python.connect import Connection from db.python.layers.base import BaseLayer from db.python.layers.sequencing_group import SequencingGroupLayer @@ -16,11 +17,28 @@ ProportionalDateModel, ProportionalDateProjectModel, ProportionalDateTemporalMethod, + SequencingGroupInternal, ) +from models.models.sequencing_group import SequencingGroupInternalId + +ES_ANALYSIS_OBJ_INTRO_DATE = datetime.date(2022, 6, 21) logger = get_logger() +def check_or_parse_date(date_: datetime.date | str | None) -> datetime.date | None: + """Check or parse a date""" + if not date_: + return None + if isinstance(date_, datetime.datetime): 
+        return date_.date()
+    if isinstance(date_, datetime.date):
+        return date_
+    if isinstance(date_, str):
+        return datetime.datetime.strptime(date_, '%Y-%m-%d').date()
+    raise ValueError(f'Invalid date {date_!r}')
+
+
 class AnalysisLayer(BaseLayer):
     """Layer for analysis logic"""
 
@@ -125,281 +143,123 @@ async def query(self, filter_: AnalysisFilter, check_project_ids=True):
 
         return analyses
 
-    async def get_sg_history_for_temporal_method(
-        self,
-        sequencing_group_ids: list[int],
-        temporal_method: ProportionalDateTemporalMethod,
-    ) -> dict[int, date]:
-        """Get the history of samples for the given temporal method"""
-        sglayer = SequencingGroupLayer(self.connection)
-
-        if temporal_method == ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE:
-            return await sglayer.get_samples_create_date_from_sgs(sequencing_group_ids)
-        if temporal_method == ProportionalDateTemporalMethod.SG_ES_INDEX_DATE:
-            return await self.at.get_sg_add_to_project_es_index(
-                sg_ids=sequencing_group_ids
-            )
-
-        raise NotImplementedError(
-            f'Have not implemented {temporal_method.value} temporal method yet'
-        )
-
-    @staticmethod
-    def _sg_history_keep_sg_group(
-        sgid: int, sg_history: dict[int, date], start_date, end_date
-    ):
-        """Keep the sequencing group for prop map based on the params"""
-        d = sg_history.get(sgid)
-        if not d:
-            return False
-        if start_date and d <= start_date:
-            return True
-        if end_date and d <= end_date:
-            return True
-        if not start_date and not end_date:
-            return True
-        return False
-
-    async def get_sequencing_group_file_sizes(
-        self,
-        temporal_methods: list[ProportionalDateTemporalMethod],
-        project_ids: list[int] = None,
-        start_date: date = None,
-        end_date: date = None,
-        sequencing_types: list[str] = None,
-    ) -> dict[ProportionalDateTemporalMethod, list[dict]]:
-        """
-        Get the file sizes from all the given projects group by sample filtered
-        on the date range
-        """
-
-        if not temporal_methods:
-            return {}
-
-        # Get samples from pids
-        sglayer = SequencingGroupLayer(self.connection)
-        sgfilter = SequencingGroupFilter(
-            project=GenericFilter(in_=project_ids),
-            type=GenericFilter(in_=sequencing_types) if sequencing_types else None,
-        )
-
-        sequencing_groups = await sglayer.query(sgfilter)
-        sg_to_project = {sg.id: sg.project for sg in sequencing_groups}
-
-        crams = await self.at.query(
-            AnalysisFilter(
-                sequencing_group_id=GenericFilter(in_=list(sg_to_project.keys())),
-                type=GenericFilter(eq='cram'),
-                status=GenericFilter(eq=AnalysisStatus.COMPLETED),
-            )
-        )
-
-        sg_history_by_method = {}
-        # Get size of analysis crams
-        for method in temporal_methods:
-            sg_history_by_method[method] = await self.get_sg_date_sizes_for_method(
-                sg_to_project=sg_to_project,
-                method=method,
-                start_date=start_date,
-                end_date=end_date,
-                crams=crams,
-            )
-        return sg_history_by_method
-
-    async def get_sg_date_sizes_for_method(
-        self,
-        sg_to_project: dict,
-        method: ProportionalDateTemporalMethod,
-        start_date: date,
-        end_date: date | None,
-        crams: list[AnalysisInternal],
-    ):
-        """
-        Take the params, determine the history method to use,
-        and return the format:
-
-        {
-            {
-                'project': p,
-                'sequencing_groups': [
-
-                ]
-            project_id: {
-                sg_id: [{
-                    'start': date,
-                    'end': date | None
-                    'size': size,
-                }]
-            }
-        }
-        """
-        method_history = await self.get_sg_history_for_temporal_method(
-            sequencing_group_ids=list(sg_to_project.keys()), temporal_method=method
-        )
-        filtered_sequencing_group_ids = {
-            sgid
-            for sgid in sg_to_project
-            if self._sg_history_keep_sg_group(
-                sgid=sgid,
-                sg_history=method_history,
-                start_date=start_date,
-                end_date=end_date,
-            )
-        }
-        if not filtered_sequencing_group_ids:
-            # if there are no sequencing group IDs, the query analysis treats that
-            # as not including a filter (so returns all for the project IDs)
-            return []
-
-        crams_by_project: dict[int, dict[int, list[dict]]] = defaultdict(dict)
-
-        # Manual filtering to find the most recent analysis cram of each sequence type
-        # for each sample
-        affected_analyses = []
-        for cram in crams:
-            sgids = cram.sequencing_group_ids
-            if len(sgids) > 1:
-                affected_analyses.append(cram)
-                continue
-
-            sgid = int(sgids[0])
-            if sgid not in filtered_sequencing_group_ids:
-                continue
-
-            # Allow for multiple crams per sample in the future
-            # even though for now we only support 1
-            if sgid not in method_history:
-                # sometimes we might find crams that actually shouldn't
-                # be included in the cost yet, we can skip them :)
-                continue
-
-            if size := cram.meta.get('size'):
-                if project := sg_to_project.get(sgid):
-                    crams_by_project[project][sgid] = [
-                        {
-                            'start': method_history[sgid],
-                            'end': None,  # TODO: add functionality for deleted samples
-                            'size': size,
-                        }
-                    ]
-
-        return [
-            {'project': p, 'sequencing_groups': crams_by_project[p]}
-            for p in crams_by_project
-        ]
-
     async def get_cram_size_proportionate_map(
         self,
         projects: list[ProjectId],
         sequencing_types: list[str] | None,
         temporal_methods: list[ProportionalDateTemporalMethod],
-        start_date: date = None,
-        end_date: date = None,
+        start_date: datetime.date = None,
+        end_date: datetime.date = None,
     ) -> dict[ProportionalDateTemporalMethod, list[ProportionalDateModel]]:
         """
         This is a bit more complex, but we want to generate a map of cram size by day,
         based on the temporal_method (sample create date, joint call date).
         NB: Can't use the align date because the data is not good enough
         """
         # sanity checks
         if not start_date:
             raise ValueError('start_date must be set')
 
-        if end_date and start_date and end_date < start_date:
-            raise ValueError('end_date must be after start_date')
+        start_date = check_or_parse_date(start_date)
+        end_date = check_or_parse_date(end_date)
 
-        if start_date < date(2020, 1, 1):
-            raise ValueError('start_date must be after 2020-01-01')
+        if end_date and start_date and end_date < start_date:
+            raise ValueError(
+                f'end_date ({end_date}) must be after start_date ({start_date})'
+            )
 
-        end_date_date = None
-        if end_date:
-            if isinstance(end_date, date):
-                end_date_date = end_date
-            elif isinstance(end_date, datetime):
-                end_date_date = end_date.date()
-            else:
-                raise ValueError('end_date must be a date or datetime')
+        if start_date < datetime.date(2020, 1, 1):
+            raise ValueError(f'start_date ({start_date}) must be after 2020-01-01')
 
         project_objs = await self.ptable.get_and_check_access_to_projects_for_ids(
             project_ids=projects, user=self.author, readonly=True
         )
         project_name_map = {p.id: p.name for p in project_objs}
 
-        sg_sizes_by_method_by_project = await self.get_sequencing_group_file_sizes(
-            project_ids=projects,
-            start_date=start_date,
-            end_date=end_date,
-            sequencing_types=sequencing_types,
-            temporal_methods=temporal_methods,
+        sglayer = SequencingGroupLayer(self.connection)
+        sgfilter = SequencingGroupFilter(
+            project=GenericFilter(in_=projects),
+            type=GenericFilter(in_=sequencing_types) if sequencing_types else None,
         )
 
-        results: dict[ProportionalDateTemporalMethod, list[ProportionalDateModel]] = {}
-        for (
-            method,
-            sequencing_group_sizes_by_project,
-        ) in sg_sizes_by_method_by_project.items():
-            results[
-                method
-            ] = self.get_cram_size_proportionate_map_from_sequencing_group_sizes(
-                sequencing_group_sizes_by_project=sequencing_group_sizes_by_project,
-                project_name_map=project_name_map,
-                start_date=start_date,
-                end_date=end_date_date,
+        sequencing_groups = await sglayer.query(sgfilter)
+        sg_by_id = {sg.id: sg for sg in sequencing_groups}
+        sg_to_project = {sg.id: sg.project for sg in sequencing_groups}
+
+        cram_list = await self.at.query(
+            AnalysisFilter(
+                sequencing_group_id=GenericFilter(in_=list(sg_to_project.keys())),
+                type=GenericFilter(eq='cram'),
+                status=GenericFilter(eq=AnalysisStatus.COMPLETED),
             )
+        )
+
+        crams_by_sg = group_by(cram_list, lambda c: c.sequencing_group_ids[0])
+
+        results: dict[ProportionalDateTemporalMethod, list[ProportionalDateModel]] = {}
+        for method in temporal_methods:
+            if method == ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE:
+                results[method] = await self.get_prop_map_for_sample_create_date(
+                    sg_by_id=sg_by_id,
+                    crams=crams_by_sg,
+                    project_name_map=project_name_map,
+                    start_date=start_date,
+                    end_date=end_date,
+                )
+            elif method == ProportionalDateTemporalMethod.SG_ES_INDEX_DATE:
+                results[method] = await self.get_prop_map_for_es_index_date(
+                    sg_by_id=sg_by_id,
+                    crams=crams_by_sg,
+                    project_name_map=project_name_map,
+                    start_date=start_date,
+                    end_date=end_date,
+                )
+            else:
+                raise NotImplementedError(
+                    f'Have not implemented {method.value} temporal method yet'
+                )
 
         return results
 
-    def get_cram_size_proportionate_map_from_sequencing_group_sizes(
+    async def get_prop_map_for_sample_create_date(
         self,
-        sequencing_group_sizes_by_project: list[dict],
+        sg_by_id: dict[SequencingGroupInternalId, SequencingGroupInternal],
+        crams: dict[SequencingGroupInternalId, list[AnalysisInternal]],
         project_name_map: dict[int, str],
-        start_date: date,
-        end_date: date = None,
-    ):
+        start_date: datetime.date | None,
+        end_date: datetime.date | None,
+    ) -> list[ProportionalDateModel]:
         """
         Turn the sequencing_group_sizes_project into a proportionate map
 
         We'll do this in three steps:
 
-        1. First assign the cram a {dataset: total_size} map on the relevant day.
-            This generates a diff of each dataset by day.
+        1. First, calculate a delta of each project by day, based on the sample create date
+            This means we can more easily handle SGs with multiple crams
 
         2. Iterate over the days, and progressively sum up the sizes in the map.
 
         3. Iterate over the days, and proportion each day by total size in the day.
         """
-        by_date_diff: dict[date, dict[str, int]] = defaultdict(lambda: defaultdict(int))
 
         # 1.
-        for obj in sequencing_group_sizes_by_project:
-            project_name = project_name_map.get(obj['project'])
-            for sg_dates in obj['sequencing_groups'].values():
-                sizes_dates: list[tuple[date, int]] = []
-                for obj in sg_dates:
-                    obj_date = obj['start']
-                    size = int(obj['size'])
-                    if end_date and start_date > end_date:
-                        continue
-                    if len(sizes_dates) > 0:
-                        # subtract last size to get the difference
-                        # if the crams got smaller, this number will be negative
-                        size -= sizes_dates[-1][1]
-
-                    adjusted_start_date = obj_date
-                    if start_date and obj_date < start_date:
-                        adjusted_start_date = start_date
-                    by_date_diff[adjusted_start_date][project_name] += size
-                    sizes_dates.append((adjusted_start_date, size))
+        by_project_delta = await self.calculate_delta_of_crams_by_project_for_day(
+            sg_by_id=sg_by_id,
+            crams=crams,
+            project_name_map=project_name_map,
+            start_date=start_date,
+            end_date=end_date,
+        )
 
         # 2: progressively sum up the sizes, prepping for step 3
-        by_date_totals: list[tuple[date, dict[str, int]]] = []
-        sorted_days = list(sorted(by_date_diff.items(), key=lambda el: el[0]))
-        for idx, (dt, project_map) in enumerate(sorted_days):
-            if idx == 0:
+        by_date_totals: list[tuple[datetime.date, dict[str, int]]] = []
+        sorted_days = list(sorted(by_project_delta.items(), key=lambda el: el[0]))
+        for dt, project_map in sorted_days:
+            if len(by_date_totals) == 0:
                 by_date_totals.append((dt, project_map))
                 continue
 
-            new_project_map = {**by_date_totals[idx - 1][1]}
+            new_project_map = {**by_date_totals[-1][1]}
 
             for pn, cram_size in project_map.items():
                 if pn not in new_project_map:
@@ -427,6 +287,249 @@ def get_cram_size_proportionate_map_from_sequencing_group_sizes(
 
         return prop_map
 
+    async def get_prop_map_for_es_index_date(
+        self,
+        sg_by_id: dict[SequencingGroupInternalId, SequencingGroupInternal],
+        crams: dict[SequencingGroupInternalId, list[AnalysisInternal]],
+        project_name_map: dict[ProjectId, str],
+        start_date: datetime.date | None,
+        end_date: datetime.date | None,
+    ) -> list[ProportionalDateModel]:
+        """
+        Calculate the prop map for es-indices.
+
+        This one works a bit differently: we start with the es-indices, and progressively
+        add samples into this list as we see new samples.
+
+        We'll do this in four steps:
+
+        1. Prepare the crams into a format where it's easier for us to find:
+            "What cram size is appropriate for this day"
+
+        2. Get all SGs inside any es-index (* or joint call) before the start date
+            (that forms our baseline crams)
+
+        3. Get all es-indices between the start and end date
+            We'll do some processing on these analysis objects so we just get the
+            SGs that are new on a specific day.
+
+        4. Iterate over the days, and add the most appropriate cram size for each
+            SG for that day.
+          * We can't progressively sum, because the cram size might change
+            between days, so get it on each day.
+        """
+        sizes_by_sg = await self.get_cram_sizes_between_range(
+            crams=crams,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        def get_cram_size_for(sg_id: SequencingGroupInternalId, date: datetime.date):
+            """
+            From the list of crams, return the most appropriate cram size for a
+            sequencing group on a specific day.
+            """
+            if sg_id not in sizes_by_sg:
+                return None
+            sg_sizes = sizes_by_sg[sg_id]
+            if len(sg_sizes) == 1:
+                # probably shouldn't just return it, but it's the only cram size
+                # and for some reason it's in the es-index, so we'll just use it
+                return sg_sizes[0][1]
+            for dt, size in sg_sizes[::-1]:
+                if dt <= date:
+                    return size
+            logger.warning(f'Could not find size for {sg_id} on {date}')
+            return None
+
+        sg_to_project: dict[SequencingGroupInternalId, ProjectId] = {
+            sg.id: sg.project for sg in sg_by_id.values()
+        }
+
+        sgs_added_by_day = await self.get_sgs_added_by_day_by_es_indices(
+            start=start_date, end=end_date, projects=list(project_name_map.keys())
+        )
+        sgs_seen: set[SequencingGroupInternalId] = set()
+
+        ordered_days = sorted(sgs_added_by_day.items(), key=lambda el: el[0])
+        prop_map: list[ProportionalDateModel] = []
+        for day, sgs_for_day in ordered_days:
+            by_project: dict[ProjectId, int] = defaultdict(int)
+            sgs_seen |= sgs_for_day
+            for sg in sgs_seen:
+                if sg not in sg_to_project:
+                    # it's a sg that was in an es-index, but not in the projects
+                    # we care about, so happily skip. It's _probably_ quicker to do
+                    # it this way, rather than only querying for the SGs we care about
+                    continue
+                if cram_size := get_cram_size_for(sg, day):
+                    by_project[sg_to_project[sg]] += cram_size
+
+            total_size = sum(by_project.values())
+            prop_map.append(
+                ProportionalDateModel(
+                    date=day,
+                    projects=[
+                        ProportionalDateProjectModel(
+                            project=project_name_map[pid],
+                            percentage=size / total_size,
+                            size=size,
+                        )
+                        for pid, size in by_project.items()
+                    ],
+                )
+            )
+
+        return prop_map
+
+    async def calculate_delta_of_crams_by_project_for_day(
+        self,
+        sg_by_id: dict[SequencingGroupInternalId, SequencingGroupInternal],
+        crams: dict[SequencingGroupInternalId, list[AnalysisInternal]],
+        project_name_map: dict[int, str],
+        start_date: datetime.date | None,
+        end_date: datetime.date | None,
+    ) -> dict[datetime.date, dict[str, int]]:
+        """
+        Calculate a delta of cram size for each project by day, so you can sum them up
+        """
+        sglayer = SequencingGroupLayer(self.connection)
+        sample_create_dates = await sglayer.get_samples_create_date_from_sgs(
+            list(crams.keys())
+        )
+        by_date_diff: dict[datetime.date, dict[str, int]] = defaultdict(
+            lambda: defaultdict(int)
+        )
+
+        for sg_id, analyses in crams.items():
+            for idx, cram in enumerate(analyses):
+                project = project_name_map.get(sg_by_id[sg_id].project)
+                delta = None
+                if idx == 0:
+                    # use the sample_create_date for the first analysis
+                    sg_start_date = sample_create_dates[sg_id]
+                    delta = cram.meta.get('size') or 0
+                else:
+                    # replace with the current analyses timestamp_completed
+                    sg_start_date = check_or_parse_date(cram.timestamp_completed)
+                    if new_cram_size := cram.meta.get('size'):
+                        delta = new_cram_size - analyses[idx - 1].meta.get('size', 0)
+                if not delta:
+                    continue
+                if end_date and sg_start_date > end_date:
+                    continue
+
+                # this will eventually get the "best" cram size correctly by applying
+                # deltas for multiple crams before the start date, so the
+                # clamping here is fine.
+                clamped_date = max(sg_start_date, start_date) if start_date else sg_start_date
+                by_date_diff[clamped_date][project] += delta
+
+        return by_date_diff
+
+    async def get_cram_sizes_between_range(
+        self,
+        crams: dict[SequencingGroupInternalId, list[AnalysisInternal]],
+        start_date: datetime.date | None,
+        end_date: datetime.date | None,
+    ) -> dict[SequencingGroupInternalId, list[tuple[datetime.date, int]]]:
+        """
+        Get the cram sizes for each SG over the range, keyed by the date each size applies from
+        """
+        sglayer = SequencingGroupLayer(self.connection)
+        sample_create_dates = await sglayer.get_samples_create_date_from_sgs(
+            list(crams.keys())
+        )
+        by_date: dict[
+            SequencingGroupInternalId, list[tuple[datetime.date, int]]
+        ] = defaultdict(list)
+
+        for sg_id, analyses in crams.items():
+            if len(analyses) == 1:
+                # it does resolve the same, but most cases come through here
+                by_date[sg_id] = [
+                    (
+                        max(sample_create_dates[sg_id], start_date) if start_date else sample_create_dates[sg_id],
+                        analyses[0].meta.get('size') or 0,
+                    )
+                ]
+            else:
+                for idx, cram in enumerate(
+                    sorted(analyses, key=lambda a: a.timestamp_completed)
+                ):
+                    if idx == 0:
+                        # use the sample_create_date for the first analysis
+                        sg_start_date = sample_create_dates[sg_id]
+                    else:
+                        # replace with the current analyses timestamp_completed
+                        sg_start_date = cram.timestamp_completed.date()
+
+                    if end_date and sg_start_date > end_date:
+                        continue
+
+                    clamped_date = (
+                        max(sg_start_date, start_date) if start_date else sg_start_date
+                    )
+
+                    if 'size' not in cram.meta:
+                        continue
+
+                    by_date[sg_id].append((clamped_date, cram.meta.get('size') or 0))
+
+        return by_date
+
+    async def get_sgs_added_by_day_by_es_indices(
+        self, start: datetime.date, end: datetime.date, projects: list[ProjectId]
+    ) -> dict[datetime.date, set[SequencingGroupInternalId]]:
+        """
+        Fetch the relevant analysis objects + crams from sample-metadata
+        to put together the proportionate_map.
+        """
+        by_day: dict[datetime.date, set[SequencingGroupInternalId]] = defaultdict(set)
+
+        # unfortunately, the way ES-indices are progressive, it's basically impossible
+        # for us to know when a sequencing-group was removed, so we assume none ever
+        # are. We sum up all SGs seen up to the start date and use that as the
+        # baseline for the prop map.
+ + by_day[start] = await self.at.find_sgs_in_joint_call_or_es_index_up_to_date( + date=start + ) + + if start < ES_ANALYSIS_OBJ_INTRO_DATE: + # do a special check for joint-calling + joint_calls = await self.at.query( + AnalysisFilter( + type=GenericFilter(eq='joint-calling'), + status=GenericFilter(eq=AnalysisStatus.COMPLETED), + project=GenericFilter(in_=projects), + timestamp_completed=GenericFilter( + # midnight on the day + gt=datetime.datetime.combine(start, datetime.time()), + lte=datetime.datetime.combine(end, datetime.time()), + ), + ) + ) + for jc in joint_calls: + by_day[jc.timestamp_completed.date()].update(jc.sequencing_group_ids) + + es_indices = await self.at.query( + AnalysisFilter( + type=GenericFilter(eq='es-index'), + status=GenericFilter(eq=AnalysisStatus.COMPLETED), + project=GenericFilter(in_=projects), + timestamp_completed=GenericFilter( + # midnight on the day + gt=datetime.datetime.combine(start, datetime.time()), + lte=datetime.datetime.combine(end, datetime.time()), + ), + ) + ) + for es in es_indices: + by_day[es.timestamp_completed.date()].update(es.sequencing_group_ids) + + return by_day + # CREATE / UPDATE async def create_analysis( diff --git a/db/python/layers/sequencing_group.py b/db/python/layers/sequencing_group.py index 5b2492ab4..2e7d073f5 100644 --- a/db/python/layers/sequencing_group.py +++ b/db/python/layers/sequencing_group.py @@ -13,6 +13,7 @@ from models.models.sequencing_group import ( SequencingGroupInternal, SequencingGroupUpsertInternal, + SequencingGroupInternalId, ) from models.utils.sequencing_group_id_format import sequencing_group_id_format @@ -117,7 +118,9 @@ async def get_sequencing_groups_create_date( return await self.seqgt.get_sequencing_groups_create_date(sequencing_group_ids) - async def get_samples_create_date_from_sgs(self, sequencing_group_ids: list[int]): + async def get_samples_create_date_from_sgs( + self, sequencing_group_ids: list[int] + ) -> dict[SequencingGroupInternalId, date]: """ Get a map of {internal_sg_id: sample_date_created} for a list of sequencing_groups diff --git a/db/python/tables/analysis.py b/db/python/tables/analysis.py index 2d041ccf0..b079a0e42 100644 --- a/db/python/tables/analysis.py +++ b/db/python/tables/analysis.py @@ -1,7 +1,7 @@ # pylint: disable=too-many-instance-attributes import dataclasses +import datetime from collections import defaultdict -from datetime import date, datetime from typing import Any, Dict, List, Optional, Set, Tuple from db.python.connect import DbBase, NotFoundError @@ -29,6 +29,7 @@ class AnalysisFilter(GenericFilterModel): meta: GenericMetaFilter = None output: GenericFilter[str] = None active: GenericFilter[bool] = None + timestamp_completed: GenericFilter[datetime.datetime] = None def __hash__(self): # pylint: disable=useless-parent-delegation return super().__hash__() @@ -81,7 +82,7 @@ async def create_analysis( kv_pairs.append(('on_behalf_of', self.author)) if status == AnalysisStatus.COMPLETED: - kv_pairs.append(('timestamp_completed', datetime.utcnow())) + kv_pairs.append(('timestamp_completed', datetime.datetime.utcnow())) kv_pairs = [(k, v) for k, v in kv_pairs if v is not None] keys = [k for k, _ in kv_pairs] @@ -116,6 +117,22 @@ async def add_sequencing_groups_to_analysis( values = map(lambda sid: {'aid': analysis_id, 'sid': sid}, sequencing_group_ids) await self.connection.execute_many(_query, list(values)) + async def find_sgs_in_joint_call_or_es_index_up_to_date( + self, date: datetime.date + ) -> set[int]: + """Find all the sequencing groups that have been 
in a joint-call or es-index up to a date""" + _query = """ + SELECT DISTINCT asg.sequencing_group_id + FROM analysis_sequencing_group asg + INNER JOIN analysis a ON asg.analysis_id = a.id + WHERE + a.type IN ('joint-calling', 'es-index') + AND a.timestamp_completed <= :date + """ + + results = await self.connection.fetch_all(_query, {'date': date}) + return {r['sequencing_group_id'] for r in results} + async def update_analysis( self, analysis_id: int, @@ -144,7 +161,7 @@ async def update_analysis( fields['active'] = active if status == AnalysisStatus.COMPLETED: - fields['timestamp_completed'] = datetime.utcnow() + fields['timestamp_completed'] = datetime.datetime.utcnow() setters.append('timestamp_completed = :timestamp_completed') if output: @@ -553,7 +570,7 @@ async def get_seqr_stats_by_sequencing_type( async def get_sg_add_to_project_es_index( self, sg_ids: list[int] - ) -> dict[int, date]: + ) -> dict[int, datetime.date]: """ Get all the sequencing groups that should be added to seqr joint calls """ @@ -570,7 +587,5 @@ async def get_sg_add_to_project_es_index( GROUP BY a_sg.sequencing_group_id """ - rows = await self.connection.fetch_all( - _query, {'sg_ids': sg_ids} - ) + rows = await self.connection.fetch_all(_query, {'sg_ids': sg_ids}) return {r['sg_id']: r['timestamp_completed'].date() for r in rows} diff --git a/db/python/tables/sequencing_group.py b/db/python/tables/sequencing_group.py index a5059e03c..1be71e29a 100644 --- a/db/python/tables/sequencing_group.py +++ b/db/python/tables/sequencing_group.py @@ -12,7 +12,10 @@ ProjectId, to_db_json, ) -from models.models.sequencing_group import SequencingGroupInternal +from models.models.sequencing_group import ( + SequencingGroupInternal, + SequencingGroupInternalId, +) @dataclasses.dataclass(kw_only=True) @@ -213,7 +216,9 @@ async def get_sequencing_groups_create_date( rows = await self.connection.fetch_all(_query, {'sgids': sequencing_group_ids}) return {r[0]: r[1].date() for r in rows} - async def get_samples_create_date_from_sgs(self, sequencing_group_ids: list[int]): + async def get_samples_create_date_from_sgs( + self, sequencing_group_ids: list[int] + ) -> dict[SequencingGroupInternalId, date]: """ Get a map of {internal_sg_id: sample_date_created} for list of sg_ids """ diff --git a/db/python/utils.py b/db/python/utils.py index f412a4754..847ad69c1 100644 --- a/db/python/utils.py +++ b/db/python/utils.py @@ -88,19 +88,31 @@ class GenericFilter(Generic[T]): eq: T | None = None in_: list[T] | None = None nin: list[T] | None = None + gt: T | None = None + gte: T | None = None + lt: T | None = None + lte: T | None = None def __init__( self, eq: T | None = None, in_: list[T] | None = None, nin: list[T] | None = None, + gt: T | None = None, + gte: T | None = None, + lt: T | None = None, + lte: T | None = None, ): self.eq = eq self.in_ = in_ self.nin = nin + self.gt = gt + self.gte = gte + self.lt = lt + self.lte = lte def __repr__(self): - keys = ['eq', 'in_', 'nin'] + keys = ['eq', 'in_', 'nin', 'gt', 'gte', 'lt', 'lte'] inner_values = ', '.join( f'{k}={getattr(self, k)!r}' for k in keys if getattr(self, k) is not None ) @@ -113,6 +125,10 @@ def __hash__(self): self.eq, tuple(self.in_) if self.in_ is not None else None, tuple(self.nin) if self.nin is not None else None, + self.gt, + self.gte, + self.lt, + self.lte, ) ) @@ -161,6 +177,23 @@ def to_sql( k = self.generate_field_name(column + '_nin') conditionals.append(f'{column} NOT IN :{k}') values[k] = self._sql_value_prep(self.nin) + if self.gt is not None: + k = 
self.generate_field_name(column + '_gt') + conditionals.append(f'{column} > :{k}') + values[k] = self._sql_value_prep(self.gt) + if self.gte is not None: + k = self.generate_field_name(column + '_gte') + conditionals.append(f'{column} >= :{k}') + values[k] = self._sql_value_prep(self.gte) + if self.lt is not None: + k = self.generate_field_name(column + '_lt') + conditionals.append(f'{column} < :{k}') + values[k] = self._sql_value_prep(self.lt) + if self.lte is not None: + k = self.generate_field_name(column + '_lte') + conditionals.append(f'{column} <= :{k}') + values[k] = self._sql_value_prep(self.lte) + return ' AND '.join(conditionals), values @staticmethod diff --git a/models/models/analysis.py b/models/models/analysis.py index cb519a108..4d33bfa04 100644 --- a/models/models/analysis.py +++ b/models/models/analysis.py @@ -1,6 +1,6 @@ import enum import json -from datetime import date +from datetime import date, datetime from typing import Any from pydantic import BaseModel @@ -21,7 +21,7 @@ class AnalysisInternal(SMBase): status: AnalysisStatus output: str = None sequencing_group_ids: list[int] = [] - timestamp_completed: str | None = None + timestamp_completed: datetime | None = None project: int | None = None active: bool | None = None meta: dict[str, Any] = {} @@ -40,8 +40,8 @@ def from_db(**kwargs): if meta and isinstance(meta, str): meta = json.loads(meta) - if timestamp_completed is not None and not isinstance(timestamp_completed, str): - timestamp_completed = timestamp_completed.isoformat() + if timestamp_completed and isinstance(timestamp_completed, str): + timestamp_completed = datetime.fromisoformat(timestamp_completed) sequencing_group_ids = [] if sg := kwargs.pop('sequencing_group_id', None): @@ -72,7 +72,9 @@ def to_external(self): self.sequencing_group_ids ), output=self.output, - timestamp_completed=self.timestamp_completed, + timestamp_completed=self.timestamp_completed.isoformat() + if self.timestamp_completed + else None, project=self.project, active=self.active, meta=self.meta, @@ -106,7 +108,8 @@ def to_internal(self): self.sequencing_group_ids ), output=self.output, - timestamp_completed=self.timestamp_completed, + # don't allow this to be set + timestamp_completed=None, project=self.project, active=self.active, meta=self.meta, diff --git a/models/models/sequencing_group.py b/models/models/sequencing_group.py index 1b0bbd3f4..b1b493c8c 100644 --- a/models/models/sequencing_group.py +++ b/models/models/sequencing_group.py @@ -9,6 +9,10 @@ ) +SequencingGroupInternalId = int +SequencingGroupExternalId = str + + class SequencingGroupInternal(SMBase): """ A group of sequences that would be aligned and analysed together. 
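A quick aside on the new range operators added to `GenericFilter` in db/python/utils.py above: here is a minimal sketch of what `to_sql` now renders for a range query. It assumes `generate_field_name` slugifies the column into the bind-parameter key (e.g. `a.timestamp_completed` becoming `a_timestamp_completed`), matching how the existing `eq`/`in_`/`nin` branches name theirs; the exact key format is an implementation detail and may differ.

```python
# Sketch only: illustrates the new gt/gte/lt/lte rendering in to_sql.
# The bind-parameter names below assume generate_field_name turns
# 'a.timestamp_completed' into 'a_timestamp_completed'; the real slug may differ.
import datetime

from db.python.utils import GenericFilter

completed_in_june = GenericFilter(
    gte=datetime.datetime(2022, 6, 1),
    lt=datetime.datetime(2022, 7, 1),
)
sql, values = completed_in_june.to_sql('a.timestamp_completed')
# sql ->
#   'a.timestamp_completed >= :a_timestamp_completed_gte'
#   ' AND a.timestamp_completed < :a_timestamp_completed_lt'
# values ->
#   {'a_timestamp_completed_gte': datetime.datetime(2022, 6, 1, 0, 0),
#    'a_timestamp_completed_lt': datetime.datetime(2022, 7, 1, 0, 0)}
```

Using a half-open `gte`/`lt` pair like this avoids double-counting rows that land exactly on a boundary, which is how `get_sgs_added_by_day_by_es_indices` uses `gt`/`lte` above.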
@@ -27,7 +31,7 @@ class SequencingGroupInternal(SMBase): # similar to a sample ID, this is stored internally as an integer, # but displayed as a string - id: int | None = None + id: SequencingGroupInternalId | None = None type: str | None = None technology: str | None = None platform: str | None = None @@ -71,7 +75,7 @@ def to_external(self): class NestedSequencingGroupInternal(SMBase): """SequencingGroupInternal with nested assays""" - id: int | None = None + id: SequencingGroupInternalId | None = None type: str | None = None technology: str | None = None platform: str | None = None @@ -98,7 +102,7 @@ class SequencingGroupUpsertInternal(SMBase): Upsert model for sequence group """ - id: int | None = None + id: SequencingGroupInternalId | None = None type: str | None = None technology: str | None = None # fk platform: str | None = None # fk @@ -134,7 +138,7 @@ def to_external(self): class SequencingGroup(SMBase): """External model for sequencing group""" - id: str + id: SequencingGroupExternalId type: str technology: str platform: str # uppercase @@ -148,7 +152,7 @@ class SequencingGroup(SMBase): class NestedSequencingGroup(SMBase): """External model for sequencing group with nested assays""" - id: str + id: SequencingGroupExternalId type: str technology: str platform: str diff --git a/test/test_analysis.py b/test/test_analysis.py index 72f678778..1676c6f6a 100644 --- a/test/test_analysis.py +++ b/test/test_analysis.py @@ -1,5 +1,4 @@ # pylint: disable=invalid-overridden-method -from datetime import datetime, timedelta from test.testbase import DbIsolatedTest, run_as_sync from db.python.layers.analysis import AnalysisLayer @@ -12,7 +11,6 @@ from models.models import ( AnalysisInternal, AssayUpsertInternal, - ProportionalDateTemporalMethod, SampleUpsertInternal, SequencingGroupUpsertInternal, ) @@ -159,251 +157,3 @@ async def test_get_analysis(self): ] self.assertEqual(analyses, expected) - - @run_as_sync - async def test_get_sequencing_group_file_sizes_single(self): - """ - Test retrieval of sample file sizes over time - """ - - today = datetime.utcnow().date() - - # insert analysis - await self.al.create_analysis( - AnalysisInternal( - type='cram', - status=AnalysisStatus.COMPLETED, - sequencing_group_ids=[self.genome_sequencing_group_id], - meta={'sequencing_type': 'genome', 'size': 1024}, - ) - ) - - result = await self.al.get_sequencing_group_file_sizes( - project_ids=[self.project_id], - temporal_methods=[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE], - ) - expected = { - 'project': self.project_id, - 'sequencing_groups': { - self.genome_sequencing_group_id: [ - { - 'start': today, - 'end': None, - 'size': 1024, - } - ] - }, - } - - # Check output is formatted correctly - self.assertEqual( - 1, len(result[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE]) - ) - self.assertDictEqual( - expected, result[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE][0] - ) - - @run_as_sync - async def test_get_sequencing_group_file_sizes_single_sample_double_sg(self): - """ - Test getting file sizes for sequencing groups - 1 sample, w/ 2 sequencing groups - """ - today = datetime.utcnow().date() - - # Add genome cram - await self.al.create_analysis( - AnalysisInternal( - type='cram', - status=AnalysisStatus.COMPLETED, - sequencing_group_ids=[self.genome_sequencing_group_id], - meta={'sequencing_type': 'genome', 'size': 1024}, - ) - ) - - # Add exome cram - await self.al.create_analysis( - AnalysisInternal( - type='cram', - status=AnalysisStatus.COMPLETED, - 
sequencing_group_ids=[self.exome_sequencing_group_id], - meta={'sequencing_type': 'exome', 'size': 3141}, - ) - ) - expected = { - 'project': self.project_id, - 'sequencing_groups': { - self.genome_sequencing_group_id: [ - { - 'start': today, - 'end': None, - 'size': 1024, - } - ], - self.exome_sequencing_group_id: [ - { - 'start': today, - 'end': None, - 'size': 3141, - } - ], - }, - } - - # Assert that the exome size was added correctly - result = await self.al.get_sequencing_group_file_sizes( - project_ids=[self.project_id], - temporal_methods=[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE], - ) - self.assertDictEqual( - expected, result[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE][0] - ) - - @run_as_sync - async def test_get_sequencing_group_file_sizes_exclusive_date_range(self): - """ - Exclusive date range returning no data - """ - today = datetime.utcnow().date() - - # Assert that if we select a date range outside of sample creation date - # that is doesn't show up in the map - yesterday = today - timedelta(days=3) - result = await self.al.get_sequencing_group_file_sizes( - project_ids=[self.project_id], - end_date=yesterday, - temporal_methods=[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE], - ) - - self.assertEqual([], result[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE]) - - @run_as_sync - async def test_get_sequencing_group_file_sizes_newer_sample(self): - """ - Update analysis for sequencing group, making sure we only return the later one - """ - today = datetime.utcnow().date() - - # Add genome cram that's older - await self.al.create_analysis( - AnalysisInternal( - type='cram', - status=AnalysisStatus.COMPLETED, - sequencing_group_ids=[self.genome_sequencing_group_id], - meta={'sequencing_type': 'genome', 'size': 1024}, - ) - ) - - # Add another genome cram that's newer - await self.al.create_analysis( - AnalysisInternal( - type='cram', - status=AnalysisStatus.COMPLETED, - sequencing_group_ids=[self.genome_sequencing_group_id], - meta={'sequencing_type': 'genome', 'size': 11111}, - ) - ) - expected = { - 'project': self.project_id, - 'sequencing_groups': { - self.genome_sequencing_group_id: [ - { - 'start': today, - 'end': None, - 'size': 11111, - } - ] - }, - } - - result = await self.al.get_sequencing_group_file_sizes( - project_ids=[self.project_id], - temporal_methods=[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE], - ) - self.assertDictEqual( - expected, result[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE][0] - ) - - @run_as_sync - async def test_get_sequencing_group_file_sizes_two_samples(self): - """Test that it works for two samples""" - today = datetime.utcnow().date() - - # Add another sample and it's analysis cram as well - sample_2 = await self.sl.upsert_sample( - SampleUpsertInternal( - external_id='Test02', - type='blood', - meta={'meta': 'meta ;)'}, - active=True, - sequencing_groups=[ - SequencingGroupUpsertInternal( - type='genome', - technology='short-read', - platform='illumina', - meta={}, - sample_id=None, - assays=[ - AssayUpsertInternal( - type='sequencing', - meta={ - 'sequencing_type': 'genome', - 'sequencing_technology': 'short-read', - 'sequencing_platform': 'illumina', - }, - ) - ], - ) - ], - ) - ) - # add for sg 1 - await self.al.create_analysis( - AnalysisInternal( - type='cram', - status=AnalysisStatus.COMPLETED, - sequencing_group_ids=[self.genome_sequencing_group_id], - meta={'sequencing_type': 'genome', 'size': 1024}, - ) - ) - - sequencing_group_2_id = sample_2.sequencing_groups[0].id - await 
self.al.create_analysis(
-            AnalysisInternal(
-                type='cram',
-                status=AnalysisStatus.COMPLETED,
-                sequencing_group_ids=[sequencing_group_2_id],
-                meta={'sequencing_type': 'genome', 'size': 987654321},
-            )
-        )
-
-        expected = {
-            'project': self.project_id,
-            'sequencing_groups': {
-                self.genome_sequencing_group_id: [
-                    {
-                        'start': today,
-                        'end': None,
-                        'size': 1024,
-                    }
-                ],
-                sequencing_group_2_id: [
-                    {
-                        'start': today,
-                        'end': None,
-                        'size': 987654321,
-                    }
-                ],
-            },
-        }
-
-        result = await self.al.get_sequencing_group_file_sizes(
-            project_ids=[self.project_id],
-            temporal_methods=[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE],
-        )
-        self.assertDictEqual(
-            expected, result[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE][0]
-        )
-        self.assertDictEqual(
-            expected, result[ProportionalDateTemporalMethod.SAMPLE_CREATE_DATE][0]
-        )
diff --git a/web/src/pages/billing/SeqrProportionalMapGraph.tsx b/web/src/pages/billing/SeqrProportionalMapGraph.tsx
index 3156c8086..14e3c14b0 100644
--- a/web/src/pages/billing/SeqrProportionalMapGraph.tsx
+++ b/web/src/pages/billing/SeqrProportionalMapGraph.tsx
@@ -25,6 +25,11 @@ interface ISeqrProportionalMapGraphProps {
     end: string
 }
 
+const TEMPORAL_METHODS_TO_DISPLAY = [
+    ProportionalDateTemporalMethod.SampleCreateDate,
+    ProportionalDateTemporalMethod.EsIndexDate,
+]
+
 const SeqrProportionalMapGraph: React.FunctionComponent<ISeqrProportionalMapGraphProps> = ({
     start,
     end,
@@ -33,7 +38,7 @@ const SeqrProportionalMapGraph: React.FunctionComponent<ISeqrProportionalMapGraphProps> = ({
     const [temporalMethod, setTemporalMethod] = React.useState(
-        ProportionalDateTemporalMethod.SampleCreateDate
+        TEMPORAL_METHODS_TO_DISPLAY[0]
     )
     const [projectSelections, setProjectSelections] = React.useState<
         { [key: string]: boolean } | undefined
     >(undefined)
@@ -64,10 +69,7 @@ const SeqrProportionalMapGraph: React.FunctionComponent<ISeqrProportionalMapGraphProps> = ({
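For context, this is the end-to-end effect of the new `timestamp_completed` filter. The sketch below is not part of the diff: it assumes a live `connection`, and the `AnalysisStatus` import path is a guess; everything else comes from the files changed above.

```python
# Sketch: querying analyses completed within a window using the new
# timestamp_completed range filter (dates are illustrative).
import datetime

from db.python.layers.analysis import AnalysisLayer
from db.python.tables.analysis import AnalysisFilter
from db.python.utils import GenericFilter
from models.enums import AnalysisStatus  # import path assumed


async def es_indices_completed_in_june(connection):
    alayer = AnalysisLayer(connection)
    # AnalysisLayer.query accepts the same AnalysisFilter the table layer uses,
    # so the half-open [gte, lt) range composes with the other filter fields.
    return await alayer.query(
        AnalysisFilter(
            type=GenericFilter(eq='es-index'),
            status=GenericFilter(eq=AnalysisStatus.COMPLETED),
            timestamp_completed=GenericFilter(
                gte=datetime.datetime(2022, 6, 1),
                lt=datetime.datetime(2022, 7, 1),
            ),
        )
    )
```

On the GraphQL side, the same filter should surface on the `analyses` field as `timestampCompleted: { gte: ..., lt: ... }`, assuming Strawberry's default snake_case to camelCase name conversion.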