Skip to content

Commit

Permalink
configurable degree of parallelism via SNKIT_PROCESSES
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-fred committed Sep 8, 2023
1 parent 1a92dc6 commit 23d2364
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions src/snkit/network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Network representation and utilities
"""
import logging
import os
import warnings

Expand Down Expand Up @@ -28,6 +29,9 @@

from collections import Counter

# configure logging with a timestamp and process id prefix
logging.basicConfig(format="%(asctime)s %(process)s %(message)s", level=logging.INFO)

# optional progress bars
if "SNKIT_PROGRESS" in os.environ and os.environ["SNKIT_PROGRESS"] in ("1", "TRUE"):
try:
Expand All @@ -38,11 +42,20 @@
from snkit.utils import tqdm_standin as tqdm

# optional parallel processing
if "SNKIT_PARALLEL" in os.environ and os.environ["SNKIT_PARALLEL"] in ("1", "TRUE"):
PARALLEL = True
if "SNKIT_PROCESSES" in os.environ:
processes_env_var = os.environ["SNKIT_PROCESSES"]
try:
requested_processes = int(processes_env_var)
except TypeError:
raise RuntimeError(
"SNKIT_PROCESSES env var must be a non-negative integer. "
"Use 0 or unset for serial operation."
)
PARALLEL_PROCESS_COUNT = min([os.cpu_count(), requested_processes])
import multiprocessing
logging.info(f"SNKIT_PROCESSES={processes_env_var}, using {PARALLEL_PROCESS_COUNT} processes")
else:
PARALLEL = False
PARALLEL_PROCESS_COUNT = 0


class Network:
Expand Down Expand Up @@ -317,17 +330,20 @@ def _split_edges_at_nodes(


def split_edges_at_nodes(network, tolerance=1e-9):
"""Split network edges where they intersect node geometries"""
"""
Split network edges where they intersect node geometries. Will operate in
parallel if SNKIT_PROCESSES is in the environment and a positive integer.
"""
split_edges = []

n = len(network.edges)
if PARALLEL and (n > 10_000):
chunk_size = int(n / os.cpu_count())
if PARALLEL_PROCESS_COUNT and (n > 1_000):
chunk_size = max([1, int(n / PARALLEL_PROCESS_COUNT)])
args = [
(network.edges.iloc[i : i + chunk_size, :], network.nodes, tolerance)
for i in range(0, n, chunk_size)
]
with multiprocessing.Pool() as pool:
with multiprocessing.Pool(PARALLEL_PROCESS_COUNT) as pool:
results = pool.starmap(_split_edges_at_nodes, args)

# flatten return list
Expand Down

0 comments on commit 23d2364

Please sign in to comment.