From 23d23644844feb48bf781fad646590c8c254a27d Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Fri, 8 Sep 2023 18:06:13 +0100 Subject: [PATCH 1/4] configurable degree of parallelism via SNKIT_PROCESSES --- src/snkit/network.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/snkit/network.py b/src/snkit/network.py index cdc6588..0c16119 100644 --- a/src/snkit/network.py +++ b/src/snkit/network.py @@ -1,5 +1,6 @@ """Network representation and utilities """ +import logging import os import warnings @@ -28,6 +29,9 @@ from collections import Counter +# configure logging with a timestamp and process id prefix +logging.basicConfig(format="%(asctime)s %(process)s %(message)s", level=logging.INFO) + # optional progress bars if "SNKIT_PROGRESS" in os.environ and os.environ["SNKIT_PROGRESS"] in ("1", "TRUE"): try: @@ -38,11 +42,20 @@ from snkit.utils import tqdm_standin as tqdm # optional parallel processing -if "SNKIT_PARALLEL" in os.environ and os.environ["SNKIT_PARALLEL"] in ("1", "TRUE"): - PARALLEL = True +if "SNKIT_PROCESSES" in os.environ: + processes_env_var = os.environ["SNKIT_PROCESSES"] + try: + requested_processes = int(processes_env_var) + except TypeError: + raise RuntimeError( + "SNKIT_PROCESSES env var must be a non-negative integer. " + "Use 0 or unset for serial operation." + ) + PARALLEL_PROCESS_COUNT = min([os.cpu_count(), requested_processes]) import multiprocessing + logging.info(f"SNKIT_PROCESSES={processes_env_var}, using {PARALLEL_PROCESS_COUNT} processes") else: - PARALLEL = False + PARALLEL_PROCESS_COUNT = 0 class Network: @@ -317,17 +330,20 @@ def _split_edges_at_nodes( def split_edges_at_nodes(network, tolerance=1e-9): - """Split network edges where they intersect node geometries""" + """ + Split network edges where they intersect node geometries. Will operate in + parallel if SNKIT_PROCESSES is in the environment and a positive integer. + """ split_edges = [] n = len(network.edges) - if PARALLEL and (n > 10_000): - chunk_size = int(n / os.cpu_count()) + if PARALLEL_PROCESS_COUNT and (n > 1_000): + chunk_size = max([1, int(n / PARALLEL_PROCESS_COUNT)]) args = [ (network.edges.iloc[i : i + chunk_size, :], network.nodes, tolerance) for i in range(0, n, chunk_size) ] - with multiprocessing.Pool() as pool: + with multiprocessing.Pool(PARALLEL_PROCESS_COUNT) as pool: results = pool.starmap(_split_edges_at_nodes, args) # flatten return list From 2ecd69b015e17e5750bc6d7076cc3adbd9860391 Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Wed, 13 Sep 2023 10:47:14 +0100 Subject: [PATCH 2/4] remove logging config --- src/snkit/network.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/snkit/network.py b/src/snkit/network.py index 0c16119..59de770 100644 --- a/src/snkit/network.py +++ b/src/snkit/network.py @@ -29,9 +29,6 @@ from collections import Counter -# configure logging with a timestamp and process id prefix -logging.basicConfig(format="%(asctime)s %(process)s %(message)s", level=logging.INFO) - # optional progress bars if "SNKIT_PROGRESS" in os.environ and os.environ["SNKIT_PROGRESS"] in ("1", "TRUE"): try: From 839d1cf623954f6c31035948f0a7e3bcc00f6cae Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Wed, 13 Sep 2023 10:47:38 +0100 Subject: [PATCH 3/4] chunk_size as optional arg --- src/snkit/network.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/snkit/network.py b/src/snkit/network.py index 59de770..958acf1 100644 --- a/src/snkit/network.py +++ b/src/snkit/network.py @@ -326,18 +326,30 @@ def _split_edges_at_nodes( return split_edges -def split_edges_at_nodes(network, tolerance=1e-9): +def split_edges_at_nodes(network: Network, tolerance: float = 1e-9, chunk_size: int | None = None): """ - Split network edges where they intersect node geometries. Will operate in - parallel if SNKIT_PROCESSES is in the environment and a positive integer. + Split network edges where they intersect node geometries. + + N.B. Can operate in parallel if SNKIT_PROCESSES is in the environment and a + positive integer. + + Args: + network: Network object to split edges for. + tolerance: Proximity within which nodes are said to intersect an edge. + chunk_size: When splitting in parallel, set the number of edges per + unit of work. + + Returns: + Network with edges split at nodes (within proximity tolerance). """ split_edges = [] n = len(network.edges) - if PARALLEL_PROCESS_COUNT and (n > 1_000): - chunk_size = max([1, int(n / PARALLEL_PROCESS_COUNT)]) + if PARALLEL_PROCESS_COUNT > 1: + if chunk_size is None: + chunk_size = max([1, int(n / PARALLEL_PROCESS_COUNT)]) args = [ - (network.edges.iloc[i : i + chunk_size, :], network.nodes, tolerance) + (network.edges.iloc[i: i + chunk_size, :], network.nodes, tolerance) for i in range(0, n, chunk_size) ] with multiprocessing.Pool(PARALLEL_PROCESS_COUNT) as pool: From c5b0748d9a1262a62605e1883106994991d80d05 Mon Sep 17 00:00:00 2001 From: Fred Thomas Date: Wed, 13 Sep 2023 10:51:19 +0100 Subject: [PATCH 4/4] try older syntax for type hint --- src/snkit/network.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/snkit/network.py b/src/snkit/network.py index 958acf1..b9cbd5d 100644 --- a/src/snkit/network.py +++ b/src/snkit/network.py @@ -2,6 +2,7 @@ """ import logging import os +from typing import Optional import warnings import geopandas @@ -326,7 +327,7 @@ def _split_edges_at_nodes( return split_edges -def split_edges_at_nodes(network: Network, tolerance: float = 1e-9, chunk_size: int | None = None): +def split_edges_at_nodes(network: Network, tolerance: float = 1e-9, chunk_size: Optional[int] = None): """ Split network edges where they intersect node geometries.