import datetime
import json
import logging
from typing import Any

from db.python.gcp_connect import BqDbBase, PubSubConnection
from db.python.layers.bq_base import BqBaseLayer
from models.models import SearchItem, EtlSummary


def reformat_record(record):
    """Reformat a raw BQ row into an EtlSummary"""

    # details and sample_record are stored as JSON strings in BQ
    record['details'] = json.loads(record['details'])
    record['sample_record'] = json.loads(record['sample_record'])

    return EtlSummary(
        request_id=record['request_id'],
        last_run_at=record['last_run_at'],
        status=record['status'],
        source_type=record['details']['source_type'],
        submitting_user=record['details']['submitting_user'],
        parser_result=record['details']['result'],
    )
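
# Illustrative shape of a raw row consumed by reformat_record (field values
# here are assumptions for documentation, not real data):
#
#     {
#         'request_id': '1234abcd',
#         'last_run_at': '2023-10-01T00:00:00',
#         'status': 'SUCCESS',
#         'details': '{"source_type": "/bq/v1", "submitting_user": "user@example.org", "result": "..."}',
#         'sample_record': '{"raw": "payload"}',
#     }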


class EtlLayer(BqBaseLayer):
    """ETL layer"""

    async def get_etl_summary(
        self,
        grid_filter: list[SearchItem],
        token: int = 0,
        limit: int = 20,
    ) -> Any:
        """
        Get the ETL summary from BQ.
        TODO: grid_filter, token and limit are accepted but not yet applied.
        """
        etl_db = EtlDb(self.connection)
        return await etl_db.get_etl_summary()
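
# Usage sketch (illustrative; assumes an existing BQ `connection` object of
# the kind BqBaseLayer expects):
#
#     layer = EtlLayer(connection)
#     summaries = await layer.get_etl_summary(grid_filter=[], token=0, limit=20)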


class EtlDb(BqDbBase):
    """DB layer for ETL report routes"""

    async def get_etl_summary(
        self,
    ) -> list[EtlSummary]:
        """
        Get the ETL summary: the latest run for each request.
        """
        return await self.get_report()

    async def get_report(
        self,
        source_type: str | None = None,
        etl_status: str | None = None,
        start_dt: str | None = None,
    ) -> list[EtlSummary]:
        """Get ETL report from BQ"""

        # build query filter
        query_filters = []

        if source_type:
            query_filters.append(
                f'JSON_VALUE(details.source_type) LIKE "\\/{source_type}\\/%"'
            )

        if start_dt:
            query_filters.append(f'timestamp > "{start_dt}"')

        # construct query filter
        query_filter = ' AND '.join(query_filters)
        if query_filter:
            query_filter = 'WHERE ' + query_filter
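
        # NOTE (observation, not original code): source_type and start_dt are
        # interpolated directly into the SQL below. If these values ever come
        # from untrusted input, BigQuery query parameters (QueryJobConfig with
        # ScalarQueryParameter) would be the safer option.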

        # Status filter applied after grouping by request_id.
        # One request can have multiple runs; we are interested only in the last run.
        if etl_status:
            status_filter = f'WHERE status = "{etl_status.upper()}"'
        else:
            status_filter = ''

        # query BQ table
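        # The CTE `l` finds the latest run timestamp per request_id (with the
        # source_type/start_dt filter applied), the self-join back to etl-logs
        # selects that latest run's row, and the join to etl-data attaches the
        # raw submitted payload; the status filter then applies to the latest
        # run only.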
        _query = f"""
            WITH l AS (
                SELECT request_id, MAX(timestamp) AS last_time
                FROM `{self._connection.gcp_project}.metamist.etl-logs`
                {query_filter}
                GROUP BY request_id
            )
            SELECT logs.request_id, logs.timestamp AS last_run_at,
                logs.status, logs.details, d.body AS sample_record
            FROM l
            INNER JOIN `{self._connection.gcp_project}.metamist.etl-logs` logs ON
                l.request_id = logs.request_id
                AND logs.timestamp = l.last_time
            INNER JOIN `{self._connection.gcp_project}.metamist.etl-data` d ON
                d.request_id = logs.request_id
            {status_filter}
        """

        query_job_result = self._connection.connection.query(_query).result()
        records = [reformat_record(dict(row)) for row in query_job_result]
        return records


class EtlPubSub:
    """Etl Pub/Sub wrapper"""

    def __init__(
        self,
        connection: PubSubConnection,
    ):
        self.connection = connection

    async def publish(
        self,
        msg: dict,
    ) -> str:
        """
        Publish to Pub/Sub, appending the submitting user and a timestamp to the message.
        """
        msg['timestamp'] = datetime.datetime.utcnow().isoformat()
        msg['submitting_user'] = self.connection.author
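
        # Note (observation, not a behaviour change): datetime.utcnow() returns
        # a naive datetime and is deprecated from Python 3.12;
        # datetime.datetime.now(datetime.timezone.utc) is the timezone-aware
        # alternative.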

        try:
            self.connection.client.publish(
                self.connection.topic, json.dumps(msg).encode()
            )
            return 'submitted'
        except Exception as e:  # pylint: disable=broad-exception-caught
            logging.error(f'Failed to publish to pubsub: {e}')

        return 'failed'
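
# Usage sketch for EtlPubSub (illustrative; how `connection` is constructed is
# an assumption, not part of this module):
#
#     pubsub = EtlPubSub(connection)  # connection: PubSubConnection
#     status = await pubsub.publish({'request_id': '1234'})
#     # status is 'submitted' on success, 'failed' otherwise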