Skip to content

Commit

Permalink
cleaned up the layer logic, simplified
Browse files Browse the repository at this point in the history
  • Loading branch information
nevoodoo committed Jun 7, 2024
1 parent 2d5bc22 commit 5f751db
Showing 1 changed file with 126 additions and 154 deletions.
280 changes: 126 additions & 154 deletions db/python/layers/ourdna/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,93 +15,106 @@
from models.models.participant import ParticipantInternal


class OurDnaDashboardLayer(BaseLayer):
"""Layer for analysis logic"""
class SampleProcessMeta:
"""Helper class to encapsulate sample metadata properties and calculations."""

def __init__(self, connection: Connection):
super().__init__(connection)

self.sample_layer = SampleLayer(connection)
self.participant_layer = ParticipantLayer(connection)
meta: dict
_collection_to_process_start_time: int | None = None
_collection_to_process_end_time: int | None = None
_processing_times_by_site: tuple[str | None, int | None] | None = None

@staticmethod
def get_meta_property(sample: Sample, property_name: str) -> Any:
"""
Get a property from the meta field of a sample, accounting for hyphenated property names
or underscores in the property name
"""
return sample.meta.get(property_name) or sample.meta.get(
def get_property(self, property_name: str) -> Any:
    """
    Look up a value in the sample's meta dictionary.

    The name is tried exactly as given first. If that lookup yields a falsy
    value (missing key, None, '', 0, ...), the same name with every hyphen
    replaced by an underscore is looked up and that result is returned.
    """
    value = self.meta.get(property_name)
    if value:
        return value

    # Fall back to the underscore spelling of the same property name.
    underscored_name = property_name.replace('-', '_')
    return self.meta.get(underscored_name)

def get_collection_to_process_end_time(self, sample: Sample) -> int | None:
"""
I want to know how long it took between blood collection and sample processing - SAMPLE TABLE
@fields: collection-time, process-end-time
"""
_collection_time = self.get_meta_property(
sample=sample, property_name='collection-time'
)
_process_end_time = self.get_meta_property(
sample=sample, property_name='process-end-time'
)
if _collection_time is None or _process_end_time is None:
def get_time_difference_in_seconds(
self, start_property: str, end_property: str
) -> int | None:
"""Calculate time difference in seconds between two meta properties."""
start_time = self.get_property(start_property)
end_time = self.get_property(end_property)
if start_time is None or end_time is None:
return None

time_taken = datetime.strptime(
_process_end_time, '%Y-%m-%d %H:%M:%S'
) - datetime.strptime(_collection_time, '%Y-%m-%d %H:%M:%S')

end_time, '%Y-%m-%d %H:%M:%S'
) - datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
return int(time_taken.total_seconds())

def get_processing_times_by_site(
self, sample: Sample
) -> tuple[str | None, int | None]:
"""
I want to know what the sample processing times were for samples at each designated site (BBV, Garvan, Westmead, etc)
@fields: process-start-time, process-end-time, processing-site where the time fields are of the format '2022-07-03 13:28:00'
"""
_process_start_time = self.get_meta_property(
sample=sample, property_name='process-start-time'
)
_process_end_time = self.get_meta_property(
sample=sample, property_name='process-end-time'
)
_processing_site = self.get_meta_property(
sample=sample, property_name='processing-site'
)
if (
_process_start_time is None
or _process_end_time is None
or _processing_site is None
):
return None, None

processing_time = datetime.strptime(
_process_end_time, '%Y-%m-%d %H:%M:%S'
) - datetime.strptime(_process_start_time, '%Y-%m-%d %H:%M:%S')

return _processing_site, int(processing_time.total_seconds())

def get_collection_to_process_start_time(self, sample: Sample) -> int | None:
"""
I want to know how long it has been since the sample was collected - SAMPLE TABLE
@fields: collection-time, process-start-time
"""
_collection_time = self.get_meta_property(
sample=sample, property_name='collection-time'
)
_process_start_time = self.get_meta_property(
sample=sample, property_name='process-start-time'
)
if _collection_time is None or _process_start_time is None:
return None
@classmethod
def from_sample(cls, sample: Sample) -> 'SampleProcessMeta':
    """
    Create an instance from a Sample instance.

    Declared as a classmethod rather than a staticmethod that hard-codes the
    class name, so a subclass calling from_sample gets an instance of its
    own type. Calls of the form SampleProcessMeta.from_sample(sample) behave
    as before.

    The sample's meta dict is stored by reference, not copied.
    """
    instance = cls()
    instance.meta = sample.meta
    return instance

@property
def collection_to_process_end_time(self) -> int | None:
"""Get the time taken from collection to process end."""
if self._collection_to_process_end_time is None:
self._collection_to_process_end_time = self.get_time_difference_in_seconds(
'collection-time', 'process-end-time'
)
return self._collection_to_process_end_time

@property
def processing_times_by_site(self) -> tuple[str | None, int | None]:
"""Get processing times and site for a sample."""
if self._processing_times_by_site is None:
processing_site = self.get_property('processing-site')
processing_time = self.get_time_difference_in_seconds(
'process-start-time', 'process-end-time'
)
self._processing_times_by_site = (processing_site, processing_time)
return self._processing_times_by_site

@property
def collection_to_process_start_time(self) -> int | None:
"""Get the time taken from collection to process start."""
if self._collection_to_process_start_time is None:
self._collection_to_process_start_time = (
self.get_time_difference_in_seconds(
'collection-time', 'process-start-time'
)
)
return self._collection_to_process_start_time

@property
def get_lost_sample_properties(self) -> dict[str, Any]:
    """
    Collect the meta properties to report for a sample that has been lost.

    Each property is looked up by its hyphenated name through get_property
    and returned under that name with hyphens replaced by underscores.
    """
    reported_properties = (
        'collection-time',
        'process-start-time',
        'process-end-time',
        'received-time',
        'received-by',
        'collection-lab',
        'courier',
        'courier-tracking-number',
        'courier-scheduled-pickup-time',
        'courier-actual-pickup-time',
        'courier-scheduled-dropoff-time',
        'courier-actual-dropoff-time',
    )
    return {
        name.replace('-', '_'): self.get_property(name)
        for name in reported_properties
    }

time_taken = datetime.strptime(
_process_start_time, '%Y-%m-%d %H:%M:%S'
) - datetime.strptime(_collection_time, '%Y-%m-%d %H:%M:%S')

return int(time_taken.total_seconds())
class OurDnaDashboardLayer(BaseLayer):
"""Layer for analysis logic"""

def __init__(self, connection: Connection):
    """Initialise the base layer and build the sample and participant layers from the same connection."""
    super().__init__(connection)

    # Both helper layers are constructed from the connection this layer was given.
    self.sample_layer = SampleLayer(connection)
    self.participant_layer = ParticipantLayer(connection)

async def query(
self,
Expand Down Expand Up @@ -181,10 +194,11 @@ def process_collection_to_process_end_times(self, samples: list[Sample]) -> dict
collection_to_process_end_time: dict[str, int] = {}

for sample in samples:
time_to_process_end = self.get_collection_to_process_end_time(sample)
if time_to_process_end is not None:
collection_to_process_end_time[sample.id] = time_to_process_end

processed_meta = SampleProcessMeta.from_sample(sample)
if processed_meta.collection_to_process_end_time is not None:
collection_to_process_end_time[sample.id] = (
processed_meta.collection_to_process_end_time
)
return collection_to_process_end_time

def process_collection_to_process_end_times_statistics(
Expand Down Expand Up @@ -221,10 +235,14 @@ def process_collection_to_process_end_times_24h(
collection_to_process_end_time_24h: dict[str, int] = {}

for sample in samples:
time_to_process_end = self.get_collection_to_process_end_time(sample)
if time_to_process_end is not None and time_to_process_end > 24 * 60 * 60:
collection_to_process_end_time_24h[sample.id] = time_to_process_end

processed_meta = SampleProcessMeta.from_sample(sample)
if (
processed_meta.collection_to_process_end_time
and processed_meta.collection_to_process_end_time > 24 * 60 * 60
):
collection_to_process_end_time_24h[sample.id] = (
processed_meta.collection_to_process_end_time
)
return collection_to_process_end_time_24h

def proccess_processing_times_by_site(self, samples: list[Sample]) -> dict:
Expand All @@ -234,18 +252,17 @@ def proccess_processing_times_by_site(self, samples: list[Sample]) -> dict:
)

for sample in samples:
processing_site, processing_time = self.get_processing_times_by_site(sample)
if processing_site is not None and processing_time is not None:
processed_meta = SampleProcessMeta.from_sample(sample)
processing_site, processing_time = processed_meta.processing_times_by_site
if processing_site and processing_time:
hour_bucket = ceil(processing_time / 3600)
processing_times_by_site[processing_site][hour_bucket] += 1

for site in processing_times_by_site:
min_bucket = min(processing_times_by_site[site].keys())
max_bucket = max(processing_times_by_site[site].keys())

min_bucket = min(processing_times_by_site[site])
max_bucket = max(processing_times_by_site[site])
for i in range(min_bucket, max_bucket + 1):
if i not in processing_times_by_site[site]:
processing_times_by_site[site][i] = 0
processing_times_by_site[site].setdefault(i, 0)

return processing_times_by_site

Expand All @@ -256,14 +273,13 @@ def process_total_samples_by_collection_event_name(
total_samples_by_collection_event_name: dict[str, int] = defaultdict(int)

for sample in samples:
_collection_event_name = self.get_meta_property(
sample=sample, property_name='collection-event-name'
processed_meta = SampleProcessMeta.from_sample(sample)
_collection_event_name = processed_meta.get_property(
'collection-event-name'
)
if _collection_event_name is not None:
total_samples_by_collection_event_name[_collection_event_name] += 1
else:
total_samples_by_collection_event_name['Unknown'] += 1

total_samples_by_collection_event_name[
_collection_event_name or 'Unknown'
] += 1
return total_samples_by_collection_event_name

def process_samples_lost_after_collection(
Expand All @@ -273,71 +289,27 @@ def process_samples_lost_after_collection(
samples_lost_after_collection: list[OurDNALostSample] = []

for sample in samples:
time_to_process_start = self.get_collection_to_process_start_time(sample)

if (
time_to_process_start is not None
and time_to_process_start > 72 * 60 * 60
):
processed_meta = SampleProcessMeta.from_sample(sample)
time_to_process_start = processed_meta.collection_to_process_start_time
if time_to_process_start and time_to_process_start > 72 * 60 * 60:
samples_lost_after_collection.append(
OurDNALostSample(
**processed_meta.get_lost_sample_properties,
sample_id=sample.id,
time_to_process_start=time_to_process_start,
collection_time=self.get_meta_property(
sample=sample, property_name='collection-time'
),
process_start_time=self.get_meta_property(
sample=sample, property_name='process-start-time'
),
process_end_time=self.get_meta_property(
sample=sample, property_name='process-end-time'
),
received_time=self.get_meta_property(
sample=sample, property_name='received-time'
),
received_by=self.get_meta_property(
sample=sample, property_name='received-by'
),
collection_lab=self.get_meta_property(
sample=sample, property_name='collection-lab'
),
courier=self.get_meta_property(
sample=sample, property_name='courier'
),
courier_tracking_number=self.get_meta_property(
sample=sample, property_name='courier-tracking-number'
),
courier_scheduled_pickup_time=self.get_meta_property(
sample=sample, property_name='courier-scheduled-pickup-time'
),
courier_actual_pickup_time=self.get_meta_property(
sample=sample, property_name='courier-actual-pickup-time'
),
courier_scheduled_dropoff_time=self.get_meta_property(
sample=sample,
property_name='courier-scheduled-dropoff-time',
),
courier_actual_dropoff_time=self.get_meta_property(
sample=sample, property_name='courier-actual-dropoff-time'
),
)
)

return samples_lost_after_collection

def process_samples_concentration_gt_1ug(self, samples: list[Sample]) -> dict:
    """
    Get the concentration of the sample where the concentration is more than 1 ug of DNA.

    Returns a dict mapping sample id to the sample's 'concentration' meta
    value converted to float, for every sample whose concentration is
    greater than 1. Samples whose concentration is missing or falsy are
    skipped. float() raises (ValueError/TypeError) if a concentration value
    cannot be converted.
    """
    samples_concentration_gt_1ug: dict[str, float] = {}

    for sample in samples:
        processed_meta = SampleProcessMeta.from_sample(sample)
        raw_concentration = processed_meta.get_property('concentration')
        if not raw_concentration:
            continue

        # Convert once and reuse the value for both the check and the result.
        concentration = float(raw_concentration)
        if concentration > 1:
            samples_concentration_gt_1ug[sample.id] = concentration
    return samples_concentration_gt_1ug

def process_participants_consented_not_collected(
Expand All @@ -350,10 +322,11 @@ def process_participants_consented_not_collected(
for participant_id, samples in grouped_participants_samples.items():
participant = participants[participant_id]
if participant.meta.get('consent') and any(
sample.meta.get('collection-time') is None for sample in samples
SampleProcessMeta.from_sample(sample).get_property('collection-time')
is None
for sample in samples
):
filtered_participants.append(participant.id)

return filtered_participants

def process_participants_signed_not_consented(
Expand All @@ -363,9 +336,8 @@ def process_participants_signed_not_consented(
) -> list[int]:
"""Get the participants who have signed but have not been consented"""
filtered_participants: list[int] = []
for participant_id, _ in grouped_participants_samples.items():
for participant_id in grouped_participants_samples:
participant = participants[participant_id]
if not participant.meta.get('consent'):
filtered_participants.append(participant.id)

return filtered_participants

0 comments on commit 5f751db

Please sign in to comment.