Skip to content

Commit

Permalink
Fixing linting and updating ETLAdmin page.
Browse files Browse the repository at this point in the history
  • Loading branch information
milo-hyben committed Sep 27, 2023
1 parent fa28e37 commit 2ab6dc8
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 74 deletions.
37 changes: 15 additions & 22 deletions api/routes/etl.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
"""
Web routes
"""
from typing import Any, Optional


from fastapi import APIRouter
from api.utils.db import (
BqConnection,
PubSubConnection,
get_projectless_bq_connection,
get_projectless_pubsub_connection
get_projectless_pubsub_connection,
)
from db.python.layers.etl import EtlLayer, EtlPubSub
from fastapi import APIRouter, Request
from models.models.etl import EtlRecord
from models.enums import EtlStatus
from models.models.etl import EtlRecord

router = APIRouter(prefix='/etl', tags=['etl'])

Expand All @@ -23,10 +21,10 @@ async def get_etl_record_by_id(
request_id: str,
connection: BqConnection = get_projectless_bq_connection,
):
"""Get ETL record by request_id """
"""Get ETL record by request_id"""
if not request_id:
raise ValueError('request_id must be provided')

etl_layer = EtlLayer(connection)
return await etl_layer.get_etl_record(request_id)

Expand All @@ -37,18 +35,15 @@ async def get_etl_record_by_id(
operation_id='getEtlSummary',
)
async def get_etl_summary(
request: Request,
source_type: str = None,
status: EtlStatus = None,
from_date: str = None,
connection: BqConnection = get_projectless_bq_connection,
) -> list[EtlRecord]:
"""Get ETL process summary by source_type / status / from_date"""

etl_layer = EtlLayer(connection)
summary = await etl_layer.get_etl_summary(
source_type, status, from_date
)
summary = await etl_layer.get_etl_summary(source_type, status, from_date)
return summary


Expand All @@ -58,17 +53,16 @@ async def get_etl_summary(
operation_id='etlResubmit',
)
async def etl_resubmit(
request: Request,
request_id: str = None,
bq_onnection: BqConnection = get_projectless_bq_connection,
pubsub_connection: PubSubConnection = get_projectless_pubsub_connection,
):
"""Resubmit record to Transform/Load process by request_id
request_id is output of Load process
Only already loaded requests can be resubmitted
{"request_id": "640e8f2e-4e20-4959-8620-7b7741265895"}
"""
if not request_id:
raise ValueError('request_id must be provided')
Expand All @@ -77,17 +71,16 @@ async def etl_resubmit(
etl_layer = EtlLayer(bq_onnection)
rec = await etl_layer.get_etl_record(request_id)
if rec is None:
raise ValueError(f"Invalid request_id: {request_id}")
raise ValueError(f'Invalid request_id: {request_id}')

# do nod reload already sucessfull records
if rec.status == EtlStatus.LOADED:
raise ValueError(f"Already sucessfully loaded request_id: {request_id}")
# only reload failed requests
if rec.status == EtlStatus.SUCCESS:
raise ValueError(f'Already sucessfully loaded request_id: {request_id}')

# only reload failed requests
pubsub = EtlPubSub(pubsub_connection)
return await pubsub.publish(
{
'request_id': request_id,
}
)

9 changes: 4 additions & 5 deletions db/python/gcp_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
"""
import logging
import os
from typing import Optional

import google.cloud.bigquery as bq
from google.cloud import pubsub_v1
from db.python.utils import InternalError
import google.cloud.pubsub_v1 as pubsub_v1

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
Expand All @@ -23,7 +22,7 @@ def __init__(
self.gcp_project = os.getenv('METAMIST_INFRA_GCP_PROJECT')
self.connection: bq.Client = bq.Client(project=self.gcp_project)
self.author: str = author

@staticmethod
async def get_connection_no_project(author: str):
"""Get a db connection from a project and user"""
Expand Down Expand Up @@ -63,7 +62,7 @@ def __init__(
self.client: pubsub_v1.PublisherClient = pubsub_v1.PublisherClient()
self.author: str = author
self.topic: str = os.getenv('METAMIST_INFRA_PUBSUB_TOPIC')

@staticmethod
async def get_connection_no_project(author: str):
"""Get a pubsub connection from a project and user"""
Expand All @@ -72,5 +71,5 @@ async def get_connection_no_project(author: str):

# we don't authenticate project-less connection, but rely on the
# the endpoint to validate the resources

return PubSubConnection(author=author)
51 changes: 24 additions & 27 deletions db/python/layers/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import logging
from typing import Any

from fastapi import HTTPException
from models.enums import EtlStatus
from models.models import EtlRecord

from db.python.gcp_connect import BqDbBase, PubSubConnection
from db.python.layers.bq_base import BqBaseLayer
from models.models import EtlRecord
from models.enums import EtlStatus


class EtlLayer(BqBaseLayer):
Expand All @@ -25,7 +24,7 @@ async def get_etl_summary(
"""
etlDb = EtlDb(self.connection)
return await etlDb.get_etl_summary(source_type, status, from_date)

async def get_etl_record(
self,
request_id: str,
Expand All @@ -44,31 +43,31 @@ async def get_etl_record(
self,
request_id: str,
) -> EtlRecord | None:

"""Get ETL record from BQ"""

_query = f"""
WITH l AS (
SELECT request_id, max(timestamp) as last_time
FROM `{self._connection.gcp_project}.metamist.etl-logs`
FROM `{self._connection.gcp_project}.metamist.etl-logs`
WHERE request_id = "{request_id}"
group by request_id
)
select logs.request_id, logs.timestamp as last_run_at,
logs.status, logs.details, d.body as sample_record
from l
from l
inner join `{self._connection.gcp_project}.metamist.etl-logs` logs on
l.request_id = logs.request_id
l.request_id = logs.request_id
and logs.timestamp = l.last_time
INNER JOIN `{self._connection.gcp_project}.metamist.etl-data` d on
d.request_id = logs.request_id
"""

query_job_result = list(self._connection.connection.query(_query).result())

if query_job_result:
return EtlRecord.from_json(dict(query_job_result[0]))

raise ValueError('No record found')


async def get_etl_summary(
self,
Expand All @@ -88,7 +87,7 @@ async def get_report(
from_date: str | None = None,
) -> list[EtlRecord]:
"""Get ETL report from BQ"""

# build query filter
query_filters = []

Expand Down Expand Up @@ -120,31 +119,29 @@ async def get_report(
_query = f"""
WITH l AS (
SELECT request_id, max(timestamp) as last_time
FROM `{self._connection.gcp_project}.metamist.etl-logs`
FROM `{self._connection.gcp_project}.metamist.etl-logs`
{query_filter}
group by request_id
)
select logs.request_id, logs.timestamp as last_run_at,
logs.status, logs.details, d.body as sample_record
from l
from l
inner join `{self._connection.gcp_project}.metamist.etl-logs` logs on
l.request_id = logs.request_id
l.request_id = logs.request_id
and logs.timestamp = l.last_time
INNER JOIN `{self._connection.gcp_project}.metamist.etl-data` d on
d.request_id = logs.request_id
{status_filter}
"""

print("\n=====================", _query, "\n=====================\n")


query_job_result = self._connection.connection.query(_query).result()
records = [EtlRecord.from_json(dict(row)) for row in query_job_result]
return records


class EtlPubSub:
"""Etl Pub Sub wrapper"""

def __init__(
self,
connection: PubSubConnection,
Expand All @@ -156,18 +153,18 @@ async def publish(
msg: dict,
) -> bool:
"""
publish to pubsub, append user and timestampe to the message
publish to pubsub, append user and timestampe to the message
"""
msg['timestamp'] = datetime.datetime.utcnow().isoformat()
msg['submitting_user'] = self.connection.author

print(msg)


try:
self.connection.client.publish(self.connection.topic, json.dumps(msg).encode())
self.connection.client.publish(
self.connection.topic, json.dumps(msg).encode()
)
logging.info(f'Published message to {self.connection.topic}.')
return True
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to publish to pubsub: {e}')
return False

return False
2 changes: 1 addition & 1 deletion models/enums/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ class EtlStatus(Enum):
"""Etl Status Enum"""

FAILED = 'FAILED'
SUCCESS = 'SUCCESS'
SUCCESS = 'SUCCESS'
18 changes: 9 additions & 9 deletions models/models/etl.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
import datetime
import json
from models.base import SMBase
from models.enums import EtlStatus
from models.enums import EtlStatus



class EtlRecord(SMBase):
"""Return class for the ETL record"""

request_id: str
last_run_at: datetime.datetime | None
status: EtlStatus
source_type: str

submitting_user: str
parser_result: str

class Config:
"""Config for EtlRecord Response"""
orm_mode = True

@staticmethod
def from_json(record):
"""Create EtlRecord from json"""

record['details'] = json.loads(record['details'])
record['sample_record'] = json.loads(record['sample_record'])

return EtlRecord(
request_id=record['request_id'],
last_run_at=record['last_run_at'],
status=EtlStatus[str(record['status']).upper()],
last_run_at=record['last_run_at'],
status=EtlStatus[str(record['status']).upper()],
# remove leading backslash fromt the source_type
source_type=record['details']['source_type'].strip('/'),
submitting_user=record['details']['submitting_user'],
Expand Down
22 changes: 12 additions & 10 deletions web/src/pages/etl/EtlAdmin.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,28 @@
// Project ETL specific page

import * as React from 'react'
import { Message, Button, Checkbox, Input, InputProps } from 'semantic-ui-react'
import { Link } from 'react-router-dom'
import { EtlApi, EtlSummary } from '../../sm-api/api'
import { Button } from 'semantic-ui-react'
import { EtlApi, EtlRecord } from '../../sm-api/api'

const EtlAdmin = () => {
// requests by sample type?
// requests by loading status?
const [records, setRecords] = React.useState<EtlSummary[] | null>(null)
const [records, setRecords] = React.useState<EtlRecord[] | null>(null)
const [text, setText] = React.useState<string | null>(null)

const getEtlSummary = React.useCallback(async () => {
try {
const response = await new EtlApi().getEtlSummary([], undefined, undefined, undefined)
const response = await new EtlApi().getEtlSummary(undefined, undefined, undefined)
setRecords(response.data)
} catch (er: any) {
setText(`Failed with error: ${er.message}`)
}
}, [])

const reloadRequest = (request_id) => {
console.log('reloadRequest: ', request_id)
const resubmitRequest = (request_id: any) => {
console.log('resubmitRequest: ', request_id)
new EtlApi()
.etlReload(request_id)
.etlResubmit(request_id)
.then(() => getEtlSummary())
.catch((er) => setText(er.message))
}
Expand All @@ -50,8 +49,11 @@ const EtlAdmin = () => {
<td style={{ padding: '15px' }}>{item.status}</td>
<td style={{ padding: '15px' }}>{item.parser_result}</td>
<td style={{ padding: '15px' }}>
<Button color="red" onClick={() => reloadRequest(item.request_id)}>
Reload
<Button
color="red"
onClick={() => resubmitRequest(item.request_id)}
>
Resubmit
</Button>
</td>
</tr>
Expand Down

0 comments on commit 2ab6dc8

Please sign in to comment.