-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathfwdet_21.py
693 lines (526 loc) · 29.8 KB
/
fwdet_21.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
# -*- coding: utf-8 -*-
"""
2023-10-19 created by Seth Bryant
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
__author__ = 'Seth Bryant'
__date__ = 'October 2023'
__version__ = '2024.05.18'
import pprint, os, datetime, tempfile
from qgis import processing
from qgis.PyQt.QtCore import QCoreApplication
from qgis.core import (QgsProcessing,
QgsFeatureSink,
QgsProcessingException, #still works
QgsProcessingAlgorithm,
QgsProcessingParameterRasterLayer,
QgsProcessingParameterRasterDestination,
QgsProcessingParameterVectorLayer,
QgsProcessingParameterNumber,
QgsProcessingParameterString,
QgsRasterLayer ,
QgsCoordinateTransformContext,
QgsMapLayerStore,
QgsProcessingOutputLayerDefinition,
QgsProject,
QgsProcessingParameterFeatureSource,
QgsVectorLayer,
QgsProcessingFeatureSource,
)
from qgis.analysis import QgsNativeAlgorithms, QgsRasterCalculatorEntry, QgsRasterCalculator
#import pandas as pd
descriptions_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'descriptions')
class FwDET(QgsProcessingAlgorithm):
"""FwDET 2.1 QGIS processing algorithim.
see self.run_algo() for more documentation
"""
#input layers
INPUT_DEM = 'INPUT_DEM'
INUN_VLAY = 'INUN_VLAY'
#input parameters
numIterations = 'numIterations' #number of smoothing iterations
slopeTH = 'slopeTH' #filtering slope threshold
grow_metric='grow_metric'
#outputs
OUTPUT_WSH = 'water_depth'
OUTPUT_WSH_SMOOTH = 'water_depth_filtered'
OUTPUT_SHORE='boundary'
#options
grow_metric_d = {'euclidean': 0,'squared': 1,'maximum': 2,'manhattan': 3,'geodesic': 4}
def tr(self, string):
"""
Returns a translatable string with the self.tr() function.
"""
return QCoreApplication.translate('Processing', string)
def createInstance(self):
return FwDET()
def name(self):
"""
Returns the algorithm name, used for identifying the algorithm. This
string should be fixed for the algorithm, and must not be localised.
The name should be unique within each provider. Names should contain
lowercase alphanumeric characters only and no spaces or other
formatting characters.
"""
return 'fwdet_v21'
def displayName(self):
"""
Returns the translated algorithm name, which should be used for any
user-visible display of the algorithm name.
"""
return self.tr('Floodwater Depth Estimation Tool (FwDET) v2.1')
def group(self):
"""
Returns the name of the group this algorithm belongs to. This string
should be localised.
"""
return self.tr('FwDET')
def groupId(self):
"""
Returns the unique ID of the group this algorithm belongs to. This
string should be fixed for the algorithm, and must not be localised.
The group id should be unique within each provider. Group id should
contain lowercase alphanumeric characters only and no spaces or other
formatting characters.
"""
return 'fwdet'
def shortHelpString(self):
"""return the help string by loading from an HTML file"""
#get the filepath
fp = os.path.join(descriptions_dir, self.name()+'.html')
#load the html description file
with open(fp, 'r') as file:
return self.tr(file.read() + f'\n version {__version__}')
def initAlgorithm(self, config=None):
"""
Here we define the inputs and output of the algorithm, along
with some other properties.
"""
#=======================================================================
# control
#=======================================================================
#=======================================================================
# INPUTS-----------
#=======================================================================
self.addParameter(
QgsProcessingParameterRasterLayer(self.INPUT_DEM, self.tr('Terrain Raster (DEM)'))
)
self.addParameter(
QgsProcessingParameterFeatureSource(self.INUN_VLAY,self.tr('Inundation Polygon'),
types=[QgsProcessing.TypeVectorPolygon]
)
)
#=======================================================================
# pars--------
#=======================================================================
#add parameters
param = QgsProcessingParameterNumber(self.numIterations, 'Number of Smoothing Iterations',
type=QgsProcessingParameterNumber.Integer, minValue=0)
self.addParameter(param)
param = QgsProcessingParameterNumber(self.slopeTH, 'Slope Threshold (percent)',
type=QgsProcessingParameterNumber.Double, minValue=0, maxValue=100)
self.addParameter(param)
param = QgsProcessingParameterString(self.grow_metric, 'r.grow.distance metric', defaultValue='euclidean', optional=False)
param.setMetadata( {'widget_wrapper':
{ 'value_hints': list(self.grow_metric_d.keys()) }
})
self.addParameter(param)
#=======================================================================
# OUTPUTS------
#=======================================================================
self.addParameter(
QgsProcessingParameterRasterDestination(self.OUTPUT_SHORE, self.tr('Inundation Shore-Boundary Raster'),
optional=True)
)
self.addParameter(
QgsProcessingParameterRasterDestination(self.OUTPUT_WSH, self.tr('Interpolated Water Depths Raster'))
)
self.addParameter(
QgsProcessingParameterRasterDestination(self.OUTPUT_WSH_SMOOTH, self.tr('Interpolated Water Depths Raster (with Low-Pass filter)'),
optional=True)
)
def _init_algo(self, params, context, feedback):
"""common init for tests"""
self.proc_kwargs = dict(feedback=feedback, context=context, is_child_algorithm=True)
self.context, self.feedback, self.params = context, feedback, params
#self.feedback.pushInfo(f'initalized w/ v{__version__}')
def processAlgorithm(self, params, context, feedback):
"""
Here is where the processing itself takes place.
"""
#=======================================================================
# defaults
#=======================================================================
feedback.pushInfo(f'\n\nv{__version__} starting w/ \n%s\n\n'%(pprint.pformat(params, width=30)))
self._init_algo(params, context, feedback)
#=======================================================================
# retrieve inputs---
#=======================================================================
#=======================================================================
# DEM
#=======================================================================
def get_rlay(attn):
"""load a raster layer and do some checks"""
inrlay = self.parameterAsRasterLayer(params, getattr(self, attn), context)
if not isinstance(inrlay, QgsRasterLayer):
raise QgsProcessingException(f'bad type on {attn}')
return inrlay
input_dem = get_rlay(self.INPUT_DEM)
#=======================================================================
# inundation polygon
#=======================================================================
"""QGIS returns QgsProcessingFeatureSource for vector algo inputs
algos handle this fine, but we use some expanded features of QgsVectorLayer (e.g., extents)
Also, couldnt figure out a way to instance QgsProcessingFeatureSource in unit tests
as a workaround, we just convert this to a vector layer here"""
inun_fsource = self.parameterAsSource(params, self.INUN_VLAY, context)
#QgsProcessingFeatureSource
if not isinstance(inun_fsource, QgsProcessingFeatureSource):
raise QgsProcessingException(f'bad type on {self.INUN_VLAY}: {type(inun_fsource)}')
#convert to QgsVectorLayer
numfeatures = inun_fsource.featureCount()
feedback.pushInfo(f'converting \'{type(inun_fsource)}\' to layer w/ {numfeatures} feats')
inun_vlay_fp = self._algo('native:savefeatures', {'INPUT':params[self.INUN_VLAY], 'OUTPUT':'TEMPORARY_OUTPUT'})['OUTPUT']
inun_vlay = QgsVectorLayer(inun_vlay_fp, 'INUN_VLAY')
#print(inun_fsource)
#inun_vlay_fp = self._algo('native:fixgeometries', {'INPUT':inun_fsource, 'OUTPUT':'TEMPORARY_OUTPUT'})['OUTPUT']
inun_vlay = QgsVectorLayer(inun_vlay_fp, 'INUN_VLAY')
feedback.pushInfo('finished loading layers')
#=======================================================================
# params
#=======================================================================
numIterations = self.parameterAsInt(params, self.numIterations,context)
slopeTH = self.parameterAsDouble(params, self.slopeTH, context)
grow_metric = self.parameterAsString(params, self.grow_metric, context)
#feedback.pushInfo(f'running with \'{inun_vlay.name()}\'')
if feedback.isCanceled():
return {}
#=======================================================================
# exceute specified method----
#=======================================================================.
return self.run_algo(input_dem, inun_vlay, numIterations, slopeTH, grow_metric)
def run_algo(self, dem_rlay_raw, inun_vlay, numIterations, slopeTH, grow_distance,
#cost_raster=None,
):
"""generate gridded depths from inundation polygon
FwDET QGIS port from ArcMap script ./FwDET_2p1_Standalone.py
main steps:
1) clip and pre-check
2) compute the shore/boundary pixels (filtering, smoothing, etc.)
3) grow/extend the shore/boundary pixels onto the full domain
4) clip and subtract with DEM to compute depths
5) apply low-pass filter
Params
------------
cost_raster: QgsRasterLayer, optional
cost surface. if not provided, all 1s where DEM>0
NOT IMPLEMENTED: see note below
inun_vlay: QgsVectorLayer
inundation polygon
"""
feedback=self.feedback
res_d = dict()
#=======================================================================
# pre-check
#=======================================================================
assert isinstance(inun_vlay, QgsVectorLayer)
feedback.pushInfo(f'on {inun_vlay.name()}')
#vector geometry
for feature in inun_vlay.getFeatures():
geom = feature.geometry()
if not geom.isGeosValid():
"""better to let the user fix this first as the fix method may produce unexpected results"""
feedback.pushWarning(f'passed inundation polygon layer \'{inun_vlay.name()}\''+\
'has invalid geometries. try \'Fix Geometries\'?')
#CRS
if not inun_vlay.crs()==dem_rlay_raw.crs():
raise AssertionError(f'crs must match between DEM and inundation polygon')
if inun_vlay.crs().isGeographic():
feedback.pushWarning(f'{inun_vlay.name()}s CRS ({inun_vlay.crs()}) is Geographic. this may lead to unexpected results. consider re-projecting')
#extetns
if not dem_rlay_raw.extent().contains(inun_vlay.extent()):
feedback.pushWarning(f'inundation polygon extents are not contained with the DEM extents' +\
'\nthis may lead to unexpected results. \nconsider trimming the inundation polygon')
#=======================================================================
# clip
#=======================================================================
#clip DEM raster
#TODO: fix clipping
dem_rlay_fp = self._algo('gdal:cliprasterbyextent',
{ 'DATA_TYPE' : 0, 'EXTRA' : '',
'INPUT' : dem_rlay_raw, 'NODATA' : -9999,
'OUTPUT' : 'TEMPORARY_OUTPUT', 'OVERCRS' : False,
'PROJWIN' : get_extent_str(inun_vlay) })['OUTPUT']
dem_rlay = QgsRasterLayer(dem_rlay_fp, 'DEM_clipped')
#=======================================================================
# shore Line/boundary------
#=======================================================================
boundary = self.CalculateBoundary(dem_rlay,inun_vlay, numIterations, slopeTH)
res_d[self.OUTPUT_SHORE] = boundary
#=======================================================================
# cost allocation/grow-----
#=======================================================================
"""2023-10-20: could not find an equivalent cost allocation function with pre-installed QGIS providers
(whitebox tools can do this.. but requires more install)
instaed, using 'grass7:r.grow.distance' which performs an equivalent operation with a neutral cost surface
this reduces some flexibility as the user can not specify their own cost surface
this was the same method adopted by the original FwDET 2.0 processing script (FwDET_QGIS_plugin.py)
using the same naming convention as
"""
#=======================================================================
# cost surface (grow boundarY)
#=======================================================================
#=======================================================================
# if cost_raster is None:
# feedback.pushInfo(f'no cost raster provided... computing from DEM > 0')
# cost_fp = self._gdal_calc({'FORMULA':'A > 0', 'INPUT_A':dem_rlay, 'BAND_A':1, 'NO_DATA':0.0,
# 'RTYPE':5, 'OUTPUT':'TEMPORARY_OUTPUT'})
#
# #cost_raster = QgsRasterLayer(cost_fp, 'cost_raster')
#=======================================================================
#cost allocation
feedback.pushInfo(f'growing w/ \n boundary: {boundary}')
cost_alloc = self._algo('grass7:r.grow.distance',
{ '-m' : False, #Output distances in meters instead of map units
'-n' : False, #Calculate distance to nearest NULL cell
'GRASS_RASTER_FORMAT_META' : '', 'GRASS_RASTER_FORMAT_OPT' : '', 'GRASS_REGION_CELLSIZE_PARAMETER' : 0, 'GRASS_REGION_PARAMETER' : None,
'distance' : 'TEMPORARY_OUTPUT',
'input' : boundary,
'metric' :self.grow_metric_d[grow_distance],
'value' : 'TEMPORARY_OUTPUT' })['value']
assert os.path.exists(cost_alloc), f'grass7:r.grow.distance failed to produce a result on \n {boundary}'
#=======================================================================
# clip/filter grown boundary w/ DEM-----
#=======================================================================
feedback.pushInfo(f'computing water_depths on DEM\n\n')
#compute difference
diff_rlay = self._gdal_calc({'FORMULA':'A - B',
'INPUT_A':cost_alloc, 'BAND_A':1, 'INPUT_B':dem_rlay, 'BAND_B':1,
'NO_DATA':-9999,'OUTPUT':'TEMPORARY_OUTPUT', 'RTYPE':5})
#rasterize inundation
inun_rlay = self._algo('gdal:rasterize',
{ 'BURN' : 1, 'DATA_TYPE' : 5,
'EXTENT' : get_extent_str(dem_rlay),
#'EXTENT':'-80.118404571,-79.972518169,35.048219968,35.201742050 [EPSG:4326]',
'EXTRA' : '', 'FIELD' : '', 'INIT' : None,
'INPUT' :inun_vlay, 'INVERT' : False,'NODATA' : 0, 'OPTIONS' : '',
'OUTPUT' : 'TEMPORARY_OUTPUT', 'USE_Z' : False,
'UNITS' : 0,#pixels
'WIDTH' : dem_rlay.width(), 'HEIGHT' : dem_rlay.height(),}
)['OUTPUT']
#mask negatives and inundation
water_depth = self._gdal_calc({'FORMULA':'A * (A > 0) * (B == 1)',
'INPUT_A':diff_rlay, 'BAND_A':1,
#'INPUT_B':dem_rlay, 'BAND_B':1,
'INPUT_B':inun_rlay, 'BAND_B':1,
'NO_DATA':0,'OUTPUT':self._get_out(self.OUTPUT_WSH), 'RTYPE':5})
res_d[self.OUTPUT_WSH] = water_depth
#=======================================================================
# #low-pass filter-----
#=======================================================================
if self.OUTPUT_WSH_SMOOTH in self.params:
feedback.pushInfo(f'applying low-pass filter to {water_depth}\n\n')
#3x3 kernal filter
wd_smooth_fp = self._r_neighbors(water_depth, neighborhood_size=3, method='average')
#mask
res_d[self.OUTPUT_WSH_SMOOTH] = self._gdal_calc(
{'FORMULA':'A*(B > 0)',
'INPUT_A':wd_smooth_fp, 'BAND_A':1, 'INPUT_B':water_depth, 'BAND_B':1,
'NO_DATA':0,'OUTPUT':self._get_out(self.OUTPUT_WSH_SMOOTH), 'RTYPE':5})
return res_d
def CalculateBoundary(self, dem_rlay, inun_vlay, numIterations, slopeTH,
neighborhood_size=5,
):
"""build, smooth, and filter the shore/boundary raster
TODO: add some pre-clipping
Params
---------
numIterations: int
number of smoothing iterations
slopeTH: float
threshold for slope filter (percent)
neighborhood_size: int
size of neighbourhood for smoothing kernal
"""
feedback=self.feedback
assert isinstance(dem_rlay, QgsRasterLayer)
#=======================================================================
# rastesrize inundatin boundary
#=======================================================================
feedback.pushInfo(f'rasterizing inundation boundary\n\n')
#convert polygons to lines
polyline = self._algo('native:polygonstolines',
{'INPUT':inun_vlay, 'OUTPUT':tfp('.gpkg')})['OUTPUT']
#rasterize
raster_polyline = self._algo('gdal:rasterize',
{ 'BURN' : 1, 'DATA_TYPE' : 5,
'EXTENT' : get_extent_str(dem_rlay),
#'EXTENT':'-80.118404571,-79.972518169,35.048219968,35.201742050 [EPSG:4326]',
'EXTRA' : '', 'FIELD' : '', 'INIT' : None,
'INPUT' :polyline, 'INVERT' : False,'NODATA' : 0, 'OPTIONS' : '',
'OUTPUT' : 'TEMPORARY_OUTPUT', 'USE_Z' : False,
'UNITS' : 0,#pixels
'WIDTH' : dem_rlay.width(), 'HEIGHT' : dem_rlay.height(),}
)['OUTPUT']
#=======================================================================
# extract shore values
#=======================================================================
feedback.pushInfo(f'sampling DEM values from shore line\n\n')
boundary_fp = self._gdal_calc({'FORMULA':'(B > 0)*A',
'INPUT_A':dem_rlay, 'BAND_A':1,
'INPUT_B':raster_polyline, 'BAND_B':1,
'NO_DATA':0.0,'RTYPE':5, 'OUTPUT':'TEMPORARY_OUTPUT'})
#=======================================================================
# smooth shore values
#=======================================================================
boundary_fp_i = boundary_fp
if numIterations>0:
feedback.pushInfo(f'smoothing shore values w/ {numIterations} iterations\n\n')
for i in range(numIterations):
feedback.pushInfo(f' {i+1}/{numIterations} smoothing')
#compute 5x5 neighbour
neigh_fp = self._r_neighbors(boundary_fp_i, neighborhood_size=neighborhood_size)
#re-mask to input
boundary_fp_i = self._gdal_calc_mask_apply(neigh_fp, raster_polyline)
feedback.pushInfo(f' finished smoothing w/ {boundary_fp_i}')
#=======================================================================
# handle ocean boundary
#=======================================================================
"""consider adding an option to skip this
doesnt do anything if there are no negative DEM values in the shore line"""
feedback.pushInfo(f'removing ocean boundary\n\n')
#'fuzzy' nearest neighbour lowering of the DEM
dem_min_fp = self._r_neighbors(dem_rlay, neighborhood_size=neighborhood_size,
circular=True, method='minimum')
#mask out any negatives from the boundary
boundary1 = self._gdal_calc({'FORMULA':'A*(B > 0)',
'INPUT_A':boundary_fp_i, 'BAND_A':1, 'INPUT_B':dem_min_fp, 'BAND_B':1,
'NO_DATA':0.0,'OUTPUT':'TEMPORARY_OUTPUT', 'RTYPE':5})
#=======================================================================
# slope filter
#=======================================================================
if slopeTH>0.0:
feedback.pushInfo(f'slope filtering w/ threshold={slopeTH} \n\n')
#boundary1 = QgsRasterLayer(boundary1, 'boundary1')
#compute slope
#doesnt seem to work... maybe crs problem?
#===================================================================
# slope_fp = self._algo('gdal:slope',
# { 'AS_PERCENT' : True, 'BAND' : 1, 'COMPUTE_EDGES' : False,
# 'EXTRA' : '', 'INPUT' :dem_rlay_c,
# 'OPTIONS' : '', 'OUTPUT' : 'TEMPORARY_OUTPUT', 'SCALE' : 1, 'ZEVENBERGEN' : False }
# )['OUTPUT']
#===================================================================
#ranges from 0-76 for test
slope_fp = self._algo('grass7:r.slope.aspect',
{ '-a' : True, '-e' : False, '-n' : False,
'GRASS_RASTER_FORMAT_META' : '', 'GRASS_RASTER_FORMAT_OPT' : '',
'GRASS_REGION_CELLSIZE_PARAMETER' : 0, 'GRASS_REGION_PARAMETER' : None,
'elevation' : dem_rlay,
'format' : 1, 'min_slope' : 0, 'precision' : 0, 'slope' : 'TEMPORARY_OUTPUT',
'zscale' : 1 })['slope']
#apply filter
boundary2 = self._gdal_calc({'FORMULA':f'B*(A > {slopeTH})',
'INPUT_A':slope_fp, 'BAND_A':1,
'INPUT_B':boundary1, 'BAND_B':1,
'NO_DATA':0.0,'OUTPUT':'TEMPORARY_OUTPUT', 'RTYPE':5})
else:
feedback.pushInfo(f'no slope threshold set to zero... skipping filtering')
boundary2=boundary1
#=======================================================================
# rounding
#=======================================================================
#mostly doing this to get a consistent output name
boundary3 = self._algo('native:roundrastervalues',
{ 'BAND' : 1, 'BASE_N' : 10, 'DECIMAL_PLACES' : 4,
'INPUT' : boundary2, 'OUTPUT' : self._get_out(self.OUTPUT_SHORE), 'ROUNDING_DIRECTION' : 1 }
)['OUTPUT']
feedback.pushInfo(f'finished constructing shore/boundary raster\n {boundary3} \n\n')
#assert isinstance(boundary2, QgsRasterLayer)
return boundary3
def _get_out(self, attn):
output= self.parameterAsOutputLayer(self.params, attn, self.context)
assert not output=='', f'bad parameter name \'{attn}\''
return output
def _gdal_calc(self, pars_d):
"""Raster calculator (gdal:rastercalculator)
- 0: Byte
- 1: Int16
- 2: UInt16
- 3: UInt32
- 4: Int32
- 5: Float32
- 6: Float64
"""
ofp = processing.run('gdal:rastercalculator', pars_d, **self.proc_kwargs)['OUTPUT']
if not os.path.exists(ofp):
raise QgsProcessingException('gdal:rastercalculator failed to get a result for \n%s'%pars_d['FORMULA'])
return ofp
def _gdal_calc_mask_apply(self, rlay, mask, OUTPUT='TEMPORARY_OUTPUT'):
return self._gdal_calc({'FORMULA':'A*B',
'INPUT_A':rlay, 'BAND_A':1, 'INPUT_B':mask, 'BAND_B':1,
'NO_DATA':-9999,'OUTPUT':OUTPUT, 'RTYPE':5})
def _gdal_calc_mask(self, rlay, OUTPUT='TEMPORARY_OUTPUT'):
return self._gdal_calc({'FORMULA':'A/A', 'INPUT_A':rlay, 'BAND_A':1,'NO_DATA':-9999,'OUTPUT':OUTPUT, 'RTYPE':5})
def _r_neighbors(self, input_rlay,
neighborhood_size=5,
method='average',
output='TEMPORARY_OUTPUT',
circular=False,
):
return self._algo('grass7:r.neighbors',
{'-a':False, '-c':circular, #Use circular neighborhood
'GRASS_REGION_CELLSIZE_PARAMETER':0, 'GRASS_REGION_PARAMETER':None,
'gauss':None,
'input':input_rlay,
'method':{'average':0, 'minimum':3}[method],
'output':output, 'selection':None,
'size':neighborhood_size})['output']
def _algo(self, algoName, pars_d):
return processing.run(algoName, pars_d, **self.proc_kwargs)
#===============================================================================
# HELPERS----------
#===============================================================================
"""cant do imports without creating a provider and a plugin"""
def now():
return datetime.datetime.now()
def tfp(suffix='.tif'):
return tempfile.NamedTemporaryFile(suffix=suffix).name
def get_resolution_ratio(
rlay_s1, #fine
rlay_s2, #coarse
):
s1 = rlay_get_resolution(rlay_s1)
s2 = rlay_get_resolution(rlay_s2)
assert (s2/ s1)>=1.0
return s2 / s1
def rlay_get_resolution(rlay):
return (rlay.rasterUnitsPerPixelY() + rlay.rasterUnitsPerPixelX())*0.5
def assert_extent_equal(left, right, msg='',):
""" extents check"""
assert isinstance(left, QgsRasterLayer), type(left).__name__+ '\n'+msg
assert isinstance(right, QgsRasterLayer), type(right).__name__+ '\n'+msg
__tracebackhide__ = True
#===========================================================================
# crs
#===========================================================================
if not left.crs()==right.crs():
raise QgsProcessingException('crs mismatch')
#===========================================================================
# extents
#===========================================================================
if not left.extent()==right.extent():
raise QgsProcessingException('%s != %s extent\n %s != %s\n '%(
left.name(), right.name(), left.extent(), right.extent()) +msg)
def get_extent_str(layer):
rect = layer.extent()
return '%.8f,%.8f,%.8f,%.8f [%s]'%(
rect.xMinimum(), rect.xMaximum(),rect.yMinimum(), rect.yMaximum(),
layer.crs().authid())