From 18a91c27e7149c05764e070729d3889dbfff00d3 Mon Sep 17 00:00:00 2001
From: "Liu, Yan Y"
Date: Sun, 27 Feb 2022 18:06:08 -0500
Subject: [PATCH] fixed issue #544: split_flows.py: tiny split reaches caused by mismatch of flowline and lake polygon

---
 src/split_flows.py | 273 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 261 insertions(+), 12 deletions(-)

diff --git a/src/split_flows.py b/src/split_flows.py
index 4dff74fe0..a04d0e7f5 100755
--- a/src/split_flows.py
+++ b/src/split_flows.py
@@ -24,7 +24,242 @@ import build_stream_traversal
 from utils.shared_functions import getDriver, mem_profile
 from utils.shared_variables import FIM_ID
-
+import shapely # ornl
+import shapely.geometry # ornl
+import rtree # ornl
+
+# ornl
+def connect_seg_by_lake(lakeseq0):
+    '''
+    lakeseq0 is indexed by flowline segments and the value of each
+    element is the associated lake id. a stream line often goes in and
+    out of a lake polygon multiple times, which produces a subsequence
+    like 0,a,a,0,0,a,0,a,0. the intention of the lake overlay is to
+    split an overlapped stream line into at most three parts: the part
+    up to the inlet of the lake, the part in the lake, and the part
+    from the outlet of the lake. this function converts the above
+    subsequence to 0,a,a,a,a,a,a,a,0 so that consecutive segments
+    belonging to a lake are all labeled with that lake id.
+    observations: let's add a virtual lake id -1 at the beginning
+    1. any lake subseq begins with that lake id. no zeros in front
+       (otherwise it is not the beginning)
+    2. any lake subseq ends with that lake id (zero cannot be at the end)
+    3. the code implements lake subseq matching for the regular
+       expression a(0*a+)*0*b
+
+    Parameters
+    ----------
+    lakeseq0 : array_like
+        lakeseq0 is indexed by flowline segments and the value of each
+        element is the associated lake id.
+
+    Returns
+    -------
+    array_like
+        the new sequence with relabeled lake ids
+    '''
+    lakeseq = lakeseq0.copy()
+    curlake = -1 # suppose we start from lake id -1
+    numout = 0 # num of segments outside of the lake that need to be labeled with this lake id
+    for i in range(len(lakeseq)): # now look at the next element
+        l = lakeseq[i]
+        if l == 0: # found more outside edges
+            numout += 1
+        elif l == curlake: # belongs to the same lake
+            for j in range(numout): # fill in-between with the same lake id
+                lakeseq[i-j-1] = curlake
+            numout = 0 # reset counter
+        else: # a new lake. curlake's range and the outside segs are already marked. just restart
+            curlake = l
+            numout = 0
+
+    return lakeseq
+
+# ornl
+def split_through_polygons(flowline, ol_polygons, ol_lakes):
+    '''
+    Split a flowline into segments that overlap multiple lakes.
+    usually, a flowline overlaps with only one lake; this function is
+    designed to also handle long flowlines. when a flowline goes
+    through an overlay of multiple polygons, the intention is to keep
+    the portion within any single polygon as one segment.
+    assumption: polygons do not overlap.
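+    for example (illustrative ids): a flowline that dips in and out of
+    lake 3 twice yields the per-segment lake sequence 0,3,3,0,0,3,0,3,0;
+    connect_seg_by_lake relabels it to 0,3,3,3,3,3,3,3,0, and merging
+    the runs then produces three splits with lake ids [0, 3, 0]: the
+    reach upstream of the inlet, a single in-lake reach, and the reach
+    downstream of the outlet.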
+
+    Parameters
+    ----------
+    flowline : LineString
+        the flowline LineString
+    ol_polygons : array_like
+        the list of lake polygons
+    ol_lakes : array_like
+        the list of lake IDs corresponding to the lake polygons
+
+    Returns
+    -------
+    tuple
+        ([split flow LineStrings], [lake IDs])
+    '''
+    # use geopandas overlay's identity method to chop the flowline
+    gdf_flow = gpd.GeoDataFrame({'geometry': gpd.GeoSeries([flowline]), 'flow':[1]})
+    gdf_lake = gpd.GeoDataFrame({'geometry': gpd.GeoSeries(ol_polygons), 'lake':ol_lakes})
+    gdf_ol_ident = gpd.overlay(gdf_flow, gdf_lake, how='identity')
+    #print('identity overlay generated', gdf_ol_ident.shape[0], 'segments')
+    gdf_ol_ident = gdf_ol_ident.loc[~gdf_ol_ident.is_empty, :]
+    #print('after removing empty geom, there are', gdf_ol_ident.shape[0], 'segments')
+    gdf_ol_ident = gdf_ol_ident.explode().reset_index(drop=True)
+    num_edges = gdf_ol_ident.shape[0]
+    if num_edges == 1: # no split; accelerate by skipping the rest
+        lakeid = gdf_ol_ident.iloc[0]['lake']
+        if pd.isna(lakeid):
+            lakeid = -999
+        return [flowline], [lakeid]
+
+    # hash segment end points and the segments each point is connected to
+    node2edges = {}
+    inlake = np.zeros(num_edges, dtype=np.int32)
+    for i, (flow, lake, geom) in gdf_ol_ident.iterrows(): # columns are: flow, lake, geometry
+        for p in [geom.coords[0], geom.coords[-1]]:
+            if p not in node2edges:
+                node2edges[p] = [i]
+            else:
+                node2edges[p].append(i)
+        if not pd.isna(lake): # part of a lake
+            inlake[i] = lake # lake ids need to be > 0
+
+    # order segments
+    h_edges = np.zeros(num_edges, dtype=np.int8) # record whether an edge has been visited
+    inlet = flowline.coords[0]
+    inlets = {inlet} # start from the flowline's first end point
+    ordered_edges = []
+    for i in range(num_edges):
+        edges = node2edges[inlet]
+        for edge in edges:
+            if h_edges[edge] == 0: # not visited before
+                ordered_edges.append(edge) # add segment to the ordered list
+                h_edges[edge] = 1 # visited
+                geom = gdf_ol_ident.iloc[edge]['geometry']
+                outlet = None
+                for p in [geom.coords[0], geom.coords[-1]]:
+                    if p not in inlets: # outlet, i.e., the next inlet
+                        outlet = p
+                        break
+                if outlet is None:
+                    print('ERROR could not find outlet')
+                else:
+                    inlet = outlet
+                    inlets.add(inlet)
+
+    # create splits
+    lakeseq = [inlake[i] for i in ordered_edges]
+    newseq = connect_seg_by_lake(lakeseq) # label consecutive segments with their lake ids
+    #print('lakeseq', lakeseq)
+    #print('lakeseq (labeled)', newseq)
+    # now merge segments into line geometries
+    newseq += [-1] # sentinel to flush the last run
+    lakeids = []
+    flow_splits = []
+    start = 0
+    lakeid = newseq[0]
+    for i in range(len(newseq)):
+        if newseq[i] != lakeid: # merge the previous run of segments
+            lakeids.append(lakeid)
+            combined_seg = [pnt for geom in gdf_ol_ident.iloc[ordered_edges[start:i]]['geometry'] for pnt in geom.coords[:-1]]
+            combined_seg.append(gdf_ol_ident.iloc[ordered_edges[i-1]]['geometry'].coords[-1])
+            flow_splits.append(shapely.geometry.LineString(combined_seg))
+            start = i
+            lakeid = newseq[i]
+
+    return flow_splits, lakeids # lakeid == 0 if outside of the lake
+
+# ornl
+def construct_lakes_rtree(lakes, fn_lakeid):
+    '''
+    Construct an r-tree for lakes in order to speed up queries
+
+    Parameters
+    ----------
+    lakes : GeoDataFrame
+        lakes GeoDataFrame
+    fn_lakeid : str
+        field (column) name of the lake ID
+
+    Returns
+    -------
+    rtree.index.Index
+        the constructed lakes r-tree
+    '''
+    def generate_items():
+        index = 0
+        for i, row in lakes.iterrows():
+            box = row['geometry'].bounds
+            yield (index, box, (i, row[fn_lakeid], row['geometry']))
+            index += 1
+    return rtree.index.Index(generate_items())
+
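+# usage sketch (illustrative; mirrors split_flows_on_lakes below): query
+# the r-tree with a flowline's bounding box to get candidate lakes
+# without testing every lake polygon, e.g.
+#   tree = construct_lakes_rtree(lakes, 'newID')
+#   for i, lakeid, lakegeom in tree.intersection(flowline.bounds, objects='raw'):
+#       ...  # only lakes whose bbox intersects the flowline's bbox
+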
+# ornl
+def split_flows_on_lakes(flows, lakes, fn_lakeid, fn_new_lakeid):
+    '''
+    This function replaces the original overlay logic on lakes.
+
+    Parameters
+    ----------
+    flows : GeoDataFrame
+        flows data frame
+    lakes : GeoDataFrame
+        lakes data frame
+    fn_lakeid : str
+        field name of the lake ID in the input lakes data frame
+    fn_new_lakeid : str
+        field name of the lake ID in the output flows data frame
+
+    Returns
+    -------
+    GeoDataFrame
+        the split flowline data frame
+    '''
+    if lakes is None or len(lakes) <= 0: # no need to split
+        flows[fn_new_lakeid] = -999 # no lake
+        return flows
+    # build an rtree on lake bboxes
+    searchtree = construct_lakes_rtree(lakes, fn_lakeid)
+    # split each flowline on lakes
+    l_flows = []
+    l_lakeids = []
+    count_rtree_hits = 0
+    count_splits = 0
+    count_split_flowlines = 0
+    count_flowlines_tosplit = 0
+    for i, row in flows.iterrows():
+        flowline = row['geometry']
+        hits = searchtree.intersection(flowline.bounds, objects='raw')
+        ol_lakes = []
+        ol_polygons = []
+        for j, lakeid, lakegeom in hits:
+            ol_lakes.append(lakeid)
+            ol_polygons.append(lakegeom)
+        if len(ol_polygons) == 0: # no hits
+            l_flows += [flowline]
+            l_lakeids += [-999]
+            continue
+        count_rtree_hits += len(ol_polygons)
+        count_flowlines_tosplit += 1
+        # split flows by lake polygons
+        flow_splits, lakeids = split_through_polygons(flowline, ol_polygons, ol_lakes)
+        lakeids = [-999 if l == 0 else l for l in lakeids]
+        l_flows += flow_splits
+        l_lakeids += lakeids
+        if len(flow_splits) > 1:
+            count_splits += len(flow_splits)
+            count_split_flowlines += 1
+    # guard denominators to avoid division by zero on empty inputs or when no flowline was split
+    print('split_flows_on_lakes: rtree efficiency:', 'space:', len(flows), 'x', len(lakes), '=', len(flows)*len(lakes),
+          'match rate:', count_rtree_hits, '/', len(flows)*len(lakes), '=', count_rtree_hits/max(1, len(flows)*len(lakes)))
+    print('split_flows_on_lakes: split stats:', 'flowlines_considered:', count_flowlines_tosplit,
+          'split_ratio:', count_splits, '/', count_split_flowlines, '=', count_splits/max(1, count_split_flowlines))
+    print('split_flows_on_lakes: preserved stats:', 'not_touched:', (len(flows) - count_flowlines_tosplit),
+          'no_split:', count_flowlines_tosplit - count_split_flowlines)
+    return gpd.GeoDataFrame({'geometry': l_flows, fn_new_lakeid: l_lakeids}, crs=flows.crs, geometry='geometry')
+
+# ornl
 @mem_profile
 def split_flows(max_length, slope_min, lakes_buffer_input, flows_filename, dem_filename, split_flows_filename, split_points_filename, wbd8_clp_filename, lakes_filename):
     wbd = gpd.read_file(wbd8_clp_filename)
@@ -67,17 +302,27 @@ def split_flows(max_length, slope_min, lakes_buffer_input, flows_filename, dem_f
         print ('splitting stream segments at ' + str(len(lakes)) + ' waterbodies')
         #create splits at lake boundaries
         lakes = lakes.filter(items=['newID', 'geometry'])
-        lakes = lakes.set_index('newID')
-        flows = gpd.overlay(flows, lakes, how='union').explode().reset_index(drop=True)
-        lakes_buffer = lakes.copy()
-        lakes_buffer['geometry'] = lakes.buffer(lakes_buffer_input) # adding X meter buffer for spatial join comparison (currently using 20meters)
+        # remove empty geometries # ornl
+        flows = flows.loc[~flows.is_empty,:] # ornl: this is necessary
+        flows = split_flows_on_lakes(flows, lakes, 'newID', 'LakeID') # ornl
+
+        #lakes = lakes.set_index('newID') # ornl
+        #flows = gpd.overlay(flows, lakes, how='union').explode().reset_index(drop=True) # ornl
+        #lakes_buffer = lakes.copy() # ornl
+        #lakes_buffer['geometry'] = lakes.buffer(lakes_buffer_input) # adding X meter buffer for spatial join comparison (currently using 20meters) # ornl
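+
+        # rationale (issue #544): the union overlay + 20m buffered sjoin
+        # above produced tiny split reaches where the flowline and lake
+        # polygon geometries mismatch; split_flows_on_lakes() instead
+        # merges each in-lake run into one split and assigns its LakeID
+        # directly # ornl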
+    else: # ornl
+        flows['LakeID'] = -999 # ornl

     print ('splitting ' + str(len(flows)) + ' stream segments based on ' + str(max_length) + ' m max length')

     # remove empty geometries
     flows = flows.loc[~flows.is_empty,:]

-    for i,lineString in tqdm(enumerate(flows.geometry),total=len(flows.geometry)):
+    l_lakeid = [] # ornl
+    #for i,lineString in tqdm(enumerate(flows.geometry),total=len(flows.geometry)): # ornl
+    for i,row in flows.iterrows(): # ornl
+        lineString = row['geometry'] # ornl
+        lakeid = row['LakeID'] # ornl

         # Reverse geometry order (necessary for BurnLines)
         lineString = LineString(lineString.coords[::-1])
@@ -88,6 +333,7 @@
         # existing reaches of less than max_length
         if lineString.length < max_length:
             split_flows = split_flows + [lineString]
+            l_lakeid += [lakeid] # ornl
             line_points = [point for point in zip(*lineString.coords.xy)]

             # Calculate channel slope
@@ -125,6 +371,7 @@
                 splitLineString = LineString(cumulative_line)
                 split_flows = split_flows + [splitLineString]
+                l_lakeid += [lakeid] # ornl

                 # Calculate channel slope
                 start_point = cumulative_line[0]; end_point = cumulative_line[-1]
@@ -144,6 +391,7 @@
         splitLineString = LineString(cumulative_line)
         split_flows = split_flows + [splitLineString]
+        l_lakeid += [lakeid] # ornl

         # Calculate channel slope
         start_point = cumulative_line[0]; end_point = cumulative_line[-1]
@@ -153,13 +401,14 @@
             slope = slope_min
         slopes = slopes + [slope]

-    split_flows_gdf = gpd.GeoDataFrame({'S0' : slopes ,'geometry':split_flows}, crs=flows.crs, geometry='geometry')
+    #split_flows_gdf = gpd.GeoDataFrame({'S0' : slopes ,'geometry':split_flows}, crs=flows.crs, geometry='geometry') # ornl
+    split_flows_gdf = gpd.GeoDataFrame({'S0' : slopes ,'geometry':split_flows, 'LakeID': l_lakeid}, crs=flows.crs, geometry='geometry') # ornl
     split_flows_gdf['LengthKm'] = split_flows_gdf.geometry.length * toMetersConversion
-    if lakes is not None:
-        split_flows_gdf = gpd.sjoin(split_flows_gdf, lakes_buffer, how='left', op='within') #options: intersects, within, contains, crosses
-        split_flows_gdf = split_flows_gdf.rename(columns={"index_right": "LakeID"}).fillna(-999)
-    else:
-        split_flows_gdf['LakeID'] = -999
+    #if lakes is not None: # ornl
+    #    split_flows_gdf = gpd.sjoin(split_flows_gdf, lakes_buffer, how='left', op='within') #options: intersects, within, contains, crosses # ornl
+    #    split_flows_gdf = split_flows_gdf.rename(columns={"index_right": "LakeID"}).fillna(-999) # ornl
+    #else: # ornl
+    #    split_flows_gdf['LakeID'] = -999 # ornl

     # need to figure out why so many duplicate stream segments for 04010101 FR
     split_flows_gdf = split_flows_gdf.drop_duplicates()