Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release #702

Merged
merged 3 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions api/routes/billing.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
"""
Billing routes
"""

from async_lru import alru_cache
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse

from api.settings import BILLING_CACHE_RESPONSE_TTL, BQ_AGGREG_VIEW
from api.utils.db import BqConnection, get_author
from db.python.layers.billing import BillingLayer
from models.enums import BillingSource
from models.models import (
BillingBatchCostRecord,
BillingColumn,
BillingCostBudgetRecord,
BillingHailBatchCostRecord,
BillingTotalCostQueryModel,
BillingTotalCostRecord,
)
Expand Down Expand Up @@ -273,34 +276,38 @@ async def get_namespaces(

@router.get(
'/cost-by-ar-guid/{ar_guid}',
response_model=BillingHailBatchCostRecord,
response_model=list[BillingBatchCostRecord],
operation_id='costByArGuid',
)
@alru_cache(maxsize=10, ttl=BILLING_CACHE_RESPONSE_TTL)
async def get_cost_by_ar_guid(
ar_guid: str,
author: str = get_author,
) -> BillingHailBatchCostRecord:
) -> JSONResponse:
"""Get Hail Batch costs by AR GUID"""
billing_layer = _get_billing_layer_from(author)
records = await billing_layer.get_cost_by_ar_guid(ar_guid)
return records
headers = {'x-bq-cost': str(billing_layer.connection.cost)}
json_compatible_item_data = jsonable_encoder(records)
return JSONResponse(content=json_compatible_item_data, headers=headers)


@router.get(
'/cost-by-batch-id/{batch_id}',
response_model=BillingHailBatchCostRecord,
response_model=list[BillingBatchCostRecord],
operation_id='costByBatchId',
)
@alru_cache(maxsize=10, ttl=BILLING_CACHE_RESPONSE_TTL)
async def get_cost_by_batch_id(
batch_id: str,
author: str = get_author,
) -> BillingHailBatchCostRecord:
) -> JSONResponse:
"""Get Hail Batch costs by Batch ID"""
billing_layer = _get_billing_layer_from(author)
records = await billing_layer.get_cost_by_batch_id(batch_id)
return records
headers = {'x-bq-cost': str(billing_layer.connection.cost)}
json_compatible_item_data = jsonable_encoder(records)
return JSONResponse(content=json_compatible_item_data, headers=headers)


@router.post(
Expand Down
3 changes: 3 additions & 0 deletions api/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
BQ_GCP_BILLING_VIEW = os.getenv('SM_GCP_BQ_BILLING_VIEW')
BQ_BATCHES_VIEW = os.getenv('SM_GCP_BQ_BATCHES_VIEW')

# BQ cost per 1 TB, used to calculate cost of BQ queries
BQ_COST_PER_TB = 6.25

# This is to optimise BQ queries, DEV table has data only for Mar 2023
BQ_DAYS_BACK_OPTIMAL = 30 # Look back 30 days for optimal query
BILLING_CACHE_RESPONSE_TTL = 3600 # 1 Hour
Expand Down
13 changes: 13 additions & 0 deletions db/python/gcp_connect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Code for connecting to Big Query database
"""

import logging
import os

Expand All @@ -23,6 +24,8 @@ def __init__(
self.gcp_project = os.getenv('METAMIST_GCP_PROJECT')
self.connection: bq.Client = bq.Client(project=self.gcp_project)
self.author: str = author
# initialise cost of the query
self._cost: float = 0

@staticmethod
async def get_connection_no_project(author: str):
Expand All @@ -35,6 +38,16 @@ async def get_connection_no_project(author: str):

return BqConnection(author=author)

@property
def cost(self) -> float:
"""Get the cost of the query"""
return self._cost

@cost.setter
def cost(self, value: float):
"""Set the cost of the query"""
self._cost = value


class BqDbBase:
"""Base class for big query database subclasses"""
Expand Down
77 changes: 16 additions & 61 deletions db/python/layers/billing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from db.python.tables.bq.billing_daily_extended import BillingDailyExtendedTable
from db.python.tables.bq.billing_gcp_daily import BillingGcpDailyTable
from db.python.tables.bq.billing_raw import BillingRawTable
from models.enums import BillingSource, BillingTimeColumn, BillingTimePeriods
from models.enums import BillingSource
from models.models import (
BillingBatchCostRecord,
BillingColumn,
BillingCostBudgetRecord,
BillingHailBatchCostRecord,
BillingTotalCostQueryModel,
)

Expand All @@ -32,6 +32,8 @@ def table_factory(
return BillingGcpDailyTable(self.connection)
if source == BillingSource.RAW:
return BillingRawTable(self.connection)
if source == BillingSource.EXTENDED:
return BillingDailyExtendedTable(self.connection)

# check if any of the fields is in the extended columns
if fields:
Expand Down Expand Up @@ -202,7 +204,7 @@ async def get_running_cost(
async def get_cost_by_ar_guid(
self,
ar_guid: str | None = None,
) -> BillingHailBatchCostRecord:
) -> list[BillingBatchCostRecord]:
"""
Get Costs by AR GUID
"""
Expand All @@ -216,44 +218,18 @@ async def get_cost_by_ar_guid(
) = await ar_batch_lookup_table.get_batches_by_ar_guid(ar_guid)

if not batches:
return BillingHailBatchCostRecord(
ar_guid=ar_guid,
batch_ids=[],
costs=[],
)
return []

# Then get the costs for the given AR GUID/batches from the main table
all_cols = [BillingColumn.str_to_enum(v) for v in BillingColumn.raw_cols()]

query = BillingTotalCostQueryModel(
fields=all_cols,
source=BillingSource.RAW,
start_date=start_day.strftime('%Y-%m-%d'),
end_date=end_day.strftime('%Y-%m-%d'),
filters={
BillingColumn.LABELS: {
'batch_id': batches,
'ar-guid': ar_guid,
}
},
filters_op='OR',
group_by=False,
time_column=BillingTimeColumn.USAGE_END_TIME,
time_periods=BillingTimePeriods.DAY,
)

billing_table = self.table_factory(query.source, query.fields)
records = await billing_table.get_total_cost(query)
return BillingHailBatchCostRecord(
ar_guid=ar_guid,
batch_ids=batches,
costs=records,
billing_table = BillingDailyExtendedTable(self.connection)
results = await billing_table.get_batch_cost_summary(
start_day, end_day, batches, ar_guid
)
return results

async def get_cost_by_batch_id(
self,
batch_id: str | None = None,
) -> BillingHailBatchCostRecord:
) -> list[BillingBatchCostRecord]:
"""
Get Costs by Batch ID
"""
Expand All @@ -270,31 +246,10 @@ async def get_cost_by_batch_id(
) = await ar_batch_lookup_table.get_batches_by_ar_guid(ar_guid)

if not batches:
return BillingHailBatchCostRecord(ar_guid=ar_guid, batch_ids=[], costs=[])
return []

# Then get the costs for the given AR GUID/batches from the main table
all_cols = [BillingColumn.str_to_enum(v) for v in BillingColumn.raw_cols()]

query = BillingTotalCostQueryModel(
fields=all_cols,
source=BillingSource.RAW,
start_date=start_day.strftime('%Y-%m-%d'),
end_date=end_day.strftime('%Y-%m-%d'),
filters={
BillingColumn.LABELS: {
'batch_id': batches,
'ar-guid': ar_guid,
}
},
filters_op='OR',
group_by=False,
time_column=BillingTimeColumn.USAGE_END_TIME,
time_periods=BillingTimePeriods.DAY,
)
billing_table = self.table_factory(query.source, query.fields)
records = await billing_table.get_total_cost(query)
return BillingHailBatchCostRecord(
ar_guid=ar_guid,
batch_ids=batches,
costs=records,
billing_table = BillingDailyExtendedTable(self.connection)
results = await billing_table.get_batch_cost_summary(
start_day, end_day, batches, ar_guid
)
return results
5 changes: 5 additions & 0 deletions db/python/layers/bq_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ def __init__(self, connection: BqConnection):
def author(self):
"""Get author from connection"""
return self.connection.author

@property
def cost(self):
"""Get author from connection"""
return self.connection.cost
3 changes: 2 additions & 1 deletion db/python/tables/assay.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ async def get_assay_type_numbers_by_batch_for_project(self, project: ProjectId):
"""
rows = await self.connection.fetch_all(_query, {'project': project})
batch_result: dict[str, dict[str, str]] = defaultdict(dict)
for batch, seqType, count in rows:
for row in rows:
batch, seqType, count = row['batch'], row['type'], row['n']
batch = str(batch).strip('\"') if batch != 'null' else 'no-batch'
batch_result[batch][seqType] = str(count)
if len(batch_result) == 1 and 'no-batch' in batch_result:
Expand Down
84 changes: 56 additions & 28 deletions db/python/tables/bq/billing_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from google.cloud import bigquery

from api.settings import BQ_BUDGET_VIEW, BQ_DAYS_BACK_OPTIMAL
from api.settings import BQ_BUDGET_VIEW, BQ_COST_PER_TB, BQ_DAYS_BACK_OPTIMAL
from api.utils.dates import get_invoice_month_range, reformat_datetime
from db.python.gcp_connect import BqDbBase
from db.python.tables.bq.billing_filter import BillingFilter
Expand Down Expand Up @@ -99,8 +99,10 @@ def get_table_name(self):
raise NotImplementedError('Calling Abstract method directly')

def _execute_query(
self, query: str, params: list[Any] = None, results_as_list: bool = True
) -> list[Any]:
self, query: str, params: list[Any] | None = None, results_as_list: bool = True
) -> (
list[Any] | bigquery.table.RowIterator | bigquery.table._EmptyRowIterator | None
):
"""Execute query, add BQ labels"""
if params:
job_config = bigquery.QueryJobConfig(
Expand All @@ -109,13 +111,31 @@ def _execute_query(
else:
job_config = bigquery.QueryJobConfig(labels=BQ_LABELS)

# We need to dry run to calulate the costs
# executing query does not provide the cost
# more info here:
# https://stackoverflow.com/questions/58561153/what-is-the-python-api-i-can-use-to-calculate-the-cost-of-a-bigquery-query/58561358#58561358
job_config.dry_run = True
job_config.use_query_cache = False
query_job = self._connection.connection.query(query, job_config=job_config)

# This should be thread/async safe as each request
# creates a new connection instance
# and queries per requests are run in sequencial order,
# waiting for the previous one to finish
self._connection.cost += (
query_job.total_bytes_processed / 1024**4
) * BQ_COST_PER_TB

# now execute the query
job_config.dry_run = False
job_config.use_query_cache = True
query_job = self._connection.connection.query(query, job_config=job_config)
if results_as_list:
return list(
self._connection.connection.query(query, job_config=job_config).result()
)
return list(query_job.result())

# otherwise return as BQ iterator
return self._connection.connection.query(query, job_config=job_config)
return query_job

@staticmethod
def _query_to_partitioned_filter(
Expand All @@ -129,12 +149,16 @@ def _query_to_partitioned_filter(

# initial partition filter
billing_filter.day = GenericBQFilter[datetime](
gte=datetime.strptime(query.start_date, '%Y-%m-%d')
if query.start_date
else None,
lte=datetime.strptime(query.end_date, '%Y-%m-%d')
if query.end_date
else None,
gte=(
datetime.strptime(query.start_date, '%Y-%m-%d')
if query.start_date
else None
),
lte=(
datetime.strptime(query.end_date, '%Y-%m-%d')
if query.end_date
else None
),
)
return billing_filter

Expand Down Expand Up @@ -362,17 +386,19 @@ async def _append_total_running_cost(
total_monthly=(
total_monthly[COMPUTE]['ALL'] + total_monthly[STORAGE]['ALL']
),
total_daily=(total_daily[COMPUTE]['ALL'] + total_daily[STORAGE]['ALL'])
if is_current_month
else None,
total_daily=(
(total_daily[COMPUTE]['ALL'] + total_daily[STORAGE]['ALL'])
if is_current_month
else None
),
compute_monthly=total_monthly[COMPUTE]['ALL'],
compute_daily=(total_daily[COMPUTE]['ALL'])
if is_current_month
else None,
compute_daily=(
(total_daily[COMPUTE]['ALL']) if is_current_month else None
),
storage_monthly=total_monthly[STORAGE]['ALL'],
storage_daily=(total_daily[STORAGE]['ALL'])
if is_current_month
else None,
storage_daily=(
(total_daily[STORAGE]['ALL']) if is_current_month else None
),
details=all_details,
budget_spent=None,
budget=None,
Expand Down Expand Up @@ -422,17 +448,19 @@ async def _append_running_cost_records(
{
'field': key,
'total_monthly': monthly,
'total_daily': (compute_daily + storage_daily)
if is_current_month
else None,
'total_daily': (
(compute_daily + storage_daily)
if is_current_month
else None
),
'compute_monthly': compute_monthly,
'compute_daily': compute_daily,
'storage_monthly': storage_monthly,
'storage_daily': storage_daily,
'details': details,
'budget_spent': 100 * monthly / budget_monthly
if budget_monthly
else None,
'budget_spent': (
100 * monthly / budget_monthly if budget_monthly else None
),
'budget': budget_monthly,
'last_loaded_day': last_loaded_day,
}
Expand Down
Loading
Loading