forked from smandaric/bigquerylayers
-
Notifications
You must be signed in to change notification settings - Fork 1
/
background_tasks.py
341 lines (293 loc) · 15.2 KB
/
background_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import csv
import tempfile
import subprocess
import shutil
from qgis.core import QgsTask, QgsMessageLog, Qgis
class UpstreamTaskCanceled(Exception):
pass
class BaseQueryTask(QgsTask):
"""Here we subclass QgsTask"""
def __init__(self, desc, iface, base_query_job, query_progress_field, geometry_column_combo_box, base_query_elements, layer_import_elements, run_query_button):
QgsTask.__init__(self, desc)
self.iface = iface
self.base_query_job = base_query_job
self.exception = None
self.query_progress_field = query_progress_field
self.geometry_column_combo_box = geometry_column_combo_box
self.base_query_elements = base_query_elements
self.layer_import_elements = layer_import_elements
self.run_query_button = run_query_button
self.query_result = None
def run(self):
"""This function is where you do the 'heavy lifting' or implement
the task which you want to run in a background thread. This function
must return True or False and should only interact with the main thread
via signals"""
try:
QgsMessageLog.logMessage('In backgrond task', 'BigQuery Layers', Qgis.Info)
base_query_job = self.base_query_job.get()
self.query_result = base_query_job.result()
self.base_query_job.put(base_query_job)
QgsMessageLog.logMessage('Query complete', 'BigQuery Layers', Qgis.Info)
self.setProgress(100)
return True
except Exception as e:
self.exception = e
return True
def finished(self, result):
"""This function is called automatically when the task is completed and is
called from the main thread so it is safe to interact with the GUI etc here"""
QgsMessageLog.logMessage('Finished is called', 'BigQuery Layers', Qgis.Info)
if result is False:
self.iface.messageBar().pushMessage('Task was cancelled')
elif result is True and self.exception:
self.iface.messageBar().pushMessage('Query Failed: ' + self.exception.__repr__(), level=Qgis.Critical)
#QgsMessageLog.logMessage(self.exception.__repr__(), 'BigQuery Layers', Qgis.Critical)
else:
QgsMessageLog.logMessage('Query successfull', 'BigQuery Layers', Qgis.Info)
num_rows = self.query_result.total_rows
self.query_progress_field.setText('Rows returned: {}'.format(num_rows))
fields = [field.name for field in self.query_result.schema]
self.geometry_column_combo_box.addItems(fields)
for elm in self.base_query_elements:
elm.setEnabled(True)
for elm in self.layer_import_elements:
elm.setEnabled(True)
self.run_query_button.setText('Run query')
class RetrieveQueryResultTask(QgsTask):
"""Here we subclass QgsTask"""
def __init__(self, desc, iface, query_job, file_queue, elements_in_layer, upstream_taks_canceled):
QgsTask.__init__(self, desc)
self.iface = iface
self.query_job = query_job
self.file_queue = file_queue
self.exception = False
self.elements_in_layer = elements_in_layer
self.upstream_taks_canceled = upstream_taks_canceled
def run(self):
"""This function is where you do the 'heavy lifting' or implement
the task which you want to run in a background thread. This function
must return True or False and should only interact with the main thread
via signals"""
try:
upstream_taks_canceled = self.upstream_taks_canceled.get()
if upstream_taks_canceled:
raise UpstreamTaskCanceled
QgsMessageLog.logMessage('In write tempfile task', 'BigQuery Layers', Qgis.Info)
#query_job = self.query_job.get()
#self.query_job.put(query_job)
query_job = self.query_job.get()
self.query_job.put(query_job)
query_result = query_job.result()
schema_fields = [field.name for field in query_result.schema]
total_rows = query_result.total_rows
self.elements_in_layer.put(total_rows)
QgsMessageLog.logMessage('Total rows: '+str(total_rows), 'BigQuery Layers', Qgis.Info)
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', delete=False) as fp:
filepath = fp.name
writer = csv.DictWriter(fp, fieldnames=schema_fields)
writer.writeheader()
progress_percent = 0
self.setProgress(progress_percent)
QgsMessageLog.logMessage('File path: '+ filepath, 'BigQuery Layers', Qgis.Info)
for i, row in enumerate(query_result, 1):
new_progress_percent = int(100 * (i / total_rows))
if new_progress_percent > progress_percent:
progress_percent = new_progress_percent
self.setProgress(progress_percent)
if self.isCanceled():
return False
writer.writerow(dict(row.items()))
self.file_queue.put(filepath)
QgsMessageLog.logMessage('Query complete', 'BigQuery Layers', Qgis.Info)
self.setProgress(100)
return True
except Exception as e:
self.exception = e
return True
def finished(self, result):
"""This function is called automatically when the task is completed and is
called from the main thread so it is safe to interact with the GUI etc here"""
QgsMessageLog.logMessage('Finished is called', 'BigQuery Layers', Qgis.Info)
if result is False:
self.iface.messageBar().pushMessage('Task was cancelled')
self.upstream_taks_canceled.put(True)
if result is True and isinstance(self.exception, UpstreamTaskCanceled):
self.upstream_taks_canceled.put(True)
elif result is True and self.exception:
QgsMessageLog.logMessage('Result retrival: '+self.exception.__repr__(), 'BigQuery Layers', Qgis.Critical)
self.upstream_taks_canceled.put(True)
else:
QgsMessageLog.logMessage('Finished import', 'BigQuery Layers', Qgis.Info)
self.upstream_taks_canceled.put(False)
class ConvertToGeopackage(QgsTask):
"""Here we subclass QgsTask"""
def __init__(self, desc, iface, geometry_column, file_queue, upstream_taks_canceled):
QgsTask.__init__(self, desc, QgsTask.CanCancel)
self.iface = iface
self.geometry_column = geometry_column
self.file_queue = file_queue
self.exception = None
self.upstream_taks_canceled = upstream_taks_canceled
def run(self):
"""This function is where you do the 'heavy lifting' or implement
the task which you want to run in a background thread. This function
must return True or False and should only interact with the main thread
via signals"""
try:
upstream_taks_canceled = self.upstream_taks_canceled.get()
if upstream_taks_canceled:
raise UpstreamTaskCanceled
QgsMessageLog.logMessage('Running conversion', 'BigQuery Layers', Qgis.Info)
input_file_path = self.file_queue.get()
temp_file_path = input_file_path + '.csv'
output_file_path = input_file_path + '.gpkg'
ogr2ogr_executable = shutil.which('ogr2ogr')
if not ogr2ogr_executable:
QgsMessageLog.logMessage('ogr2ogr executable not found', 'BigQuery Layers', Qgis.Info)
return False
cp_params = [
'cp',
input_file_path,
temp_file_path
]
ogr2ogr_params = [
ogr2ogr_executable,
'-f', 'GPKG', output_file_path,
temp_file_path,
'-oo', 'HEADERS=YES',
'-oo', 'GEOM_POSSIBLE_NAMES={}'.format(self.geometry_column),
'-a_srs', 'EPSG:4326'
]
subprocess.check_output(cp_params)
subprocess.check_output(ogr2ogr_params)
self.file_queue.put(output_file_path)
return True
except Exception as e:
self.exception = e
return True
def finished(self, result):
"""This function is called automatically when the task is completed and is
called from the main thread so it is safe to interact with the GUI etc here"""
if result is False:
self.iface.messageBar().pushMessage('Task was cancelled')
self.upstream_taks_canceled.put(True)
if result is True and isinstance(self.exception, UpstreamTaskCanceled):
self.upstream_taks_canceled.put(True)
elif result is True and self.exception:
QgsMessageLog.logMessage('Result retrival: '+self.exception.__repr__(), 'BigQuery Layers', Qgis.Critical)
self.upstream_taks_canceled.put(True)
else:
QgsMessageLog.logMessage('Finished conversion', 'BigQuery Layers', Qgis.Info)
self.upstream_taks_canceled.put(False)
class LayerImportTask(QgsTask):
def __init__(self, desc, iface, layer_file_path, add_all_button, add_extents_button, base_query_elements, layer_import_elements, elements_in_layer, upstream_taks_canceled):
QgsTask.__init__(self, desc)
self.iface = iface
self.layer_file_path = layer_file_path
self.exception = None
self.add_all_button = add_all_button
self.add_extents_button = add_extents_button
self.base_query_elements = base_query_elements
self.layer_import_elements = layer_import_elements
self.elements_in_layer = elements_in_layer
self.upstream_taks_canceled = upstream_taks_canceled
def run(self):
try:
upstream_taks_canceled = self.upstream_taks_canceled.get()
if upstream_taks_canceled:
raise UpstreamTaskCanceled
return True
except Exception as e:
self.exception = e
return True
def finished(self, result):
QgsMessageLog.logMessage('LayerImportTask has finished', 'BigQuery Layers', Qgis.Info)
if result is True and isinstance(self.exception, UpstreamTaskCanceled):
self.iface.messageBar().pushMessage('Layer import failed. See logs for more info', level=Qgis.Critical)
if result is True and not self.exception:
gpkg_layer_name = self.layer_file_path.get()
try:
gpkg_layer = gpkg_layer_name + '|layername=' + gpkg_layer_name.split('/')[-1].split('.')[0]
display_name = 'BigQuery layer'
vlayer = self.iface.addVectorLayer(gpkg_layer, display_name, 'ogr')
if vlayer:
#elements_added = BigQueryConnector.num_rows(self.bq.client, self.bq.last_query_run)
elements_in_layer = self.elements_in_layer.get()
self.iface.messageBar().pushMessage('BigQuery Layers', 'Added {} elements'.format(elements_in_layer),
level=Qgis.Info)
except Exception as e:
self.iface.messageBar().pushMessage('Layer import failed: ' + self.exception.__repr__(), level=Qgis.Critical)
self.add_all_button.setText('Add all')
self.add_extents_button.setText('Add window extents')
for elm in self.base_query_elements + self.layer_import_elements:
elm.setEnabled(True)
class ExtentsQueryTask(QgsTask):
"""Here we subclass QgsTask"""
def __init__(self, desc, iface, client, base_query_job, extent_query_job, extent, geo_field, upstream_taks_canceled):
QgsTask.__init__(self, desc)
self.iface = iface
self.client = client
self.base_query_job = base_query_job
self.extent_query_job = extent_query_job
self.extent = extent
self.geo_field = geo_field
self.exception = None
self.upstream_taks_canceled = upstream_taks_canceled
def run(self):
"""This function is where you do the 'heavy lifting' or implement
the task which you want to run in a background thread. This function
must return True or False and should only interact with the main thread
via signals"""
try:
upstream_taks_canceled = self.upstream_taks_canceled.get()
if upstream_taks_canceled:
raise UpstreamTaskCanceled
QgsMessageLog.logMessage('In ExentsQueryTask', 'BigQuery Layers', Qgis.Info)
base_query_job = self.base_query_job.get()
self.base_query_job.put(base_query_job)
base_query_table_path = '.'.join([self.client.project,
base_query_job.destination.dataset_id,
base_query_job.destination.table_id])
base_query_table = self.client.get_table(base_query_table_path)
base_table_geo_field = [field for field in base_query_table.schema if field.name == self.geo_field][0]
if base_table_geo_field.field_type == 'GEOGRAPHY':
q = """SELECT
*
FROM
`{}`
WHERE
ST_INTERSECTS({}, ST_GEOGFROMTEXT('{}'))""".format(base_query_table_path,
base_table_geo_field.name,
self.extent)
else:
q = """SELECT
*
FROM
`{}`
WHERE
ST_INTERSECTS(ST_GEOGFROMTEXT({}), ST_GEOGFROMTEXT('{}'))""".format(base_query_table_path,
base_table_geo_field.name,
self.extent)
extent_query_job = self.client.query(q)
extent_query_results = extent_query_job.result()
self.extent_query_job.put(extent_query_job)
return True
except Exception as e:
self.exception = e
return True
def finished(self, result):
"""This function is called automatically when the task is completed and is
called from the main thread so it is safe to interact with the GUI etc here"""
QgsMessageLog.logMessage('Finished is called', 'BigQuery Layers', Qgis.Info)
if result is False:
self.iface.messageBar().pushMessage('Task was cancelled')
self.upstream_taks_canceled.put(True)
if result is True and isinstance(self.exception, UpstreamTaskCanceled):
self.upstream_taks_canceled.put(True)
elif result is True and self.exception:
QgsMessageLog.logMessage('Extent query task: ' + self.exception.__repr__(), 'BigQuery Layers', Qgis.Critical)
self.upstream_taks_canceled.put(True)
else:
QgsMessageLog.logMessage('Extents query completed successfully', 'BigQuery Layers', Qgis.Info)
self.upstream_taks_canceled.put(False)