Skip to content

Commit 7ba26ac

Browse files
committed
Added WIP Billing API point to query Big table billing aggregate.
1 parent 0df355e commit 7ba26ac

File tree

8 files changed

+263
-0
lines changed

8 files changed

+263
-0
lines changed

api/routes/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
from api.routes.web import router as web_router
99
from api.routes.enum import router as enum_router
1010
from api.routes.sequencing_groups import router as sequencing_groups_router
11+
from api.routes.billing import router as billing_router

api/routes/billing.py

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""
2+
Billing routes
3+
"""
4+
5+
import json
6+
from fastapi import APIRouter
7+
from api.utils.db import (
8+
BqConnection,
9+
get_projectless_bq_connection,
10+
)
11+
from db.python.layers.billing import BillingLayer
12+
from models.models.billing import BillingRecord
13+
14+
router = APIRouter(prefix='/billing', tags=['billing'])
15+
16+
17+
@router.post(
18+
'/search',
19+
response_model=list[BillingRecord],
20+
operation_id='postBillingSearch',
21+
)
22+
async def post_search(
23+
filter: str = None,
24+
connection: BqConnection = get_projectless_bq_connection,
25+
) -> list[BillingRecord]:
26+
"""Get Billing records"""
27+
28+
filter_dict = {}
29+
if filter:
30+
filter_dict = json.loads(filter)
31+
32+
billing_layer = BillingLayer(connection)
33+
records = await billing_layer.search_records(filter_dict)
34+
return records

api/utils/db.py

+13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from api.settings import get_default_user
1111
from api.utils.gcp import email_from_id_token
1212
from db.python.connect import SMConnections, Connection
13+
from db.python.gcp_connect import BqConnection, PubSubConnection
1314

1415

1516
EXPECTED_AUDIENCE = getenv('SM_OAUTHAUDIENCE')
@@ -76,6 +77,16 @@ async def dependable_get_connection(author: str = Depends(authenticate)):
7677
return await SMConnections.get_connection_no_project(author)
7778

7879

80+
async def dependable_get_bq_connection(author: str = Depends(authenticate)):
81+
"""FastAPI handler for getting connection withOUT project"""
82+
return await BqConnection.get_connection_no_project(author)
83+
84+
85+
async def dependable_get_pubsub_connection(author: str = Depends(authenticate)):
86+
"""FastAPI handler for getting connection withOUT project"""
87+
return await PubSubConnection.get_connection_no_project(author)
88+
89+
7990
def validate_iap_jwt_and_get_email(iap_jwt, audience):
8091
"""
8192
Validate an IAP JWT and return email
@@ -102,3 +113,5 @@ def validate_iap_jwt_and_get_email(iap_jwt, audience):
102113
get_project_readonly_connection = Depends(dependable_get_readonly_project_connection)
103114
get_project_write_connection = Depends(dependable_get_write_project_connection)
104115
get_projectless_db_connection = Depends(dependable_get_connection)
116+
get_projectless_bq_connection = Depends(dependable_get_bq_connection)
117+
get_projectless_pubsub_connection = Depends(dependable_get_pubsub_connection)

db/python/gcp_connect.py

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""
2+
Code for connecting to Big Query database
3+
"""
4+
import logging
5+
import os
6+
7+
import google.cloud.bigquery as bq
8+
from google.cloud import pubsub_v1
9+
from db.python.utils import InternalError
10+
11+
logging.basicConfig(level=logging.DEBUG)
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class BqConnection:
16+
"""Stores a Big Query DB connection, project and author"""
17+
18+
def __init__(
19+
self,
20+
author: str,
21+
):
22+
self.gcp_project = os.getenv('METAMIST_GCP_PROJECT')
23+
self.connection: bq.Client = bq.Client(project=self.gcp_project)
24+
self.author: str = author
25+
26+
@staticmethod
27+
async def get_connection_no_project(author: str):
28+
"""Get a db connection from a project and user"""
29+
# maybe it makes sense to perform permission checks here too
30+
logger.debug(f'Authenticate no-project connection with {author!r}')
31+
32+
# we don't authenticate project-less connection, but rely on the
33+
# the endpoint to validate the resources
34+
35+
return BqConnection(author=author)
36+
37+
38+
class BqDbBase:
39+
"""Base class for big query database subclasses"""
40+
41+
def __init__(self, connection: BqConnection):
42+
if connection is None:
43+
raise InternalError(
44+
f'No connection was provided to the table {self.__class__.__name__!r}'
45+
)
46+
if not isinstance(connection, BqConnection):
47+
raise InternalError(
48+
f'Expected connection type Connection, received {type(connection)}, '
49+
f'did you mean to call self._connection?'
50+
)
51+
52+
self._connection = connection
53+
54+
55+
class PubSubConnection:
56+
"""Stores a PubSub connection, project and author"""
57+
58+
def __init__(
59+
self,
60+
author: str,
61+
topic: str,
62+
):
63+
self.client: pubsub_v1.PublisherClient = pubsub_v1.PublisherClient()
64+
self.author: str = author
65+
self.topic: str = os.getenv('METAMIST_GCP_PROJECT') + topic
66+
67+
@staticmethod
68+
async def get_connection_no_project(author: str):
69+
"""Get a pubsub connection from a project and user"""
70+
# maybe it makes sense to perform permission checks here too
71+
logger.debug(f'Authenticate no-project connection with {author!r}')
72+
73+
# we don't authenticate project-less connection, but rely on the
74+
# the endpoint to validate the resources
75+
76+
return PubSubConnection(author=author)

db/python/layers/billing.py

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from typing import Any
2+
3+
from models.models import BillingRecord
4+
5+
from db.python.gcp_connect import BqDbBase
6+
from db.python.layers.bq_base import BqBaseLayer
7+
8+
9+
class BillingLayer(BqBaseLayer):
10+
"""Billing layer"""
11+
12+
async def search_records(
13+
self,
14+
filter: dict[str, Any] | None = None,
15+
) -> list[BillingRecord] | None:
16+
"""
17+
Get ETL record for the given request_id
18+
"""
19+
billing_db = BillingDb(self.connection)
20+
return await billing_db.search_records(filter)
21+
22+
23+
class BillingDb(BqDbBase):
24+
"""Db layer for billing related routes"""
25+
26+
async def search_records(
27+
self,
28+
filter: dict[str, Any] | None = None,
29+
) -> list[BillingRecord] | None:
30+
"""Get Billing record from BQ"""
31+
32+
filter_parts = []
33+
34+
for k, v in filter.items():
35+
# value is a tupple of (operator, value)
36+
op_value = list(v)
37+
filter_parts.append(f'{k} {op_value[0]} {op_value[1]}')
38+
39+
filter = ' AND '.join(filter_parts)
40+
41+
if filter_parts:
42+
filter = f'WHERE {filter}'
43+
44+
_query = f"""
45+
SELECT *
46+
FROM `{self._connection.gcp_project}.billing_aggregate.aggregate`
47+
{filter}
48+
LIMIT 10
49+
"""
50+
51+
query_job_result = list(self._connection.connection.query(_query).result())
52+
53+
if query_job_result:
54+
return [BillingRecord.from_json(dict(row)) for row in query_job_result]
55+
56+
raise ValueError('No record found')

db/python/layers/bq_base.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from db.python.gcp_connect import BqConnection
2+
3+
4+
class BqBaseLayer:
5+
"""Base of all Big Query DB layers"""
6+
7+
def __init__(self, connection: BqConnection):
8+
self.connection = connection
9+
10+
@property
11+
def author(self):
12+
"""Get author from connection"""
13+
return self.connection.author

models/models/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,4 @@
5959
ProjectSummaryInternal,
6060
WebProject,
6161
)
62+
from models.models.billing import BillingRecord

models/models/billing.py

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import json
2+
from models.base import SMBase
3+
4+
5+
class BillingRecord(SMBase):
6+
"""Return class for the Billing record"""
7+
8+
id: str
9+
topic: str
10+
service_id: str
11+
service_description: str
12+
13+
gcp_project_id: str
14+
gcp_project_name: str
15+
16+
dataset: str
17+
batch_id: str
18+
job_id: str
19+
batch_name: str
20+
21+
cost: str
22+
currency: str
23+
invoice_month: str
24+
cost_type: str
25+
26+
class Config:
27+
"""Config for BillingRecord Response"""
28+
29+
orm_mode = True
30+
31+
@staticmethod
32+
def from_json(record):
33+
"""Create BillingRecord from json"""
34+
35+
print('\n\n ========================= \n\n')
36+
37+
print(type(record))
38+
print(record)
39+
40+
print('\n\n ========================= \n\n')
41+
42+
record['service'] = record['service'] if record['service'] else {}
43+
record['project'] = record['project'] if record['project'] else {}
44+
record['invoice'] = record['invoice'] if record['invoice'] else {}
45+
46+
labels = {}
47+
48+
if record['labels']:
49+
for lbl in record['labels']:
50+
labels[lbl['key']] = lbl['value']
51+
52+
record['labels'] = labels
53+
54+
return BillingRecord(
55+
id=record['id'],
56+
topic=record['topic'],
57+
service_id=record['service'].get('id', ''),
58+
service_description=record['service'].get('description', ''),
59+
gcp_project_id=record['project'].get('id', ''),
60+
gcp_project_name=record['project'].get('name', ''),
61+
dataset=record['labels'].get('dataset', ''),
62+
batch_id=record['labels'].get('batch_id', ''),
63+
job_id=record['labels'].get('job_id', ''),
64+
batch_name=record['labels'].get('batch_name', ''),
65+
cost=record['cost'],
66+
currency=record['currency'],
67+
invoice_month=record['invoice'].get('month', ''),
68+
cost_type=record['cost_type'],
69+
)

0 commit comments

Comments
 (0)