Skip to content

Commit

Permalink
ccc: add parallelization of permutations (n_jobs_permutations parameter)
Browse files Browse the repository at this point in the history
  • Loading branch information
miltondp committed Sep 5, 2023
1 parent d2f0159 commit 2ded154
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 31 deletions.
88 changes: 57 additions & 31 deletions libs/ccc/coef/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def ccc(
pvalue_n_permutations: int = None,
random_state: int = None,
n_jobs: int = 1,
n_jobs_permutations: int = 1,
) -> tuple[NDArray[float], NDArray[float], NDArray[np.uint64], NDArray[np.int16]]:
"""
This is the main function that computes the Clustermatch Correlation
Expand Down Expand Up @@ -340,6 +341,9 @@ def ccc(
n_jobs: number of CPU cores to use for parallelization. The value
None will use all available cores (`os.cpu_count()`), and negative
values will use `os.cpu_count() - n_jobs`. Default is 1.
n_jobs_permutations: number of CPU cores to use for parallelization when
computing the p-value of the coefficient using permutations.
Returns:
If returns_parts is True, then it returns a tuple with three values:
Expand Down Expand Up @@ -542,38 +546,60 @@ def compute_coef(idx_list):

# compute p-value if requested
if pvalue_n_permutations is not None and pvalue_n_permutations > 0:
# compute CCC on permuted data
p_ccc_values = np.full(pvalue_n_permutations, np.nan, dtype=float)

# select the variable that generated more partitions as the one
# to permute
obj_parts_sel_i = obji_parts
obj_parts_sel_j = objj_parts
if (obji_parts[:, 0] >= 0).sum() > (objj_parts[:, 0] >= 0).sum():
obj_parts_sel_i = objj_parts
obj_parts_sel_j = obji_parts

for idx_perm in range(pvalue_n_permutations):
# generate a random permutation of the partitions of one
# variable/feature
perm_idx = np.random.permutation(n_objects)
obj_parts_sel_j_permuted = np.array(
[
obj_parts_sel_j[i, perm_idx]
for i in range(obj_parts_sel_j.shape[0])
]
)

# compute the CCC using the permuted partitions
p_comp_values = cdist_func(
obj_parts_sel_i,
obj_parts_sel_j_permuted,
)
p_max_flat_idx = p_comp_values.argmax()
p_max_idx = unravel_index_2d(
p_max_flat_idx, p_comp_values.shape
with ThreadPoolExecutor(
max_workers=n_jobs_permutations
) as executor_perms:
# select the variable that generated more partitions as the one
# to permute
obj_parts_sel_i = obji_parts
obj_parts_sel_j = objj_parts
if (obji_parts[:, 0] >= 0).sum() > (
objj_parts[:, 0] >= 0
).sum():
obj_parts_sel_i = objj_parts
obj_parts_sel_j = obji_parts

cdist_here = cdist_parts_basic
if n_jobs_permutations == 1:
cdist_here = cdist_func

def compute_permutations(_):
perm_idx = np.random.permutation(n_objects)

# generate a random permutation of the partitions of one
# variable/feature
obj_parts_sel_j_permuted = np.array(
[
obj_parts_sel_j[it, perm_idx]
for it in range(obj_parts_sel_j.shape[0])
]
)

# compute the CCC using the permuted partitions
p_comp_values = cdist_here(
obj_parts_sel_i,
obj_parts_sel_j_permuted,
)
p_max_flat_idx = p_comp_values.argmax()
p_max_idx = unravel_index_2d(
p_max_flat_idx, p_comp_values.shape
)
return np.max((p_comp_values[p_max_idx], 0.0))

p_ccc_values = np.full(
pvalue_n_permutations, np.nan, dtype=float
)
p_ccc_values[idx_perm] = np.max((p_comp_values[p_max_idx], 0.0))
for p_idx, p_ccc_val in zip(
np.arange(pvalue_n_permutations),
executor_perms.map(
compute_permutations,
np.arange(pvalue_n_permutations),
chunksize=100,
),
):
# for i in range(pvalue_n_permutations):
p_ccc_values[p_idx] = p_ccc_val
# p_ccc_values[i] = compute_permutations()

# compute p-value
pvalues[idx] = (np.sum(p_ccc_values >= max_ari_list[idx]) + 1) / (
Expand Down
21 changes: 21 additions & 0 deletions tests/test_coef_pval.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,27 @@ def test_cm_large_n_objects_pvalue_computation_is_parallelized():
assert elapsed_time_multi_thread < 0.75 * elapsed_time_single_thread


def test_cm_medium_n_objects_with_many_pvalue_computation_is_parallelized():
# Prepare
rs = np.random.RandomState(0)

# two features on 100 objects with a linear relationship
feature0 = rs.rand(1000)
feature1 = rs.rand(1000)

# Run
start_time = time.time()
res = ccc(feature0, feature1, pvalue_n_permutations=1000, n_jobs=1)
elapsed_time_single_thread = time.time() - start_time

start_time = time.time()
res = ccc(feature0, feature1, pvalue_n_permutations=1000, n_jobs_permutations=2)
elapsed_time_multi_thread = time.time() - start_time

# Validate
assert elapsed_time_multi_thread < 0.75 * elapsed_time_single_thread


def test_cm_return_parts_quadratic_pvalue():
# Prepare
# rs = np.random.RandomState(0)
Expand Down

0 comments on commit 2ded154

Please sign in to comment.