Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create nodes at lines intersections #51

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 46 additions & 23 deletions src/snkit/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def split_edges_at_intersections(network, tolerance=1e-9):
split_edges = []
split_points = []
for edge in tqdm(
network.edges.itertuples(index=False), desc='split', total=len(network.edges)
network.edges.itertuples(index=False), desc="split", total=len(network.edges)
):
# note: the symmetry of intersection is not exploited here.
# (If A intersects B, then B intersects A)
Expand All @@ -315,28 +315,15 @@ def split_edges_at_intersections(network, tolerance=1e-9):
# these are *new geometrical objects* not in the sindex
# therefore they cannot be returned by internal _intersects*
intersection = Point()
# restrict intersection to crossing
# this excludes self overlap and end-points intersections
# WARNING: how are self-crossing handled? (e.g. loop-like)
if edge.geometry.crosses(hit):
# excluding itself
# note that __eq__ is used on purpose instead of equals()
# this is stricter: for geometries constructed in the same way
# WARNING: this excludes sensical self-intersections, to be solved.
if edge != hit:
intersection = edge.geometry.intersection(hit)
jmon12 marked this conversation as resolved.
Show resolved Hide resolved

# then keep track of the intersection points
geom_type = intersection.geom_type
if intersection.is_empty:
continue
elif geom_type == 'Point':
hits_points.append(intersection)
elif geom_type == 'MultiPoint':
for point in intersection.geoms:
hits_points.append(point)
elif geom_type == 'MultiLineString':
# when lines almost overlap for a stretch
for line in intersection.geoms:
start = Point(line.coords[0])
end = Point(line.coords[-1])
hits_points.append(start)
hits_points.append(end)
# then extract the intersection points
hits_points = intersection_endpoints(intersection, hits_points)

# store the split edges and intersection points
split_points.extend(hits_points)
Expand Down Expand Up @@ -660,6 +647,36 @@ def line_endpoints(line):
return start, end


def intersection_endpoints(geom, output=[]):
jmon12 marked this conversation as resolved.
Show resolved Hide resolved
"""Return the points from an intersection geometry

It extracts the starting and ending points of intersection
geometries recursively and appends them to `output`.
This doesn't handle polygons or collections of polygons.
"""
geom_type = geom.geom_type
if geom.is_empty:
pass
elif geom_type == "Point":
output.append(geom)
elif geom_type == "LineString":
start = Point(geom.coords[0])
end = Point(geom.coords[-1])
output.append(start)
output.append(end)
# recursively for collections of geometries
# note that there is no shared inheritance relationship
elif (
geom_type == "MultiPoint"
or geom_type == "MultiLineString"
or geom_type == "GeometryCollection"
):
for geom_ in geom.geoms:
output = intersection_endpoints(geom_, output)

return output


def split_edge_at_points(edge, points, tolerance=1e-9):
"""Split edge at point/multipoint"""
try:
Expand Down Expand Up @@ -766,14 +783,20 @@ def to_networkx(network, directed=False, weight_col=None):
G.add_nodes_from(network.nodes.id.to_list())

# add nodal positions from geom
for node_id, x, y in zip(network.nodes.id, network.nodes.geometry.x, network.nodes.geometry.y):
for node_id, x, y in zip(
network.nodes.id, network.nodes.geometry.x, network.nodes.geometry.y
):
G.nodes[node_id]["pos"] = (x, y)

# get edges from network data
if weight_col is None:
# default to geometry length
edges_as_list = list(
zip(network.edges.from_id, network.edges.to_id, network.edges.geometry.length)
zip(
network.edges.from_id,
network.edges.to_id,
network.edges.geometry.length,
)
)
else:
edges_as_list = list(
Expand Down
206 changes: 206 additions & 0 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
# pylint: disable=C0103
import warnings
from black import Line
jmon12 marked this conversation as resolved.
Show resolved Hide resolved

with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
Expand Down Expand Up @@ -183,6 +184,162 @@ def split_intersection():
return snkit.Network(edges=edges, nodes=nodes)


@fixture
def unsplit_multiple_intersections():
"""Multiple edges intersections, all edges unsplit
b f
| |
c--|---|--d
| |
a e
"""
a = Point((1, 0))
b = Point((1, 2))
c = Point((0, 1))
d = Point((3, 1))
e = Point((2, 0))
f = Point((2, 2))
nodes = GeoDataFrame(data={"geometry": [a, b, c, d, e, f]})
ab = LineString([a, b])
cd = LineString([c, d])
ef = LineString([e, f])
edges = GeoDataFrame(data={"geometry": [ab, cd, ef]})
return snkit.Network(edges=edges, nodes=nodes)


@fixture
def split_multiple_intersections():
"""Multiple edges intersections, all edges split
b f
| |
c--x---y--d
| |
a e
"""
a = Point((1, 0))
b = Point((1, 2))
c = Point((0, 1))
d = Point((3, 1))
e = Point((2, 0))
f = Point((2, 2))
x = Point((1, 1))
y = Point((2, 1))
nodes = GeoDataFrame(data={"geometry": [a, b, c, d, e, f, x, y]})
ax = LineString([a, x])
xb = LineString([x, b])
cx = LineString([c, x])
xy = LineString([x, y])
yd = LineString([y, d])
ey = LineString([e, y])
yf = LineString([y, f])
edges = GeoDataFrame(data={"geometry": [ax, xb, cx, xy, yd, ey, yf]})
return snkit.Network(edges=edges, nodes=nodes)


@fixture
def unsplit_overlapping_lines():
"""Overlapping lines for a section
c--d
||
b--|
|
a
"""
a = Point((1, 0))
b = Point((0, 1))
c = Point((1, 2))
d = Point((2, 2))
# x is just a construction point
x = Point((1, 1))
nodes = GeoDataFrame(data={"geometry": [a, b, c, d]})
ac = LineString([a, c])
bd = LineString([b, x, c, d])
edges = GeoDataFrame(data={"geometry": [ac, bd]})
return snkit.Network(edges=edges, nodes=nodes)


@fixture
def split_overlapping_lines():
"""Split of overlapping lines for a section
c--d
||
b--x
|
a
"""
a = Point((1, 0))
b = Point((0, 1))
c = Point((1, 2))
d = Point((2, 2))
x = Point((1, 1))
nodes = GeoDataFrame(data={"geometry": [a, b, c, d, x]})
ax = LineString([a, x])
bx = LineString([b, x])
xc = LineString([x, c])
cd = LineString([c, d])
# note that there are two edges 'xc'
edges = GeoDataFrame(data={"geometry": [ax, xc, bx, xc, cd]})
return snkit.Network(edges=edges, nodes=nodes)


@fixture
def unsplit_heterogeneous_intersection():
"""Crossing point and overlapping line
b--c
| |
e--|--------f
| --d
a

* --d overlaps with e-f
"""
a = Point((1, 0))
b = Point((1, 2))
c = Point((2, 2))
# y is just a construction point
y = Point((2, 1))
d = Point((3, 1))
e = Point((0, 1))
f = Point((4, 1))
nodes = GeoDataFrame(data={"geometry": [a, b, c, d, e, f]})
ad = LineString([a, b, c, y, d])
ef = LineString([e, f])
edges = GeoDataFrame(data={"geometry": [ad, ef]})
return snkit.Network(edges=edges, nodes=nodes)


@fixture
def split_heterogeneous_intersection():
"""Crossing point and overlapping line, all edges split
b--c
| |
e--x--y--d--f
| --
a
"""
a = Point((1, 0))
b = Point((1, 2))
c = Point((2, 2))
d = Point((3, 1))
e = Point((0, 1))
f = Point((4, 1))
x = Point((1, 1))
y = Point((2, 1))
# note: this is order sensitive although it shouldn't matter
nodes = GeoDataFrame(data={"geometry": [a, b, c, d, e, f, y, x]})
ax = LineString([a, x])
xb = LineString([x, b])
bc = LineString([b, c])
cy = LineString([c, y])
yd = LineString([y, d])
ex = LineString([e, x])
xy = LineString([x, y])
df = LineString([d, f])
# note that there are two edges 'yd'
edges = GeoDataFrame(data={"geometry": [ax, xb, bc, cy, yd, ex, xy, yd, df]})
return snkit.Network(edges=edges, nodes=nodes)


@fixture
def gap():
"""T-junction with nodes, edges not quite intersecting:
Expand Down Expand Up @@ -279,6 +436,55 @@ def test_split_at_intersections(unsplit_intersection, split_intersection):
assert_frame_equal(split_intersection.nodes, actual.nodes)


def test_split_at_intersection_already_split(split_intersection):
"""Shouldn't do anything"""
actual = snkit.network.split_edges_at_intersections(split_intersection)
assert_frame_equal(split_intersection.edges, actual.edges)
assert_frame_equal(split_intersection.nodes, actual.nodes)


def test_split_at_intersection_endnode(unsplit, split):
"""Should split the edge at the endnode intersection

There shouldn't be any duplicate (no additional node).
"""
actual = snkit.network.split_edges_at_intersections(unsplit)
assert_frame_equal(split.edges, actual.edges)
assert_frame_equal(split.nodes, actual.nodes)


def test_split_at_intersection_multiple(
unsplit_multiple_intersections, split_multiple_intersections
):
"""Should split the edges at each intersection"""
actual = snkit.network.split_edges_at_intersections(unsplit_multiple_intersections)
assert_frame_equal(split_multiple_intersections.edges, actual.edges)
assert_frame_equal(split_multiple_intersections.nodes, actual.nodes)


def test_split_intersection_overlapping_edges(
unsplit_overlapping_lines, split_overlapping_lines
):
"""Should split at the start and end of the intersecting sector

The intersecting sector should be duplicated.
"""
actual = snkit.network.split_edges_at_intersections(unsplit_overlapping_lines)
assert_frame_equal(split_overlapping_lines.edges, actual.edges)
assert_frame_equal(split_overlapping_lines.nodes, actual.nodes)


def test_split_intersection_heterogeneous(
unsplit_heterogeneous_intersection, split_heterogeneous_intersection
):
"""Should split at intersection points and sectors (endpoints)"""
actual = snkit.network.split_edges_at_intersections(
unsplit_heterogeneous_intersection
)
assert_frame_equal(split_heterogeneous_intersection.edges, actual.edges)
assert_frame_equal(split_heterogeneous_intersection.nodes, actual.nodes)


def test_split_line():
"""Definitively split line at a point"""
line = LineString(
Expand Down