Skip to content

Commit

Permalink
Merge pull request #702 from populationgenomics/dev
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
milo-hyben authored Mar 14, 2024
2 parents 5b55a13 + 2cb48ee commit 3dc5ff6
Show file tree
Hide file tree
Showing 29 changed files with 1,361 additions and 786 deletions.
21 changes: 14 additions & 7 deletions api/routes/billing.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
"""
Billing routes
"""

from async_lru import alru_cache
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse

from api.settings import BILLING_CACHE_RESPONSE_TTL, BQ_AGGREG_VIEW
from api.utils.db import BqConnection, get_author
from db.python.layers.billing import BillingLayer
from models.enums import BillingSource
from models.models import (
BillingBatchCostRecord,
BillingColumn,
BillingCostBudgetRecord,
BillingHailBatchCostRecord,
BillingTotalCostQueryModel,
BillingTotalCostRecord,
)
Expand Down Expand Up @@ -273,34 +276,38 @@ async def get_namespaces(

@router.get(
'/cost-by-ar-guid/{ar_guid}',
response_model=BillingHailBatchCostRecord,
response_model=list[BillingBatchCostRecord],
operation_id='costByArGuid',
)
@alru_cache(maxsize=10, ttl=BILLING_CACHE_RESPONSE_TTL)
async def get_cost_by_ar_guid(
ar_guid: str,
author: str = get_author,
) -> BillingHailBatchCostRecord:
) -> JSONResponse:
"""Get Hail Batch costs by AR GUID"""
billing_layer = _get_billing_layer_from(author)
records = await billing_layer.get_cost_by_ar_guid(ar_guid)
return records
headers = {'x-bq-cost': str(billing_layer.connection.cost)}
json_compatible_item_data = jsonable_encoder(records)
return JSONResponse(content=json_compatible_item_data, headers=headers)


@router.get(
'/cost-by-batch-id/{batch_id}',
response_model=BillingHailBatchCostRecord,
response_model=list[BillingBatchCostRecord],
operation_id='costByBatchId',
)
@alru_cache(maxsize=10, ttl=BILLING_CACHE_RESPONSE_TTL)
async def get_cost_by_batch_id(
batch_id: str,
author: str = get_author,
) -> BillingHailBatchCostRecord:
) -> JSONResponse:
"""Get Hail Batch costs by Batch ID"""
billing_layer = _get_billing_layer_from(author)
records = await billing_layer.get_cost_by_batch_id(batch_id)
return records
headers = {'x-bq-cost': str(billing_layer.connection.cost)}
json_compatible_item_data = jsonable_encoder(records)
return JSONResponse(content=json_compatible_item_data, headers=headers)


@router.post(
Expand Down
3 changes: 3 additions & 0 deletions api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
BQ_GCP_BILLING_VIEW = os.getenv('SM_GCP_BQ_BILLING_VIEW')
BQ_BATCHES_VIEW = os.getenv('SM_GCP_BQ_BATCHES_VIEW')

# BQ cost per 1 TB, used to calculate cost of BQ queries
BQ_COST_PER_TB = 6.25

# This is to optimise BQ queries, DEV table has data only for Mar 2023
BQ_DAYS_BACK_OPTIMAL = 30 # Look back 30 days for optimal query
BILLING_CACHE_RESPONSE_TTL = 3600 # 1 Hour
Expand Down
13 changes: 13 additions & 0 deletions db/python/gcp_connect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Code for connecting to Big Query database
"""

import logging
import os

Expand All @@ -23,6 +24,8 @@ def __init__(
self.gcp_project = os.getenv('METAMIST_GCP_PROJECT')
self.connection: bq.Client = bq.Client(project=self.gcp_project)
self.author: str = author
# initialise cost of the query
self._cost: float = 0

@staticmethod
async def get_connection_no_project(author: str):
Expand All @@ -35,6 +38,16 @@ async def get_connection_no_project(author: str):

return BqConnection(author=author)

@property
def cost(self) -> float:
"""Get the cost of the query"""
return self._cost

@cost.setter
def cost(self, value: float):
"""Set the cost of the query"""
self._cost = value


class BqDbBase:
"""Base class for big query database subclasses"""
Expand Down
77 changes: 16 additions & 61 deletions db/python/layers/billing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from db.python.tables.bq.billing_daily_extended import BillingDailyExtendedTable
from db.python.tables.bq.billing_gcp_daily import BillingGcpDailyTable
from db.python.tables.bq.billing_raw import BillingRawTable
from models.enums import BillingSource, BillingTimeColumn, BillingTimePeriods
from models.enums import BillingSource
from models.models import (
BillingBatchCostRecord,
BillingColumn,
BillingCostBudgetRecord,
BillingHailBatchCostRecord,
BillingTotalCostQueryModel,
)

Expand All @@ -32,6 +32,8 @@ def table_factory(
return BillingGcpDailyTable(self.connection)
if source == BillingSource.RAW:
return BillingRawTable(self.connection)
if source == BillingSource.EXTENDED:
return BillingDailyExtendedTable(self.connection)

# check if any of the fields is in the extended columns
if fields:
Expand Down Expand Up @@ -202,7 +204,7 @@ async def get_running_cost(
async def get_cost_by_ar_guid(
self,
ar_guid: str | None = None,
) -> BillingHailBatchCostRecord:
) -> list[BillingBatchCostRecord]:
"""
Get Costs by AR GUID
"""
Expand All @@ -216,44 +218,18 @@ async def get_cost_by_ar_guid(
) = await ar_batch_lookup_table.get_batches_by_ar_guid(ar_guid)

if not batches:
return BillingHailBatchCostRecord(
ar_guid=ar_guid,
batch_ids=[],
costs=[],
)
return []

# Then get the costs for the given AR GUID/batches from the main table
all_cols = [BillingColumn.str_to_enum(v) for v in BillingColumn.raw_cols()]

query = BillingTotalCostQueryModel(
fields=all_cols,
source=BillingSource.RAW,
start_date=start_day.strftime('%Y-%m-%d'),
end_date=end_day.strftime('%Y-%m-%d'),
filters={
BillingColumn.LABELS: {
'batch_id': batches,
'ar-guid': ar_guid,
}
},
filters_op='OR',
group_by=False,
time_column=BillingTimeColumn.USAGE_END_TIME,
time_periods=BillingTimePeriods.DAY,
)

billing_table = self.table_factory(query.source, query.fields)
records = await billing_table.get_total_cost(query)
return BillingHailBatchCostRecord(
ar_guid=ar_guid,
batch_ids=batches,
costs=records,
billing_table = BillingDailyExtendedTable(self.connection)
results = await billing_table.get_batch_cost_summary(
start_day, end_day, batches, ar_guid
)
return results

async def get_cost_by_batch_id(
self,
batch_id: str | None = None,
) -> BillingHailBatchCostRecord:
) -> list[BillingBatchCostRecord]:
"""
Get Costs by Batch ID
"""
Expand All @@ -270,31 +246,10 @@ async def get_cost_by_batch_id(
) = await ar_batch_lookup_table.get_batches_by_ar_guid(ar_guid)

if not batches:
return BillingHailBatchCostRecord(ar_guid=ar_guid, batch_ids=[], costs=[])
return []

# Then get the costs for the given AR GUID/batches from the main table
all_cols = [BillingColumn.str_to_enum(v) for v in BillingColumn.raw_cols()]

query = BillingTotalCostQueryModel(
fields=all_cols,
source=BillingSource.RAW,
start_date=start_day.strftime('%Y-%m-%d'),
end_date=end_day.strftime('%Y-%m-%d'),
filters={
BillingColumn.LABELS: {
'batch_id': batches,
'ar-guid': ar_guid,
}
},
filters_op='OR',
group_by=False,
time_column=BillingTimeColumn.USAGE_END_TIME,
time_periods=BillingTimePeriods.DAY,
)
billing_table = self.table_factory(query.source, query.fields)
records = await billing_table.get_total_cost(query)
return BillingHailBatchCostRecord(
ar_guid=ar_guid,
batch_ids=batches,
costs=records,
billing_table = BillingDailyExtendedTable(self.connection)
results = await billing_table.get_batch_cost_summary(
start_day, end_day, batches, ar_guid
)
return results
5 changes: 5 additions & 0 deletions db/python/layers/bq_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ def __init__(self, connection: BqConnection):
def author(self):
"""Get author from connection"""
return self.connection.author

@property
def cost(self):
"""Get author from connection"""
return self.connection.cost
3 changes: 2 additions & 1 deletion db/python/tables/assay.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ async def get_assay_type_numbers_by_batch_for_project(self, project: ProjectId):
"""
rows = await self.connection.fetch_all(_query, {'project': project})
batch_result: dict[str, dict[str, str]] = defaultdict(dict)
for batch, seqType, count in rows:
for row in rows:
batch, seqType, count = row['batch'], row['type'], row['n']
batch = str(batch).strip('\"') if batch != 'null' else 'no-batch'
batch_result[batch][seqType] = str(count)
if len(batch_result) == 1 and 'no-batch' in batch_result:
Expand Down
84 changes: 56 additions & 28 deletions db/python/tables/bq/billing_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from google.cloud import bigquery

from api.settings import BQ_BUDGET_VIEW, BQ_DAYS_BACK_OPTIMAL
from api.settings import BQ_BUDGET_VIEW, BQ_COST_PER_TB, BQ_DAYS_BACK_OPTIMAL
from api.utils.dates import get_invoice_month_range, reformat_datetime
from db.python.gcp_connect import BqDbBase
from db.python.tables.bq.billing_filter import BillingFilter
Expand Down Expand Up @@ -99,8 +99,10 @@ def get_table_name(self):
raise NotImplementedError('Calling Abstract method directly')

def _execute_query(
self, query: str, params: list[Any] = None, results_as_list: bool = True
) -> list[Any]:
self, query: str, params: list[Any] | None = None, results_as_list: bool = True
) -> (
list[Any] | bigquery.table.RowIterator | bigquery.table._EmptyRowIterator | None
):
"""Execute query, add BQ labels"""
if params:
job_config = bigquery.QueryJobConfig(
Expand All @@ -109,13 +111,31 @@ def _execute_query(
else:
job_config = bigquery.QueryJobConfig(labels=BQ_LABELS)

# We need to dry run to calulate the costs
# executing query does not provide the cost
# more info here:
# https://stackoverflow.com/questions/58561153/what-is-the-python-api-i-can-use-to-calculate-the-cost-of-a-bigquery-query/58561358#58561358
job_config.dry_run = True
job_config.use_query_cache = False
query_job = self._connection.connection.query(query, job_config=job_config)

# This should be thread/async safe as each request
# creates a new connection instance
# and queries per requests are run in sequencial order,
# waiting for the previous one to finish
self._connection.cost += (
query_job.total_bytes_processed / 1024**4
) * BQ_COST_PER_TB

# now execute the query
job_config.dry_run = False
job_config.use_query_cache = True
query_job = self._connection.connection.query(query, job_config=job_config)
if results_as_list:
return list(
self._connection.connection.query(query, job_config=job_config).result()
)
return list(query_job.result())

# otherwise return as BQ iterator
return self._connection.connection.query(query, job_config=job_config)
return query_job

@staticmethod
def _query_to_partitioned_filter(
Expand All @@ -129,12 +149,16 @@ def _query_to_partitioned_filter(

# initial partition filter
billing_filter.day = GenericBQFilter[datetime](
gte=datetime.strptime(query.start_date, '%Y-%m-%d')
if query.start_date
else None,
lte=datetime.strptime(query.end_date, '%Y-%m-%d')
if query.end_date
else None,
gte=(
datetime.strptime(query.start_date, '%Y-%m-%d')
if query.start_date
else None
),
lte=(
datetime.strptime(query.end_date, '%Y-%m-%d')
if query.end_date
else None
),
)
return billing_filter

Expand Down Expand Up @@ -362,17 +386,19 @@ async def _append_total_running_cost(
total_monthly=(
total_monthly[COMPUTE]['ALL'] + total_monthly[STORAGE]['ALL']
),
total_daily=(total_daily[COMPUTE]['ALL'] + total_daily[STORAGE]['ALL'])
if is_current_month
else None,
total_daily=(
(total_daily[COMPUTE]['ALL'] + total_daily[STORAGE]['ALL'])
if is_current_month
else None
),
compute_monthly=total_monthly[COMPUTE]['ALL'],
compute_daily=(total_daily[COMPUTE]['ALL'])
if is_current_month
else None,
compute_daily=(
(total_daily[COMPUTE]['ALL']) if is_current_month else None
),
storage_monthly=total_monthly[STORAGE]['ALL'],
storage_daily=(total_daily[STORAGE]['ALL'])
if is_current_month
else None,
storage_daily=(
(total_daily[STORAGE]['ALL']) if is_current_month else None
),
details=all_details,
budget_spent=None,
budget=None,
Expand Down Expand Up @@ -422,17 +448,19 @@ async def _append_running_cost_records(
{
'field': key,
'total_monthly': monthly,
'total_daily': (compute_daily + storage_daily)
if is_current_month
else None,
'total_daily': (
(compute_daily + storage_daily)
if is_current_month
else None
),
'compute_monthly': compute_monthly,
'compute_daily': compute_daily,
'storage_monthly': storage_monthly,
'storage_daily': storage_daily,
'details': details,
'budget_spent': 100 * monthly / budget_monthly
if budget_monthly
else None,
'budget_spent': (
100 * monthly / budget_monthly if budget_monthly else None
),
'budget': budget_monthly,
'last_loaded_day': last_loaded_day,
}
Expand Down
Loading

0 comments on commit 3dc5ff6

Please sign in to comment.