Merge pull request #697 from populationgenomics/dev
Release
milo-hyben authored Mar 7, 2024
2 parents 6d8a041 + f67695d commit 5b55a13
Showing 41 changed files with 3,664 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yaml

@@ -43,7 +43,7 @@ jobs:
       - uses: actions/setup-python@v4
         with:
-          python-version: "3.10"
+          python-version: "3.11"

       - uses: actions/setup-java@v3
         with:
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml

@@ -18,7 +18,7 @@ jobs:

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.10"
+          python-version: "3.11"

       - uses: actions/setup-java@v2
         with:
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml

@@ -20,7 +20,7 @@ jobs:

      - uses: actions/setup-python@v5
        with:
-         python-version: "3.10"
+         python-version: "3.11"

      - uses: actions/setup-java@v4
        with:
40 changes: 25 additions & 15 deletions api/routes/analysis.py

@@ -79,19 +79,25 @@ class AnalysisQueryModel(BaseModel):
     def to_filter(self, project_id_map: dict[str, int]) -> AnalysisFilter:
         """Convert to internal analysis filter"""
         return AnalysisFilter(
-            sample_id=GenericFilter(
-                in_=sample_id_transform_to_raw_list(self.sample_ids)
-            )
-            if self.sample_ids
-            else None,
-            sequencing_group_id=GenericFilter(
-                in_=sequencing_group_id_transform_to_raw_list(self.sequencing_group_ids)
-            )
-            if self.sequencing_group_ids
-            else None,
-            project=GenericFilter(in_=[project_id_map.get(p) for p in self.projects])
-            if self.projects
-            else None,
+            sample_id=(
+                GenericFilter(in_=sample_id_transform_to_raw_list(self.sample_ids))
+                if self.sample_ids
+                else None
+            ),
+            sequencing_group_id=(
+                GenericFilter(
+                    in_=sequencing_group_id_transform_to_raw_list(
+                        self.sequencing_group_ids
+                    )
+                )
+                if self.sequencing_group_ids
+                else None
+            ),
+            project=(
+                GenericFilter(in_=[project_id_map.get(p) for p in self.projects])
+                if self.projects
+                else None
+            ),
             type=GenericFilter(eq=self.type) if self.type else None,
         )

@@ -241,8 +247,9 @@ async def query_analyses(
 @router.get('/analysis-runner', operation_id='getAnalysisRunnerLog')
 async def get_analysis_runner_log(
     project_names: list[str] = Query(None),  # type: ignore
-    author: str = None,
+    # author: str = None, # not implemented yet, uncomment when we do
     output_dir: str = None,
+    ar_guid: str = None,
     connection: Connection = get_projectless_db_connection,
 ) -> list[AnalysisInternal]:
     """
@@ -257,7 +264,10 @@ async def get_analysis_runner_log(
     )

     results = await atable.get_analysis_runner_log(
-        project_ids=project_ids, author=author, output_dir=output_dir
+        project_ids=project_ids,
+        # author=author,
+        output_dir=output_dir,
+        ar_guid=ar_guid,
     )
     return [a.to_external() for a in results]
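
For context, a minimal usage sketch of the new `ar_guid` query parameter on this route. The host and route prefix are assumptions (the router mount point is not shown in this diff), and the GUID value is invented; only `project_names` and `ar_guid` come from the signature above.

import requests

# Hypothetical host and mount point for the router above.
BASE_URL = 'http://localhost:8000/api/v1/analysis'

resp = requests.get(
    f'{BASE_URL}/analysis-runner',
    params={
        'project_names': ['my-project'],  # hypothetical project name
        'ar_guid': '0b7bd01a-0000-0000-0000-000000000000',  # hypothetical GUID
    },
)
resp.raise_for_status()
analyses = resp.json()  # AnalysisInternal records, serialised to their external form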
10 changes: 6 additions & 4 deletions db/backup/backup.py

@@ -1,9 +1,10 @@
-#!/usr/bin/python3.7
+#!/usr/bin/python3
 # pylint: disable=broad-exception-caught,broad-exception-raised
 """ Daily backup function for databases within a local
 MariaDB instance """

 import json
+import os
 import subprocess
 from datetime import datetime
 from typing import Literal
@@ -50,7 +51,7 @@ def perform_backup():
     tmp_dir = f'backup_{timestamp_str}'
     subprocess.run(['mkdir', tmp_dir], check=True)
     # grant permissions, so that mariadb can read ib_logfile0
-    subprocess.run(['sudo', 'chmod', '-R', '777', tmp_dir], check=True)
+    subprocess.run(['sudo', 'chmod', '-R', '770', tmp_dir], check=True)

     credentials = read_db_credentials()
     db_username = credentials['username']
@@ -66,10 +67,11 @@ def perform_backup():
                 '--backup',
                 f'--target-dir={tmp_dir}/',
                 f'--user={db_username}',
-                f'-p{db_password}',
             ],
             check=True,
             stderr=subprocess.DEVNULL,
+            # pass the password via the environment to avoid it being visible in the process list
+            env={'MYSQL_PWD': db_password, **os.environ},
         )

     except subprocess.CalledProcessError as e:
@@ -83,7 +85,7 @@

     # mariabackup creates awkward permissions for the output files,
     # so we'll grant appropriate permissions for tmp_dir to later remove it
-    subprocess.run(['sudo', 'chmod', '-R', '777', tmp_dir], check=True)
+    subprocess.run(['sudo', 'chmod', '-R', '770', tmp_dir], check=True)

     # tar the archive to make it easier to upload to GCS
     tar_archive_path = f'{tmp_dir}.tar.gz'
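
The change above drops the `-p{password}` argument in favour of the `MYSQL_PWD` environment variable. A minimal sketch of why, mirroring the diff (variable values are placeholders): command-line arguments are visible to every user via the process list, while a child process's environment is not exposed the same way.

import os
import subprocess

db_username = 'backup_user'  # placeholder
db_password = 'placeholder'  # placeholder; the real script reads this from credentials
tmp_dir = 'backup_20240307'  # placeholder

# `ps aux` would show every argv token, so `-p<password>` leaks the secret;
# MariaDB client tools read the password from MYSQL_PWD instead.
subprocess.run(
    ['mariabackup', '--backup', f'--target-dir={tmp_dir}/', f'--user={db_username}'],
    check=True,
    env={'MYSQL_PWD': db_password, **os.environ},
)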
18 changes: 12 additions & 6 deletions db/python/layers/analysis.py

@@ -441,9 +441,9 @@ async def get_cram_sizes_between_range(
         sample_create_dates = await sglayer.get_samples_create_date_from_sgs(
             list(crams.keys())
         )
-        by_date: dict[
-            SequencingGroupInternalId, list[tuple[datetime.date, int]]
-        ] = defaultdict(list)
+        by_date: dict[SequencingGroupInternalId, list[tuple[datetime.date, int]]] = (
+            defaultdict(list)
+        )

         for sg_id, analyses in crams.items():
             if len(analyses) == 1:
@@ -531,7 +531,9 @@ async def get_sgs_added_by_day_by_es_indices(

         return by_day

-    async def get_audit_logs_by_analysis_ids(self, analysis_ids: list[int]) -> dict[int, list[AuditLogInternal]]:
+    async def get_audit_logs_by_analysis_ids(
+        self, analysis_ids: list[int]
+    ) -> dict[int, list[AuditLogInternal]]:
         """Get audit logs for analysis IDs"""
         return await self.at.get_audit_log_for_analysis_ids(analysis_ids)

@@ -594,12 +596,16 @@ async def update_analysis(
     async def get_analysis_runner_log(
         self,
         project_ids: list[int] = None,
-        author: str = None,
+        # author: str = None,
         output_dir: str = None,
+        ar_guid: str = None,
     ) -> list[AnalysisInternal]:
         """
         Get log for the analysis-runner, useful for checking the history of analyses
         """
         return await self.at.get_analysis_runner_log(
-            project_ids, author=author, output_dir=output_dir
+            project_ids,
+            # author=author,
+            output_dir=output_dir,
+            ar_guid=ar_guid,
         )
2 changes: 1 addition & 1 deletion db/python/layers/billing.py

@@ -18,7 +18,7 @@ class BillingLayer(BqBaseLayer):

     def table_factory(
         self,
-        source: BillingSource,
+        source: BillingSource | None = None,
         fields: list[BillingColumn] | None = None,
         filters: dict[BillingColumn, str | list | dict] | None = None,
     ) -> (
11 changes: 6 additions & 5 deletions db/python/tables/analysis.py

@@ -486,8 +486,9 @@ async def get_sample_cram_path_map_for_seqr(
     async def get_analysis_runner_log(
         self,
         project_ids: List[int] = None,
-        author: str = None,
+        # author: str = None,
         output_dir: str = None,
+        ar_guid: str = None,
     ) -> List[AnalysisInternal]:
         """
         Get log for the analysis-runner, useful for checking the history of analyses
@@ -501,15 +502,15 @@ async def get_analysis_runner_log(
             wheres.append('project in :project_ids')
             values['project_ids'] = project_ids

-        if author:
-            wheres.append('audit_log_id = :audit_log_id')
-            values['audit_log_id'] = await self.audit_log_id()
-
         if output_dir:
             wheres.append('(output = :output OR output LIKE :output_like)')
             values['output'] = output_dir
             values['output_like'] = f'%{output_dir}'

+        if ar_guid:
+            wheres.append('JSON_EXTRACT(meta, "$.ar_guid") = :ar_guid')
+            values['ar_guid'] = ar_guid
+
         wheres_str = ' AND '.join(wheres)
         _query = f'SELECT * FROM analysis WHERE {wheres_str}'
         rows = await self.connection.fetch_all(_query, values)
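
A small sketch of how the new `ar_guid` clause composes with the others into the final query string; the `wheres` list and `values` dict mirror the code above, with invented inputs.

wheres: list[str] = []
values: dict[str, object] = {}

project_ids = [1, 2]   # invented
ar_guid = 'abc-123'    # invented

if project_ids:
    wheres.append('project in :project_ids')
    values['project_ids'] = project_ids

if ar_guid:
    # matches rows whose JSON `meta` column contains {"ar_guid": <value>}
    wheres.append('JSON_EXTRACT(meta, "$.ar_guid") = :ar_guid')
    values['ar_guid'] = ar_guid

wheres_str = ' AND '.join(wheres)
query = f'SELECT * FROM analysis WHERE {wheres_str}'
# SELECT * FROM analysis WHERE project in :project_ids AND JSON_EXTRACT(meta, "$.ar_guid") = :ar_guid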
29 changes: 16 additions & 13 deletions db/python/tables/bq/billing_base.py

@@ -117,8 +117,9 @@ def _execute_query(
         # otherwise return as BQ iterator
         return self._connection.connection.query(query, job_config=job_config)

+    @staticmethod
     def _query_to_partitioned_filter(
-        self, query: BillingTotalCostQueryModel
+        query: BillingTotalCostQueryModel,
     ) -> BillingFilter:
         """
         By default views are partitioned by 'day',
@@ -137,15 +138,18 @@
         )
         return billing_filter

-    def _filter_to_optimise_query(self) -> str:
+    @staticmethod
+    def _filter_to_optimise_query() -> str:
         """Filter string to optimise BQ query"""
         return 'day >= TIMESTAMP(@start_day) AND day <= TIMESTAMP(@last_day)'

-    def _last_loaded_day_filter(self) -> str:
+    @staticmethod
+    def _last_loaded_day_filter() -> str:
         """Last Loaded day filter string"""
         return 'day = TIMESTAMP(@last_loaded_day)'

-    def _convert_output(self, query_job_result):
+    @staticmethod
+    def _convert_output(query_job_result):
         """Convert query result to json"""
         if not query_job_result or query_job_result.result().total_rows == 0:
             # return empty list if no record found
@@ -325,8 +329,8 @@ async def _execute_running_cost_query(
             self._execute_query(_query, query_params),
         )

+    @staticmethod
     async def _append_total_running_cost(
-        self,
         field: BillingColumn,
         is_current_month: bool,
         last_loaded_day: str | None,
@@ -437,9 +441,8 @@ async def _append_running_cost_records(

         return results

-    def _prepare_order_by_string(
-        self, order_by: dict[BillingColumn, bool] | None
-    ) -> str:
+    @staticmethod
+    def _prepare_order_by_string(order_by: dict[BillingColumn, bool] | None) -> str:
         """Prepare order by string"""
         if not order_by:
             return ''
@@ -452,9 +455,8 @@ def _prepare_order_by_string(

         return f'ORDER BY {",".join(order_by_cols)}' if order_by_cols else ''

-    def _prepare_aggregation(
-        self, query: BillingTotalCostQueryModel
-    ) -> tuple[str, str]:
+    @staticmethod
+    def _prepare_aggregation(query: BillingTotalCostQueryModel) -> tuple[str, str]:
         """Prepare both fields for aggregation and group by string"""
         # Get columns to group by

@@ -479,7 +481,8 @@

         return fields_selected, group_by

-    def _prepare_labels_function(self, query: BillingTotalCostQueryModel):
+    @staticmethod
+    def _prepare_labels_function(query: BillingTotalCostQueryModel):
         if not query.filters:
             return None

@@ -558,7 +561,7 @@ async def get_total_cost(
             where_str = f'WHERE {where_str}'

         _query = f"""
-        {func_filter.fun_implementation if func_filter else ''}
+        {func_filter.func_implementation if func_filter else ''}

        WITH t AS (
        SELECT {time_group.field}{time_group.separator} {fields_selected},
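
Most of the edits in this file convert instance methods that never touch `self` into static methods. A minimal sketch of the behaviour this buys, using one of the filter helpers above (the demo class name is invented):

class BillingBaseDemo:
    # no `self`: the helper uses no instance state
    @staticmethod
    def _filter_to_optimise_query() -> str:
        """Filter string to optimise BQ query"""
        return 'day >= TIMESTAMP(@start_day) AND day <= TIMESTAMP(@last_day)'

# callable on the class or on an instance, with identical results
assert (
    BillingBaseDemo._filter_to_optimise_query()
    == BillingBaseDemo()._filter_to_optimise_query()
)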
15 changes: 15 additions & 0 deletions db/python/tables/bq/billing_filter.py

@@ -2,6 +2,7 @@

 import dataclasses
 import datetime
+from typing import Any

 from db.python.tables.bq.generic_bq_filter import GenericBQFilter
 from db.python.tables.bq.generic_bq_filter_model import GenericBQFilterModel
@@ -46,3 +47,17 @@ class BillingFilter(GenericBQFilterModel):
     goog_pipelines_worker: GenericBQFilter[str] = None
     wdl_task_name: GenericBQFilter[str] = None
     namespace: GenericBQFilter[str] = None
+
+    def __eq__(self, other: Any) -> bool:
+        """Equality operator"""
+        result = super().__eq__(other)
+        if not result or not isinstance(other, BillingFilter):
+            return False
+
+        # compare all attributes
+        for att in self.__dict__:
+            if getattr(self, att) != getattr(other, att):
+                return False
+
+        # all attributes are equal
+        return True
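
A hedged usage sketch of the new equality operator. The `eq=` keyword on `GenericBQFilter` is assumed by analogy with `GenericFilter(eq=...)` elsewhere in this PR, and the field values are invented:

a = BillingFilter(namespace=GenericBQFilter(eq='gcp'))
b = BillingFilter(namespace=GenericBQFilter(eq='gcp'))
c = BillingFilter(namespace=GenericBQFilter(eq='hail'))

assert a == b  # every attribute matches
assert a != c  # namespace differs, so the attribute loop returns False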
10 changes: 7 additions & 3 deletions db/python/tables/bq/billing_gcp_daily.py

@@ -1,4 +1,5 @@
 from datetime import datetime, timedelta
+from typing import Any

 from google.cloud import bigquery

@@ -9,7 +10,7 @@
 )
 from db.python.tables.bq.billing_filter import BillingFilter
 from db.python.tables.bq.generic_bq_filter import GenericBQFilter
-from models.models import BillingTotalCostQueryModel
+from models.models import BillingColumn, BillingTotalCostQueryModel


 class BillingGcpDailyTable(BillingBaseTable):
@@ -21,8 +22,9 @@ def get_table_name(self):
         """Get table name"""
         return self.table_name

+    @staticmethod
     def _query_to_partitioned_filter(
-        self, query: BillingTotalCostQueryModel
+        query: BillingTotalCostQueryModel,
     ) -> BillingFilter:
         """
         add extra filter to limit materialized view partition
@@ -77,7 +79,9 @@ async def _last_loaded_day(self):

         return None

-    def _prepare_daily_cost_subquery(self, field, query_params, last_loaded_day):
+    def _prepare_daily_cost_subquery(
+        self, field: BillingColumn, query_params: list[Any], last_loaded_day: str
+    ):
         """prepare daily cost subquery"""

         # add extra filter to limit materialized view partition
3 changes: 2 additions & 1 deletion db/python/tables/bq/billing_raw.py

@@ -16,8 +16,9 @@ def get_table_name(self):
         """Get table name"""
         return self.table_name

+    @staticmethod
     def _query_to_partitioned_filter(
-        self, query: BillingTotalCostQueryModel
+        query: BillingTotalCostQueryModel,
     ) -> BillingFilter:
         """
         Raw BQ billing table is partitioned by usage_end_time