Release #697

Merged: 5 commits on Mar 7, 2024
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yaml
@@ -43,7 +43,7 @@ jobs:

      - uses: actions/setup-python@v4
        with:
-          python-version: "3.10"
+          python-version: "3.11"

      - uses: actions/setup-java@v3
        with:
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
@@ -18,7 +18,7 @@ jobs:

      - uses: actions/setup-python@v4
        with:
-          python-version: "3.10"
+          python-version: "3.11"

      - uses: actions/setup-java@v2
        with:
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -20,7 +20,7 @@ jobs:

      - uses: actions/setup-python@v5
        with:
-          python-version: "3.10"
+          python-version: "3.11"

      - uses: actions/setup-java@v4
        with:
40 changes: 25 additions & 15 deletions api/routes/analysis.py
@@ -79,19 +79,25 @@ class AnalysisQueryModel(BaseModel):
    def to_filter(self, project_id_map: dict[str, int]) -> AnalysisFilter:
        """Convert to internal analysis filter"""
        return AnalysisFilter(
-            sample_id=GenericFilter(
-                in_=sample_id_transform_to_raw_list(self.sample_ids)
-            )
-            if self.sample_ids
-            else None,
-            sequencing_group_id=GenericFilter(
-                in_=sequencing_group_id_transform_to_raw_list(self.sequencing_group_ids)
-            )
-            if self.sequencing_group_ids
-            else None,
-            project=GenericFilter(in_=[project_id_map.get(p) for p in self.projects])
-            if self.projects
-            else None,
+            sample_id=(
+                GenericFilter(in_=sample_id_transform_to_raw_list(self.sample_ids))
+                if self.sample_ids
+                else None
+            ),
+            sequencing_group_id=(
+                GenericFilter(
+                    in_=sequencing_group_id_transform_to_raw_list(
+                        self.sequencing_group_ids
+                    )
+                )
+                if self.sequencing_group_ids
+                else None
+            ),
+            project=(
+                GenericFilter(in_=[project_id_map.get(p) for p in self.projects])
+                if self.projects
+                else None
+            ),
            type=GenericFilter(eq=self.type) if self.type else None,
        )

@@ -241,8 +247,9 @@ async def query_analyses(
@router.get('/analysis-runner', operation_id='getAnalysisRunnerLog')
async def get_analysis_runner_log(
    project_names: list[str] = Query(None),  # type: ignore
-    author: str = None,
+    # author: str = None,  # not implemented yet, uncomment when we do
    output_dir: str = None,
+    ar_guid: str = None,
    connection: Connection = get_projectless_db_connection,
) -> list[AnalysisInternal]:
    """
@@ -257,7 +264,10 @@ async def get_analysis_runner_log(
    )

    results = await atable.get_analysis_runner_log(
-        project_ids=project_ids, author=author, output_dir=output_dir
+        project_ids=project_ids,
+        # author=author,
+        output_dir=output_dir,
+        ar_guid=ar_guid,
    )
    return [a.to_external() for a in results]

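With these route changes, the analysis-runner log endpoint gains an ar_guid filter while the author filter is parked until it is implemented. A minimal sketch of a client call against a deployed instance (the base URL, route prefix, token, and GUID are placeholders, not part of this PR):

import requests  # any HTTP client works; requests is just for illustration

BASE_URL = 'https://metamist.example.org'  # placeholder deployment URL

resp = requests.get(
    f'{BASE_URL}/api/v1/analysis/analysis-runner',  # assumed route prefix
    params={
        'project_names': ['my-project'],  # repeated query parameter
        'ar_guid': '0b7e7f3c-0000-0000-0000-000000000000',  # placeholder GUID
    },
    headers={'Authorization': 'Bearer <token>'},  # placeholder auth
    timeout=30,
)
resp.raise_for_status()
for analysis in resp.json():
    print(analysis['id'], analysis.get('output'))
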
10 changes: 6 additions & 4 deletions db/backup/backup.py
@@ -1,9 +1,10 @@
-#!/usr/bin/python3.7
+#!/usr/bin/python3
# pylint: disable=broad-exception-caught,broad-exception-raised
""" Daily back up function for databases within a local
MariaDB instance """

import json
+import os
import subprocess
from datetime import datetime
from typing import Literal
@@ -50,7 +51,7 @@ def perform_backup():
    tmp_dir = f'backup_{timestamp_str}'
    subprocess.run(['mkdir', tmp_dir], check=True)
    # grant permissions, so that mariadb can read ib_logfile0
-    subprocess.run(['sudo', 'chmod', '-R', '777', tmp_dir], check=True)
+    subprocess.run(['sudo', 'chmod', '-R', '770', tmp_dir], check=True)

    credentials = read_db_credentials()
    db_username = credentials['username']
@@ -66,10 +67,11 @@ def perform_backup():
                '--backup',
                f'--target-dir={tmp_dir}/',
                f'--user={db_username}',
-                f'-p{db_password}',
            ],
            check=True,
            stderr=subprocess.DEVNULL,
+            # pass the password via the environment so it isn't visible in the process list
+            env={'MYSQL_PWD': db_password, **os.environ},
        )

    except subprocess.CalledProcessError as e:
@@ -83,7 +85,7 @@ def perform_backup():

    # mariabackup creates awkward permissions for the output files,
    # so we'll grant appropriate permissions for tmp_dir to later remove it
-    subprocess.run(['sudo', 'chmod', '-R', '777', tmp_dir], check=True)
+    subprocess.run(['sudo', 'chmod', '-R', '770', tmp_dir], check=True)

    # tar the archive to make it easier to upload to GCS
    tar_archive_path = f'{tmp_dir}.tar.gz'
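The mariabackup change above swaps the -p<password> argument for the MYSQL_PWD environment variable, which MariaDB client tools read when no password is passed on the command line. A minimal standalone sketch of why that matters (the command and password are placeholders):

import os
import subprocess

db_password = 'example-password'  # placeholder; read from your secret store

# The argv of a running process is visible to every user on the host
# (e.g. via `ps aux`), so `mariabackup --backup --user=root -psecret`
# would leak the password for as long as the backup runs. A process's
# environment is only readable by its owner (and root), so MYSQL_PWD
# keeps the secret out of the process list.
subprocess.run(
    ['mariabackup', '--backup', '--target-dir=backup_tmp/', '--user=root'],
    check=True,
    env={'MYSQL_PWD': db_password, **os.environ},
)
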
18 changes: 12 additions & 6 deletions db/python/layers/analysis.py
@@ -441,9 +441,9 @@ async def get_cram_sizes_between_range(
        sample_create_dates = await sglayer.get_samples_create_date_from_sgs(
            list(crams.keys())
        )
-        by_date: dict[
-            SequencingGroupInternalId, list[tuple[datetime.date, int]]
-        ] = defaultdict(list)
+        by_date: dict[SequencingGroupInternalId, list[tuple[datetime.date, int]]] = (
+            defaultdict(list)
+        )

        for sg_id, analyses in crams.items():
            if len(analyses) == 1:
@@ -531,7 +531,9 @@ async def get_sgs_added_by_day_by_es_indices(

        return by_day

-    async def get_audit_logs_by_analysis_ids(self, analysis_ids: list[int]) -> dict[int, list[AuditLogInternal]]:
+    async def get_audit_logs_by_analysis_ids(
+        self, analysis_ids: list[int]
+    ) -> dict[int, list[AuditLogInternal]]:
        """Get audit logs for analysis IDs"""
        return await self.at.get_audit_log_for_analysis_ids(analysis_ids)

@@ -594,12 +596,16 @@ async def update_analysis(
    async def get_analysis_runner_log(
        self,
        project_ids: list[int] = None,
-        author: str = None,
+        # author: str = None,
        output_dir: str = None,
+        ar_guid: str = None,
    ) -> list[AnalysisInternal]:
        """
        Get log for the analysis-runner, useful for checking the history of analyses
        """
        return await self.at.get_analysis_runner_log(
-            project_ids, author=author, output_dir=output_dir
+            project_ids,
+            # author=author,
+            output_dir=output_dir,
+            ar_guid=ar_guid,
        )
2 changes: 1 addition & 1 deletion db/python/layers/billing.py
@@ -18,7 +18,7 @@ class BillingLayer(BqBaseLayer):

    def table_factory(
        self,
-        source: BillingSource,
+        source: BillingSource | None = None,
        fields: list[BillingColumn] | None = None,
        filters: dict[BillingColumn, str | list | dict] | None = None,
    ) -> (
11 changes: 6 additions & 5 deletions db/python/tables/analysis.py
@@ -486,8 +486,9 @@ async def get_sample_cram_path_map_for_seqr(
    async def get_analysis_runner_log(
        self,
        project_ids: List[int] = None,
-        author: str = None,
+        # author: str = None,
        output_dir: str = None,
+        ar_guid: str = None,
    ) -> List[AnalysisInternal]:
        """
        Get log for the analysis-runner, useful for checking the history of analyses
@@ -501,15 +502,15 @@ async def get_analysis_runner_log(
            wheres.append('project in :project_ids')
            values['project_ids'] = project_ids

-        if author:
-            wheres.append('audit_log_id = :audit_log_id')
-            values['audit_log_id'] = await self.audit_log_id()
-
        if output_dir:
            wheres.append('(output = :output OR output LIKE :output_like)')
            values['output'] = output_dir
            values['output_like'] = f'%{output_dir}'

+        if ar_guid:
+            wheres.append('JSON_EXTRACT(meta, "$.ar_guid") = :ar_guid')
+            values['ar_guid'] = ar_guid
+
        wheres_str = ' AND '.join(wheres)
        _query = f'SELECT * FROM analysis WHERE {wheres_str}'
        rows = await self.connection.fetch_all(_query, values)
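This is the last hop of the ar_guid plumbing (route, then layer, then table). Because the analysis-runner GUID lives inside the JSON meta column, the filter goes through MariaDB's JSON_EXTRACT. A standalone sketch of how the WHERE clause assembles, with made-up inputs:

# Standalone sketch of the clause assembly above; inputs are made up.
wheres: list[str] = []
values: dict = {}

project_ids = [1, 2]
ar_guid = 'f0b9c8d7-0000-0000-0000-000000000000'

if project_ids:
    wheres.append('project in :project_ids')
    values['project_ids'] = project_ids

if ar_guid:
    # JSON_EXTRACT pulls "$.ar_guid" out of the JSON meta column, so only
    # analyses recorded with that analysis-runner GUID will match.
    wheres.append('JSON_EXTRACT(meta, "$.ar_guid") = :ar_guid')
    values['ar_guid'] = ar_guid

query = f'SELECT * FROM analysis WHERE {" AND ".join(wheres)}'
# -> SELECT * FROM analysis
#    WHERE project in :project_ids AND JSON_EXTRACT(meta, "$.ar_guid") = :ar_guid
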
29 changes: 16 additions & 13 deletions db/python/tables/bq/billing_base.py
@@ -117,8 +117,9 @@ def _execute_query(
        # otherwise return as BQ iterator
        return self._connection.connection.query(query, job_config=job_config)

+    @staticmethod
    def _query_to_partitioned_filter(
-        self, query: BillingTotalCostQueryModel
+        query: BillingTotalCostQueryModel,
    ) -> BillingFilter:
        """
        By default views are partitioned by 'day',
@@ -137,15 +138,18 @@
        )
        return billing_filter

-    def _filter_to_optimise_query(self) -> str:
+    @staticmethod
+    def _filter_to_optimise_query() -> str:
        """Filter string to optimise BQ query"""
        return 'day >= TIMESTAMP(@start_day) AND day <= TIMESTAMP(@last_day)'

-    def _last_loaded_day_filter(self) -> str:
+    @staticmethod
+    def _last_loaded_day_filter() -> str:
        """Last Loaded day filter string"""
        return 'day = TIMESTAMP(@last_loaded_day)'

-    def _convert_output(self, query_job_result):
+    @staticmethod
+    def _convert_output(query_job_result):
        """Convert query result to json"""
        if not query_job_result or query_job_result.result().total_rows == 0:
            # return empty list if no record found
@@ -325,8 +329,8 @@ async def _execute_running_cost_query(
            self._execute_query(_query, query_params),
        )

+    @staticmethod
    async def _append_total_running_cost(
-        self,
        field: BillingColumn,
        is_current_month: bool,
        last_loaded_day: str | None,
@@ -437,9 +441,8 @@ async def _append_running_cost_records(

        return results

-    def _prepare_order_by_string(
-        self, order_by: dict[BillingColumn, bool] | None
-    ) -> str:
+    @staticmethod
+    def _prepare_order_by_string(order_by: dict[BillingColumn, bool] | None) -> str:
        """Prepare order by string"""
        if not order_by:
            return ''
@@ -452,9 +455,8 @@ def _prepare_order_by_string(

        return f'ORDER BY {",".join(order_by_cols)}' if order_by_cols else ''

-    def _prepare_aggregation(
-        self, query: BillingTotalCostQueryModel
-    ) -> tuple[str, str]:
+    @staticmethod
+    def _prepare_aggregation(query: BillingTotalCostQueryModel) -> tuple[str, str]:
        """Prepare both fields for aggregation and group by string"""
        # Get columns to group by

@@ -479,7 +481,8 @@ def _prepare_aggregation(

        return fields_selected, group_by

-    def _prepare_labels_function(self, query: BillingTotalCostQueryModel):
+    @staticmethod
+    def _prepare_labels_function(query: BillingTotalCostQueryModel):
        if not query.filters:
            return None

@@ -558,7 +561,7 @@ async def get_total_cost(
            where_str = f'WHERE {where_str}'

        _query = f"""
-        {func_filter.fun_implementation if func_filter else ''}
+        {func_filter.func_implementation if func_filter else ''}

        WITH t AS (
            SELECT {time_group.field}{time_group.separator} {fields_selected},
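Most of this file's diff converts helpers that never touch instance state into staticmethods. Note that _query_to_partitioned_filter remains overridable: Python resolves staticmethods through the class hierarchy like any other attribute, so the subclass versions in billing_gcp_daily.py and billing_raw.py below still take effect. A minimal sketch of the pattern with illustrative class names (not the real billing tables):

class BaseTable:
    """Illustrative stand-in for BillingBaseTable."""

    @staticmethod
    def _query_to_partitioned_filter(query: str) -> str:
        # default partition filter, no instance state needed
        return f'day filter for {query!r}'


class GcpDailyTable(BaseTable):
    """Illustrative stand-in for BillingGcpDailyTable."""

    @staticmethod
    def _query_to_partitioned_filter(query: str) -> str:
        # subclass narrows the partition window, still no instance state
        return f'day filter plus GCP delivery window for {query!r}'


table = GcpDailyTable()
# Resolves to the subclass override even though both are staticmethods:
print(table._query_to_partitioned_filter('q'))
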
15 changes: 15 additions & 0 deletions db/python/tables/bq/billing_filter.py
@@ -2,6 +2,7 @@

import dataclasses
import datetime
+from typing import Any

from db.python.tables.bq.generic_bq_filter import GenericBQFilter
from db.python.tables.bq.generic_bq_filter_model import GenericBQFilterModel
@@ -46,3 +47,17 @@ class BillingFilter(GenericBQFilterModel):
    goog_pipelines_worker: GenericBQFilter[str] = None
    wdl_task_name: GenericBQFilter[str] = None
    namespace: GenericBQFilter[str] = None
+
+    def __eq__(self, other: Any) -> bool:
+        """Equality operator"""
+        result = super().__eq__(other)
+        if not result or not isinstance(other, BillingFilter):
+            return False
+
+        # compare all attributes
+        for att in self.__dict__:
+            if getattr(self, att) != getattr(other, att):
+                return False
+
+        # all attributes are equal
+        return True
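A quick sketch of what the added __eq__ enables, mainly useful in tests that compare constructed filters. The field values are made up, and GenericBQFilter(eq=...) is assumed to mirror the keyword style GenericFilter uses elsewhere in this PR:

# Hypothetical usage; assumes the BillingFilter/GenericBQFilter from this PR.
a = BillingFilter(namespace=GenericBQFilter(eq='production'))
b = BillingFilter(namespace=GenericBQFilter(eq='production'))
c = BillingFilter(namespace=GenericBQFilter(eq='development'))

assert a == b  # equal field-by-field, so the filters compare equal
assert a != c  # any differing attribute makes them unequal
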
10 changes: 7 additions & 3 deletions db/python/tables/bq/billing_gcp_daily.py
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
+from typing import Any

from google.cloud import bigquery

@@ -9,7 +10,7 @@
)
from db.python.tables.bq.billing_filter import BillingFilter
from db.python.tables.bq.generic_bq_filter import GenericBQFilter
-from models.models import BillingTotalCostQueryModel
+from models.models import BillingColumn, BillingTotalCostQueryModel


class BillingGcpDailyTable(BillingBaseTable):
@@ -21,8 +22,9 @@ def get_table_name(self):
"""Get table name"""
return self.table_name

@staticmethod
def _query_to_partitioned_filter(
self, query: BillingTotalCostQueryModel
query: BillingTotalCostQueryModel,
) -> BillingFilter:
"""
add extra filter to limit materialized view partition
@@ -77,7 +79,9 @@ async def _last_loaded_day(self):

        return None

-    def _prepare_daily_cost_subquery(self, field, query_params, last_loaded_day):
+    def _prepare_daily_cost_subquery(
+        self, field: BillingColumn, query_params: list[Any], last_loaded_day: str
+    ):
        """prepare daily cost subquery"""

        # add extra filter to limit materialized view partition
3 changes: 2 additions & 1 deletion db/python/tables/bq/billing_raw.py
@@ -16,8 +16,9 @@ def get_table_name(self):
"""Get table name"""
return self.table_name

@staticmethod
def _query_to_partitioned_filter(
self, query: BillingTotalCostQueryModel
query: BillingTotalCostQueryModel,
) -> BillingFilter:
"""
Raw BQ billing table is partitioned by usage_end_time