Commit

Merge from dev.
milo-hyben committed Jan 29, 2024
2 parents f629891 + 91b5ff7 commit 78f9f0d
Showing 22 changed files with 539 additions and 24 deletions.
7 changes: 7 additions & 0 deletions api/routes/billing.py
@@ -7,11 +7,18 @@
from api.settings import BILLING_CACHE_RESPONSE_TTL, BQ_AGGREG_VIEW
from api.utils.db import BqConnection, get_author
from db.python.layers.billing import BillingLayer
<<<<<<< HEAD
=======
from models.enums import BillingSource
>>>>>>> dev
from models.models import (
BillingColumn,
BillingCostBudgetRecord,
BillingHailBatchCostRecord,
<<<<<<< HEAD
BillingSource,
=======
>>>>>>> dev
BillingTotalCostQueryModel,
BillingTotalCostRecord,
)
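For orientation, the dev side imports BillingSource from models.enums instead of models.models. A minimal sketch of that arrangement, assuming hypothetical enum members and a FastAPI-style handler (neither is taken from this repository):

from enum import Enum


class BillingSource(str, Enum):
    # hypothetical members; the real enum lives in models/enums.py
    RAW = 'raw'
    AGGREGATE = 'aggregate'
    GCP_BILLING = 'gcp_billing'


def get_total_cost(source: BillingSource = BillingSource.AGGREGATE) -> str:
    # FastAPI-style handlers can take the enum directly as a query parameter;
    # the framework validates the incoming string against the members
    return source.value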
4 changes: 2 additions & 2 deletions api/server.py
@@ -140,11 +140,11 @@ async def exception_handler(request: Request, e: Exception):
cors_middleware = middlewares[0]

request_origin = request.headers.get('origin', '')
if cors_middleware and '*' in cors_middleware.options['allow_origins']: # type: ignore[attr-defined]
if cors_middleware and '*' in cors_middleware.options['allow_origins']: # type: ignore
response.headers['Access-Control-Allow-Origin'] = '*'
elif (
cors_middleware
and request_origin in cors_middleware.options['allow_origins'] # type: ignore[attr-defined]
and request_origin in cors_middleware.options['allow_origins'] # type: ignore
):
response.headers['Access-Control-Allow-Origin'] = request_origin

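The hunk above only changes the form of the type: ignore comments, but the surrounding logic is the interesting part: with a wildcard allow list the error response advertises '*', otherwise the request origin is echoed back only when it is explicitly allowed. A standalone sketch of that decision, with a plain list and dict standing in for Starlette's middleware options:

allow_origins = ['https://example.org']  # assumed configuration, not the real settings
request_origin = 'https://example.org'
response_headers: dict[str, str] = {}

if '*' in allow_origins:
    # wildcard configured: any origin may read the error response
    response_headers['Access-Control-Allow-Origin'] = '*'
elif request_origin in allow_origins:
    # otherwise echo the origin back only when it is explicitly allowed
    response_headers['Access-Control-Allow-Origin'] = request_origin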
4 changes: 1 addition & 3 deletions db/python/layers/billing.py
@@ -4,13 +4,11 @@
from db.python.tables.bq.billing_daily_extended import BillingDailyExtendedTable
from db.python.tables.bq.billing_gcp_daily import BillingGcpDailyTable
from db.python.tables.bq.billing_raw import BillingRawTable
from models.enums import BillingSource, BillingTimeColumn, BillingTimePeriods
from models.models import (
BillingColumn,
BillingCostBudgetRecord,
BillingHailBatchCostRecord,
BillingSource,
BillingTimeColumn,
BillingTimePeriods,
BillingTotalCostQueryModel,
)

160 changes: 160 additions & 0 deletions db/python/tables/bq/billing_base.py
@@ -12,10 +12,18 @@
from db.python.tables.bq.billing_filter import BillingFilter
from db.python.tables.bq.function_bq_filter import FunctionBQFilter
from db.python.tables.bq.generic_bq_filter import GenericBQFilter
<<<<<<< HEAD
from models.models import (
BillingColumn,
BillingCostBudgetRecord,
BillingTimePeriods,
=======
from models.enums import BillingTimeColumn, BillingTimePeriods
from models.models import (
BillingColumn,
BillingCostBudgetRecord,
BillingCostDetailsRecord,
>>>>>>> dev
BillingTotalCostQueryModel,
)

@@ -29,16 +37,28 @@
'TimeGroupingDetails', ['field', 'formula', 'separator']
)

<<<<<<< HEAD

def abbrev_cost_category(cost_category: str) -> str:
"""abbreviate cost category"""
return 'S' if cost_category == 'Cloud Storage' else 'C'
=======
# constants to abbrevate (S)tores and (C)ompute
STORAGE = 'S'
COMPUTE = 'C'


def abbrev_cost_category(cost_category: str) -> str:
"""abbreviate cost category"""
return STORAGE if cost_category == 'Cloud Storage' else COMPUTE
>>>>>>> dev

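Both sides implement the same mapping; a quick standalone check of the behaviour, where 'Compute Engine' is just an example of a non-storage category:

STORAGE = 'S'
COMPUTE = 'C'


def abbrev_cost_category(cost_category: str) -> str:
    # same mapping as above, repeated so the example runs on its own
    return STORAGE if cost_category == 'Cloud Storage' else COMPUTE


assert abbrev_cost_category('Cloud Storage') == STORAGE
assert abbrev_cost_category('Compute Engine') == COMPUTE  # anything else counts as compute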

def prepare_time_periods(
query: BillingTotalCostQueryModel,
) -> TimeGroupingDetails:
"""Prepare Time periods grouping and parsing formulas"""
<<<<<<< HEAD
time_column = query.time_column or 'day'
result = TimeGroupingDetails('', '', '')

@@ -63,12 +83,52 @@ def prepare_time_periods(
)
elif query.time_periods == BillingTimePeriods.INVOICE_MONTH:
result = TimeGroupingDetails(
=======
time_column: BillingTimeColumn = query.time_column or BillingTimeColumn.DAY

# Based on specified time period, add the corresponding column
if query.time_periods == BillingTimePeriods.DAY:
return TimeGroupingDetails(
field=f'FORMAT_DATE("%Y-%m-%d", {time_column.value}) as day',
formula='PARSE_DATE("%Y-%m-%d", day) as day',
separator=',',
)

if query.time_periods == BillingTimePeriods.WEEK:
return TimeGroupingDetails(
field=f'FORMAT_DATE("%Y%W", {time_column.value}) as day',
formula='PARSE_DATE("%Y%W", day) as day',
separator=',',
)

if query.time_periods == BillingTimePeriods.MONTH:
return TimeGroupingDetails(
field=f'FORMAT_DATE("%Y%m", {time_column.value}) as day',
formula='PARSE_DATE("%Y%m", day) as day',
separator=',',
)

if query.time_periods == BillingTimePeriods.INVOICE_MONTH:
return TimeGroupingDetails(
>>>>>>> dev
field='invoice_month as day',
formula='PARSE_DATE("%Y%m", day) as day',
separator=',',
)

<<<<<<< HEAD
return result
=======
return TimeGroupingDetails('', '', '')


def time_optimisation_parameter() -> bigquery.ScalarQueryParameter:
"""
BQ tables and views are partitioned by day, to avoid full scans
we need to limit the amount of data scanned
"""
return bigquery.ScalarQueryParameter('days', 'INT64', -int(BQ_DAYS_BACK_OPTIMAL))
>>>>>>> dev

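The field/formula/separator triple feeds the query assembly near the end of this file: the inner SELECT embeds field, and the outer query presumably applies formula to turn the formatted string back into a DATE. A sketch under that assumption, with a placeholder table name:

from collections import namedtuple

TimeGroupingDetails = namedtuple('TimeGroupingDetails', ['field', 'formula', 'separator'])

day_grouping = TimeGroupingDetails(
    field='FORMAT_DATE("%Y-%m-%d", day) as day',
    formula='PARSE_DATE("%Y-%m-%d", day) as day',
    separator=',',
)

inner = (
    f'SELECT {day_grouping.field}{day_grouping.separator} topic, SUM(cost) AS cost '
    'FROM `my-project.billing.aggregate_view` GROUP BY day, topic'
)
outer = f'SELECT {day_grouping.formula}{day_grouping.separator} topic, cost FROM ({inner})'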

class BillingBaseTable(BqDbBase):
@@ -102,9 +162,14 @@ def _execute_query(
# otherwise return as BQ iterator
return self._connection.connection.query(query, job_config=job_config)

<<<<<<< HEAD
@staticmethod
def _query_to_partitioned_filter(
query: BillingTotalCostQueryModel,
=======
def _query_to_partitioned_filter(
self, query: BillingTotalCostQueryModel
>>>>>>> dev
) -> BillingFilter:
"""
By default views are partitioned by 'day',
@@ -123,6 +188,7 @@ def _query_to_partitioned_filter(
)
return billing_filter

<<<<<<< HEAD
@staticmethod
def _filter_to_optimise_query() -> str:
"""Filter string to optimise BQ query"""
@@ -135,6 +201,17 @@ def _last_loaded_day_filter() -> str:

@staticmethod
def _convert_output(query_job_result):
=======
def _filter_to_optimise_query(self) -> str:
"""Filter string to optimise BQ query"""
return 'day >= TIMESTAMP(@start_day) AND day <= TIMESTAMP(@last_day)'

def _last_loaded_day_filter(self) -> str:
"""Last Loaded day filter string"""
return 'day = TIMESTAMP(@last_loaded_day)'

def _convert_output(self, query_job_result):
>>>>>>> dev
"""Convert query result to json"""
if not query_job_result or query_job_result.result().total_rows == 0:
# return empty list if no record found
@@ -169,7 +246,11 @@ async def _budgets_by_gcp_project(
WITH t AS (
SELECT gcp_project, MAX(created_at) as last_created_at
FROM `{BQ_BUDGET_VIEW}`
<<<<<<< HEAD
GROUP BY 1
=======
GROUP BY gcp_project
>>>>>>> dev
)
SELECT t.gcp_project, d.budget
FROM t inner join `{BQ_BUDGET_VIEW}` d
@@ -197,7 +278,11 @@ async def _last_loaded_day(self):
"""

query_parameters = [
<<<<<<< HEAD
bigquery.ScalarQueryParameter('days', 'INT64', -int(BQ_DAYS_BACK_OPTIMAL)),
=======
time_optimisation_parameter(),
>>>>>>> dev
]
query_job_result = self._execute_query(_query, query_parameters)

@@ -206,9 +291,13 @@ async def _last_loaded_day(self):

return None

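Only the construction of the parameter changes in this hunk; how @days is consumed is not shown here, so the WHERE clause below is an assumption about the usual partition-pruning pattern, and the view name is a placeholder:

from google.cloud import bigquery

BQ_DAYS_BACK_OPTIMAL = 30  # assumed default; the real value comes from api.settings


def time_optimisation_parameter() -> bigquery.ScalarQueryParameter:
    # negative, so the query can add it as an interval to CURRENT_TIMESTAMP()
    return bigquery.ScalarQueryParameter('days', 'INT64', -int(BQ_DAYS_BACK_OPTIMAL))


query = """
    SELECT MAX(day) AS last_loaded_day
    FROM `my-project.billing.aggregate_view`
    WHERE day > TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL @days DAY)
"""
job_config = bigquery.QueryJobConfig(query_parameters=[time_optimisation_parameter()])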
<<<<<<< HEAD
def _prepare_daily_cost_subquery(
self, field: BillingColumn, query_params: list[Any], last_loaded_day: str
):
=======
def _prepare_daily_cost_subquery(self, field, query_params, last_loaded_day):
>>>>>>> dev
"""prepare daily cost subquery"""

daily_cost_field = ', day.cost as daily_cost'
@@ -316,8 +405,13 @@ async def _execute_running_cost_query(
self._execute_query(_query, query_params),
)

<<<<<<< HEAD
@staticmethod
async def _append_total_running_cost(
=======
async def _append_total_running_cost(
self,
>>>>>>> dev
field: BillingColumn,
is_current_month: bool,
last_loaded_day: str | None,
@@ -334,6 +428,7 @@ async def _append_total_running_cost(
all_details = []
for cat, mth_cost in total_monthly_category.items():
all_details.append(
<<<<<<< HEAD
{
'cost_group': abbrev_cost_category(cat),
'cost_category': cat,
@@ -342,10 +437,19 @@
else None,
'monthly_cost': mth_cost,
}
=======
BillingCostDetailsRecord(
cost_group=abbrev_cost_category(cat),
cost_category=cat,
daily_cost=total_daily_category[cat] if is_current_month else None,
monthly_cost=mth_cost,
)
>>>>>>> dev
)

# add total row: compute + storage
results.append(
<<<<<<< HEAD
BillingCostBudgetRecord.from_json(
{
'field': f'{BillingColumn.generate_all_title(field)}',
@@ -366,6 +470,28 @@
'details': all_details,
'last_loaded_day': last_loaded_day,
}
=======
BillingCostBudgetRecord(
field=f'{BillingColumn.generate_all_title(field)}',
total_monthly=(
total_monthly[COMPUTE]['ALL'] + total_monthly[STORAGE]['ALL']
),
total_daily=(total_daily[COMPUTE]['ALL'] + total_daily[STORAGE]['ALL'])
if is_current_month
else None,
compute_monthly=total_monthly[COMPUTE]['ALL'],
compute_daily=(total_daily[COMPUTE]['ALL'])
if is_current_month
else None,
storage_monthly=total_monthly[STORAGE]['ALL'],
storage_daily=(total_daily[STORAGE]['ALL'])
if is_current_month
else None,
details=all_details,
budget_spent=None,
budget=None,
last_loaded_day=last_loaded_day,
>>>>>>> dev
)
)

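On the dev side the totals row and its details are built as model instances instead of dicts passed through from_json. A minimal sketch of the pattern with a cut-down, hypothetical field set (the real models in models/models.py carry more fields and validation):

from pydantic import BaseModel


class BillingCostDetailsRecord(BaseModel):
    # trimmed, illustrative field set only
    cost_group: str
    cost_category: str
    daily_cost: float | None = None
    monthly_cost: float | None = None


detail = BillingCostDetailsRecord(
    cost_group='S',
    cost_category='Cloud Storage',
    daily_cost=12.34,
    monthly_cost=345.67,
)
print(detail)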
@@ -391,13 +517,27 @@ async def _append_running_cost_records(

# add rows by field
for key, details in field_details.items():
<<<<<<< HEAD
compute_daily = total_daily['C'][key] if key in total_daily['C'] else 0
storage_daily = total_daily['S'][key] if key in total_daily['S'] else 0
compute_monthly = (
total_monthly['C'][key] if key in total_monthly['C'] else 0
)
storage_monthly = (
total_monthly['S'][key] if key in total_monthly['S'] else 0
=======
compute_daily = (
total_daily[COMPUTE][key] if key in total_daily[COMPUTE] else 0
)
storage_daily = (
total_daily[STORAGE][key] if key in total_daily[STORAGE] else 0
)
compute_monthly = (
total_monthly[COMPUTE][key] if key in total_monthly[COMPUTE] else 0
)
storage_monthly = (
total_monthly[STORAGE][key] if key in total_monthly[STORAGE] else 0
>>>>>>> dev
)
monthly = compute_monthly + storage_monthly
budget_monthly = budgets_per_gcp_project.get(key)
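Both branches spell each lookup as value[key] if key in value else 0; an equivalent and slightly tighter form uses dict.get, shown here purely as an aside rather than as a change this commit makes:

COMPUTE, STORAGE = 'C', 'S'
total_daily = {COMPUTE: {'proj-a': 1.5}, STORAGE: {'proj-a': 0.2}}
total_monthly = {COMPUTE: {'proj-a': 40.0}, STORAGE: {'proj-a': 6.0}}
key = 'proj-a'

# equivalent to: total_daily[COMPUTE][key] if key in total_daily[COMPUTE] else 0
compute_daily = total_daily[COMPUTE].get(key, 0)
storage_daily = total_daily[STORAGE].get(key, 0)
compute_monthly = total_monthly[COMPUTE].get(key, 0)
storage_monthly = total_monthly[STORAGE].get(key, 0)
monthly = compute_monthly + storage_monthly  # 46.0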
@@ -426,8 +566,14 @@ async def _append_running_cost_records(

return results

<<<<<<< HEAD
@staticmethod
def _prepare_order_by_string(order_by: dict[BillingColumn, bool] | None) -> str:
=======
def _prepare_order_by_string(
self, order_by: dict[BillingColumn, bool] | None
) -> str:
>>>>>>> dev
"""Prepare order by string"""
if not order_by:
return ''
@@ -440,8 +586,14 @@ def _prepare_order_by_string(order_by: dict[BillingColumn, bool] | None) -> str:

return f'ORDER BY {",".join(order_by_cols)}' if order_by_cols else ''

<<<<<<< HEAD
@staticmethod
def _prepare_aggregation(query: BillingTotalCostQueryModel) -> tuple[str, str]:
=======
def _prepare_aggregation(
self, query: BillingTotalCostQueryModel
) -> tuple[str, str]:
>>>>>>> dev
"""Prepare both fields for aggregation and group by string"""
# Get columns to group by

@@ -466,8 +618,12 @@ def _prepare_aggregation(query: BillingTotalCostQueryModel) -> tuple[str, str]:

return fields_selected, group_by

<<<<<<< HEAD
@staticmethod
def _prepare_labels_function(query: BillingTotalCostQueryModel):
=======
def _prepare_labels_function(self, query: BillingTotalCostQueryModel):
>>>>>>> dev
if not query.filters:
return None

@@ -546,7 +702,11 @@ async def get_total_cost(
where_str = f'WHERE {where_str}'

_query = f"""
<<<<<<< HEAD
{func_filter.func_implementation if func_filter else ''}
=======
{func_filter.fun_implementation if func_filter else ''}
>>>>>>> dev
WITH t AS (
SELECT {time_group.field}{time_group.separator} {fields_selected},
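The final hunk prepends the output of a FunctionBQFilter to the query when label-based filtering is requested. The temp UDF below is purely illustrative of what such a prelude could look like; the real SQL emitted by function_bq_filter.py is not part of this diff, and the view name is a placeholder:

# hypothetical temp-UDF prelude that a label filter might emit
udf_sql = """
CREATE TEMP FUNCTION getLabelValue(
    labels ARRAY<STRUCT<key STRING, value STRING>>, lookup STRING
) RETURNS STRING AS (
    (SELECT value FROM UNNEST(labels) WHERE key = lookup LIMIT 1)
);
"""

time_field = 'FORMAT_DATE("%Y-%m-%d", day) as day'
fields_selected = 'topic'
where_str = 'WHERE day >= TIMESTAMP(@start_day) AND day <= TIMESTAMP(@last_day)'

query = f"""
{udf_sql}
WITH t AS (
    SELECT {time_field}, {fields_selected},
        SUM(cost) AS cost
    FROM `my-project.billing.aggregate_view`
    {where_str}
    GROUP BY day, {fields_selected}
)
SELECT * FROM t
"""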
