Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b8228c0
feat: first implementation of finding upstream nodes
Oisin-M Oct 18, 2025
633a9e7
feat: first implementation of finding sorted upstream nodes
Oisin-M Oct 18, 2025
285296e
feat: first percentile function
Oisin-M Oct 18, 2025
6b23495
refactor: split percentile code in separate file
Oisin-M Oct 31, 2025
fd4b31a
build: include rust in pre-commit
Oisin-M Oct 31, 2025
e4feadf
style: run pre-commit on rust files
Oisin-M Oct 31, 2025
2214f61
chore: add copyright header
Oisin-M Oct 31, 2025
7dc6195
feat: add upstream.percentile
Oisin-M Oct 31, 2025
5ed9c2f
build: update Cargo
Oisin-M Oct 31, 2025
1f4f078
chore: fix rust import path complaint
Oisin-M Oct 31, 2025
f60c139
chore: assert numpy backend for percentiles
Oisin-M Oct 31, 2025
85f0758
chore: autocast to f64 for percentiles
Oisin-M Oct 31, 2025
228542e
fix: typo
Oisin-M Oct 31, 2025
0d135b3
feat: add ekh.catchments.percentile
Oisin-M Oct 31, 2025
b418c75
fix: ensure p in [0,1]
Oisin-M Oct 31, 2025
de87a0e
Merge branch 'develop' into feat/rust_for_percentiles
Oisin-M Nov 3, 2025
d472823
Merge branch 'develop' into feat/rust_for_percentiles
Oisin-M Nov 17, 2025
366a4bb
Merge branch 'develop' into feat/rust_for_percentiles
Oisin-M Nov 17, 2025
73e00e1
Merge branch 'develop' into feat/rust_for_percentiles
Oisin-M Nov 17, 2025
e15acf7
Merge branch 'develop' into feat/rust_for_percentiles
Oisin-M Dec 4, 2025
8b4fc4c
feat: add weighted percentile
Oisin-M Mar 25, 2026
8fc3378
feat: add rust weighted percentiles
Oisin-M Mar 25, 2026
9b22cfd
docs: add catchments.percentile API docstring
Copilot Apr 17, 2026
4409967
docs: clarify percentile and locations parameters
Copilot Apr 17, 2026
d662693
Update src/earthkit/hydro/upstream/array/_operations.py
Oisin-M Apr 17, 2026
8b5ffdd
Merge branch 'develop' into feat/rust_for_percentiles
Oisin-M Apr 21, 2026
4f67208
tests: add percentile tests for upstream and catchments (array and xa…
Copilot Apr 21, 2026
17f3b83
chore: clean tests
Oisin-M Apr 21, 2026
ac30e03
docs: add docstrings with math equations for percentile functions
Copilot Apr 21, 2026
611ee2a
feat: add input core dims option to upstream percentile toplevel
Oisin-M Apr 21, 2026
f72f0ac
chore: please qa
Oisin-M Apr 21, 2026
c21259d
feat: implement downstream percentile with Rust backend and tests
Copilot Apr 21, 2026
9801bfe
feat: skip tests requiring rust if rust unavailable
Oisin-M Apr 21, 2026
7b46bd5
chore: please qa
Oisin-M Apr 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ repos:
rev: 7.0.0
hooks:
- id: flake8
- repo: https://github.com/doublify/pre-commit-rust
rev: v1.0
hooks:
- id: fmt
- id: cargo-check
Comment thread
Oisin-M marked this conversation as resolved.
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pyo3 = { version = "0.26", features = ["extension-module"] }
numpy = "0.26"
rayon = "1.7"
fixedbitset = "0.5"
dashmap = "5"

[lib]
# See https://github.com/PyO3/pyo3 for details
Expand Down
36 changes: 25 additions & 11 deletions rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
// granted to it by virtue of its status as an intergovernmental organisation
// nor does it submit to any jurisdiction.

use pyo3::prelude::*;
use rayon::prelude::*;
use fixedbitset::FixedBitSet;
use numpy::{PyArray1, PyReadonlyArray1};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use rayon::prelude::*;
use std::sync::atomic::{AtomicI64, Ordering};
use fixedbitset::FixedBitSet;
mod percentile;
mod percentile_downstream;
mod weighted_percentile;
mod weighted_percentile_downstream;

#[pyfunction]
fn compute_topological_labels_rust<'py>(
Expand All @@ -21,10 +25,7 @@ fn compute_topological_labels_rust<'py>(
downstream_nodes: PyReadonlyArray1<'py, usize>,
n_nodes: usize,
) -> PyResult<Py<PyArray1<i64>>> {

let labels: Vec<AtomicI64> = (0..n_nodes)
.map(|_| AtomicI64::new(0))
.collect();
let labels: Vec<AtomicI64> = (0..n_nodes).map(|_| AtomicI64::new(0)).collect();

let mut current = sources.as_slice()?.to_vec();
let sinks = sinks.as_slice()?;
Expand Down Expand Up @@ -67,18 +68,31 @@ fn compute_topological_labels_rust<'py>(
}

if !current.is_empty() {
return Err(PyErr::new::<PyValueError, _>("River Network contains a cycle."));
return Err(PyErr::new::<PyValueError, _>(
"River Network contains a cycle.",
));
}

let result: Vec<i64> = labels.iter()
.map(|a| a.load(Ordering::Relaxed))
.collect();
let result: Vec<i64> = labels.iter().map(|a| a.load(Ordering::Relaxed)).collect();
let array = PyArray1::from_vec(py, result);
Ok(array.to_owned().into())
}

#[pymodule]
fn _rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(compute_topological_labels_rust, m)?)?;
m.add_function(wrap_pyfunction!(percentile::calc_perc, m)?)?;
m.add_function(wrap_pyfunction!(
weighted_percentile::calc_weighted_perc,
m
)?)?;
m.add_function(wrap_pyfunction!(
percentile_downstream::calc_perc_downstream,
m
)?)?;
m.add_function(wrap_pyfunction!(
weighted_percentile_downstream::calc_weighted_perc_downstream,
m
)?)?;
Ok(())
}
156 changes: 156 additions & 0 deletions rust/percentile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// (C) Copyright 2025- ECMWF.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation
// nor does it submit to any jurisdiction.

use dashmap::DashMap;
use numpy::ndarray::ArrayView1;
use numpy::{PyArray1, PyReadonlyArray1, PyReadonlyArray2};
use pyo3::prelude::*;
use rayon::prelude::*;

#[pyfunction]
pub fn calc_perc<'py>(
py: Python<'py>,
topo_groups: Vec<PyReadonlyArray2<'py, i64>>,
field: PyReadonlyArray1<'py, f64>,
p: f64,
) -> PyResult<Py<PyArray1<f64>>> {
let upstream_map: DashMap<i64, Vec<f64>> = DashMap::new();

let field_array: ArrayView1<f64> = field.as_array();

let mut result: Vec<f64> = field_array.to_vec(); //vec![0.0; field_array.len()];
Comment thread
Oisin-M marked this conversation as resolved.

for group in &topo_groups {
process_level_and_cleanup(group, &upstream_map, &field_array, &mut result, p);
}

let array = PyArray1::from_vec(py, result);
Ok(array.to_owned().into())
}

fn process_level_and_cleanup(
topo_group: &PyReadonlyArray2<'_, i64>,
upstream_map: &DashMap<i64, Vec<f64>>,
field: &ArrayView1<f64>,
result: &mut Vec<f64>,
p: f64,
) {
let arr = topo_group.as_array();
let did_vec = arr.row(0);
let uid_vec = arr.row(1);

let did_slice = did_vec
.as_slice()
.expect("Expected contiguous did_vec slice");
let uid_slice = uid_vec
.as_slice()
.expect("Expected contiguous uid_vec slice");

did_slice
.par_iter()
.zip(uid_slice.par_iter())
.for_each(|(&did, &uid)| {
let uid_upstream = {
// Get uid upstream vector by removing it from the map, so you can move it without cloning
// If it doesn't exist, fallback to vec![uid]
upstream_map
.remove(&uid)
.map(|entry| entry.1)
.unwrap_or_else(|| vec![field[uid as usize]])
};

// Insert or extend did's upstream vector
upstream_map
.entry(did)
.and_modify(|did_upstream| {
merge_sorted_f64(did_upstream, &uid_upstream);
})
.or_insert_with(|| {
let mut v = uid_upstream;
let pos = match binary_search_f64(&v, field[did as usize]) {
Ok(pos) | Err(pos) => pos,
};
v.insert(pos, field[did as usize]);
v
});
});

let pct_results: Vec<(i64, f64)> = did_slice
.par_iter()
.map(|&did| {
let values = upstream_map.get(&did).unwrap();
let pct = percentile(values.as_slice(), p);
(did, pct)
})
.collect();
Comment on lines +83 to +90
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pct_results is computed by iterating over did_slice, which can contain repeated downstream node IDs when a node has multiple upstream edges. This recomputes the same percentile multiple times per topo level and can become a noticeable CPU cost. Consider deduplicating did_slice (e.g., unique IDs) before mapping to percentiles.

Copilot uses AI. Check for mistakes.

for (did, pct) in pct_results {
let idx = did as usize;
if idx < result.len() {
result[idx] = pct;
}
}
}

fn percentile(sorted_values: &[f64], percentile: f64) -> f64 {
let n = sorted_values.len();
let rank = percentile * (n as f64 - 1.0);
let lower = rank.floor() as usize;
let upper = rank.ceil() as usize;

if lower == upper {
sorted_values[lower]
} else {
let weight = rank - lower as f64;
sorted_values[lower] * (1.0 - weight) + sorted_values[upper] * weight
}
}

fn binary_search_f64(slice: &[f64], target: f64) -> Result<usize, usize> {
let mut size = slice.len();
if size == 0 {
return Err(0);
}
let mut base = 0usize;

while size > 0 {
let half = size / 2;
let mid = base + half;

match slice[mid].partial_cmp(&target).unwrap() {
std::cmp::Ordering::Less => {
base = mid + 1;
size -= half + 1;
}
std::cmp::Ordering::Equal => return Ok(mid),
std::cmp::Ordering::Greater => size = half,
}
}
Err(base)
}
Comment thread
Oisin-M marked this conversation as resolved.

fn merge_sorted_f64(a: &mut Vec<f64>, b: &[f64]) {
let mut i = 0;
let mut j = 0;
let mut result = Vec::with_capacity(a.len() + b.len());

while i < a.len() && j < b.len() {
if a[i] <= b[j] {
result.push(a[i]);
i += 1;
} else {
result.push(b[j]);
j += 1;
}
}

result.extend_from_slice(&a[i..]);
result.extend_from_slice(&b[j..]);

*a = result;
}
Loading
Loading