Skip to content

Commit

Permalink
encoded key accesses into fields
Browse files Browse the repository at this point in the history
  • Loading branch information
nevoodoo committed Jun 11, 2024
1 parent 5f751db commit dc98692
Showing 1 changed file with 99 additions and 72 deletions.
171 changes: 99 additions & 72 deletions db/python/layers/ourdna/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
from collections import defaultdict
from datetime import datetime
from functools import cached_property
from math import ceil
from typing import Any

Expand All @@ -18,10 +19,9 @@
class SampleProcessMeta:
"""Helper class to encapsulate sample metadata properties and calculations."""

meta: dict
_collection_to_process_start_time: int | None = None
_collection_to_process_end_time: int | None = None
_processing_times_by_site: tuple[str | None, int | None] | None = None
def __init__(self, sample: Sample):
self.sample = sample
self.meta = sample.meta

def get_property(self, property_name: str) -> Any:
"""Get a property from the meta field of a sample."""
Expand All @@ -43,68 +43,101 @@ def get_time_difference_in_seconds(
return int(time_taken.total_seconds())

@staticmethod
def from_sample(sample: Sample) -> 'SampleProcessMeta':
"""Create a SampleProcessMeta instance from a Sample instance."""
instance = SampleProcessMeta()
instance.meta = sample.meta
return instance
def try_parse_datetime(d: str) -> datetime | None:
"""
Attempts to parse a datetime string in the format '%Y-%m-%d %H:%M:%S'.
Args:
d (str): The datetime string to parse.
Returns:
datetime | None: A datetime object if parsing is successful, otherwise None.
"""
try:
return datetime.strptime(d, '%Y-%m-%d %H:%M:%S')
except TypeError as e:
# Optionally log the error message
print(f'Datetime passed is not a str: {e}')
return None
except ValueError as e:
# Optionally log the error message
print(f'Error parsing datetime: {e}')
return None

@property
def collection_to_process_end_time(self) -> int | None:
"""Get the time taken from collection to process end."""
if self._collection_to_process_end_time is None:
self._collection_to_process_end_time = self.get_time_difference_in_seconds(
'collection-time', 'process-end-time'
)
return self._collection_to_process_end_time
@cached_property
def collection_time(self) -> datetime | None:
"""Returns the collection time for a sample."""
return self.try_parse_datetime(self.get_property('collection-time'))

@cached_property
def process_start_time(self) -> datetime | None:
"""Returns the process start time for a sample."""
return self.try_parse_datetime(self.get_property('process-start-time'))

@cached_property
def process_end_time(self) -> datetime | None:
"""Returns the process end time for a sample."""
return self.try_parse_datetime(self.get_property('process-end-time'))

@cached_property
def processing_time(self) -> int | None:
"""Get processing time for a sample."""
if not (self.process_start_time and self.process_end_time):
return None
return int((self.process_end_time - self.process_start_time).total_seconds())

@property
def processing_times_by_site(self) -> tuple[str | None, int | None]:
@cached_property
def processing_time_by_site(self) -> tuple[str | None, int | None]:
"""Get processing times and site for a sample."""
if self._processing_times_by_site is None:
processing_site = self.get_property('processing-site')
processing_time = self.get_time_difference_in_seconds(
'process-start-time', 'process-end-time'
)
self._processing_times_by_site = (processing_site, processing_time)
return self._processing_times_by_site
return self.get_property('processing-site'), self.processing_time

@property
@cached_property
def collection_to_process_end_time(self) -> int | None:
"""Get the time taken from collection to process end."""
if self.collection_time and self.process_end_time:
return int((self.process_end_time - self.collection_time).total_seconds())
return None

@cached_property
def collection_to_process_start_time(self) -> int | None:
"""Get the time taken from collection to process start."""
if self._collection_to_process_start_time is None:
self._collection_to_process_start_time = (
self.get_time_difference_in_seconds(
'collection-time', 'process-start-time'
)
)
return self._collection_to_process_start_time

@property
def get_lost_sample_properties(self) -> dict[str, Any]:
"""Returns the properties to report for a sample that has been lost"""
return {
'collection_time': self.get_property('collection-time'),
'process_start_time': self.get_property('process-start-time'),
'process_end_time': self.get_property('process-end-time'),
'received_time': self.get_property('received-time'),
'received_by': self.get_property('received-by'),
'collection_lab': self.get_property('collection-lab'),
'courier': self.get_property('courier'),
'courier_tracking_number': self.get_property('courier-tracking-number'),
'courier_scheduled_pickup_time': self.get_property(
if self.collection_time and self.process_start_time:
return int((self.process_start_time - self.collection_time).total_seconds())
return None

@cached_property
def get_lost_sample_properties(self) -> OurDNALostSample:
"""Returns the normalised properties to report for a sample that has been lost"""
return OurDNALostSample(
sample_id=self.sample.id,
collection_time=self.get_property('collection-time'),
process_start_time=self.get_property('process-start-time'),
process_end_time=self.get_property('process-end-time'),
received_time=self.get_property('received-time'),
received_by=self.get_property('received-by'),
collection_lab=self.get_property('collection-lab'),
courier=self.get_property('courier'),
courier_tracking_number=self.get_property('courier-tracking-number'),
courier_scheduled_pickup_time=self.get_property(
'courier-scheduled-pickup-time'
),
'courier_actual_pickup_time': self.get_property(
'courier-actual-pickup-time'
),
'courier_scheduled_dropoff_time': self.get_property(
courier_actual_pickup_time=self.get_property('courier-actual-pickup-time'),
courier_scheduled_dropoff_time=self.get_property(
'courier-scheduled-dropoff-time'
),
'courier_actual_dropoff_time': self.get_property(
courier_actual_dropoff_time=self.get_property(
'courier-actual-dropoff-time'
),
}
time_to_process_start=self.collection_to_process_start_time,
)

@cached_property
def is_lost(self) -> bool:
"""Returns True if the sample is considered lost, otherwise False."""
time_to_process_start = self.collection_to_process_start_time
if time_to_process_start and time_to_process_start > 72 * 60 * 60:
return True
return False


class OurDnaDashboardLayer(BaseLayer):
Expand Down Expand Up @@ -194,7 +227,7 @@ def process_collection_to_process_end_times(self, samples: list[Sample]) -> dict
collection_to_process_end_time: dict[str, int] = {}

for sample in samples:
processed_meta = SampleProcessMeta.from_sample(sample)
processed_meta = SampleProcessMeta(sample)
if processed_meta.collection_to_process_end_time is not None:
collection_to_process_end_time[sample.id] = (
processed_meta.collection_to_process_end_time
Expand All @@ -203,7 +236,7 @@ def process_collection_to_process_end_times(self, samples: list[Sample]) -> dict

def process_collection_to_process_end_times_statistics(
self, collection_to_process_end_time: dict[str, int]
) -> dict:
) -> dict[str, float | None]:
"""Get the statistics for the time between blood collection and sample processing"""
collection_to_process_end_time_statistics: dict[str, float | None] = {}

Expand Down Expand Up @@ -235,7 +268,7 @@ def process_collection_to_process_end_times_24h(
collection_to_process_end_time_24h: dict[str, int] = {}

for sample in samples:
processed_meta = SampleProcessMeta.from_sample(sample)
processed_meta = SampleProcessMeta(sample)
if (
processed_meta.collection_to_process_end_time
and processed_meta.collection_to_process_end_time > 24 * 60 * 60
Expand All @@ -252,8 +285,8 @@ def proccess_processing_times_by_site(self, samples: list[Sample]) -> dict:
)

for sample in samples:
processed_meta = SampleProcessMeta.from_sample(sample)
processing_site, processing_time = processed_meta.processing_times_by_site
processed_meta = SampleProcessMeta(sample)
processing_site, processing_time = processed_meta.processing_time_by_site
if processing_site and processing_time:
hour_bucket = ceil(processing_time / 3600)
processing_times_by_site[processing_site][hour_bucket] += 1
Expand All @@ -273,7 +306,7 @@ def process_total_samples_by_collection_event_name(
total_samples_by_collection_event_name: dict[str, int] = defaultdict(int)

for sample in samples:
processed_meta = SampleProcessMeta.from_sample(sample)
processed_meta = SampleProcessMeta(sample)
_collection_event_name = processed_meta.get_property(
'collection-event-name'
)
Expand All @@ -289,24 +322,20 @@ def process_samples_lost_after_collection(
samples_lost_after_collection: list[OurDNALostSample] = []

for sample in samples:
processed_meta = SampleProcessMeta.from_sample(sample)
time_to_process_start = processed_meta.collection_to_process_start_time
if time_to_process_start and time_to_process_start > 72 * 60 * 60:
processed_meta = SampleProcessMeta(sample)
if processed_meta.is_lost:
samples_lost_after_collection.append(
OurDNALostSample(
**processed_meta.get_lost_sample_properties,
sample_id=sample.id,
time_to_process_start=time_to_process_start,
)
processed_meta.get_lost_sample_properties
)

return samples_lost_after_collection

def process_samples_concentration_gt_1ug(self, samples: list[Sample]) -> dict:
"""Get the concentration of the sample where the concentration is more than 1 ug of DNA"""
samples_concentration_gt_1ug: dict[str, float] = {}

for sample in samples:
processed_meta = SampleProcessMeta.from_sample(sample)
processed_meta = SampleProcessMeta(sample)
concentration = processed_meta.get_property('concentration')
if concentration and float(concentration) > 1:
samples_concentration_gt_1ug[sample.id] = float(concentration)
Expand All @@ -322,9 +351,7 @@ def process_participants_consented_not_collected(
for participant_id, samples in grouped_participants_samples.items():
participant = participants[participant_id]
if participant.meta.get('consent') and any(
SampleProcessMeta.from_sample(sample).get_property('collection-time')
is None
for sample in samples
SampleProcessMeta(sample).collection_time is None for sample in samples
):
filtered_participants.append(participant.id)
return filtered_participants
Expand Down

0 comments on commit dc98692

Please sign in to comment.