diff --git a/libs/ccc/coef/impl.py b/libs/ccc/coef/impl.py
index 936e1f89..fafa846d 100644
--- a/libs/ccc/coef/impl.py
+++ b/libs/ccc/coef/impl.py
@@ -260,48 +260,6 @@ def get_coords_from_index(n_obj: int, idx: int) -> tuple[int]:
     return int(x), int(y)
 
 
-def get_chunks(
-    iterable: Union[int, Iterable], n_threads: int, ratio: float = 1
-) -> Iterable[Iterable[int]]:
-    """
-    It splits elements in an iterable in chunks according to the number of
-    CPU cores available for parallel processing.
-
-    Args:
-        iterable: an iterable to be split in chunks. If it is an integer, it
-            will split the iterable given by np.arange(iterable).
-        n_threads: number of threads available for parallelization.
-        ratio: a ratio that allows to increase the number of splits given
-            n_threads. For example, with ratio=1, the function will just split
-            the iterable in n_threads chunks. If ratio is larger than 1, then
-            it will split in n_threads * ratio chunks.
-
-    Results:
-        Another iterable with chunks according to the arguments given. For
-        example, if iterable is [0, 1, 2, 3, 4, 5] and n_threads is 2, it will
-        return [[0, 1, 2], [3, 4, 5]].
-    """
-    if isinstance(iterable, int):
-        iterable = np.arange(iterable)
-
-    n = len(iterable)
-    expected_n_chunks = n_threads * ratio
-
-    res = list(chunker(iterable, int(np.ceil(n / expected_n_chunks))))
-
-    while len(res) < expected_n_chunks <= n:
-        # look for an element in res that can be split in two
-        idx = 0
-        while len(res[idx]) == 1:
-            idx = idx + 1
-
-        new_chunk = get_chunks(res[idx], 2)
-        res[idx] = new_chunk[0]
-        res.insert(idx + 1, new_chunk[1])
-
-    return res
-
-
 def get_feature_type_and_encode(feature_data: NDArray) -> tuple[NDArray, bool]:
     """
     Given the data of one feature as a 1d numpy array (it could also be a pandas.Series),
@@ -571,12 +529,10 @@ def ccc(
 
     with ThreadPoolExecutor(max_workers=default_n_threads) as executor:
         # pre-compute the internal partitions for each object in parallel
-        inputs = get_chunks(n_features, default_n_threads, n_chunks_threads_ratio)
+        inputs = np.arange(n_features)
 
-        def compute_parts(idxs):
-            return np.array(
-                [get_parts(X[i], range_n_clusters, X_numerical_type[i]) for i in idxs]
-            )
+        def compute_parts(idx):
+            return np.array(get_parts(X[idx], range_n_clusters, X_numerical_type[idx]))
 
         for idx, ps in zip(inputs, executor.map(compute_parts, inputs)):
             parts[idx] = ps
@@ -601,8 +557,24 @@ def cdist_func(x, y):
         else:
             cdist_func = cdist_parts_basic
 
+        # select the ARI implementations once, up front, instead of branching
+        # on use_ari_numba inside the per-pair (hot) loop below
+        if not use_ari_numba:
+
+            def compute_ccc_func(p0, p1):
+                return compute_ccc(p0, p1, cdist_func)
+
+            def compute_ccc_perms_func(p0, p1, cdist):
+                return compute_ccc_perms(p0, p1, cdist)
+
+        else:
+            compute_ccc_func = compute_ccc_numba
+
+            def compute_ccc_perms_func(p0, p1, *args):
+                return compute_ccc_perms_numba(p0, p1)
+
         # compute coefficients
-        def compute_coef(idx_list):
+        def compute_coef(idx):
             """
             Given a list of indexes representing each a pair of
             objects/rows/genes, it computes the CCC coefficient for
@@ -618,89 +590,66 @@ def compute_coef(idx_list):
             arrays returned by the main cm function (cm_values and max_parts)
             but for a subset of the data.
""" - n_idxs = len(idx_list) - max_ari_list = np.full(n_idxs, np.nan, dtype=float) - max_part_idx_list = np.zeros((n_idxs, 2), dtype=np.uint64) - pvalues = np.full(n_idxs, np.nan, dtype=float) - - for idx, data_idx in enumerate(idx_list): - i, j = get_coords_from_index(n_features, data_idx) - - # get partitions for the pair of objects - obji_parts, objj_parts = parts[i], parts[j] - - # compute ari only if partitions are not marked as "missing" - # (negative values), which is assigned when partitions have - # one cluster (usually when all data in the feature has the same - # value). - if obji_parts[0, 0] == -2 or objj_parts[0, 0] == -2: - continue - - # compare all partitions of one object to the all the partitions - # of the other object, and get the maximium ARI - if not use_ari_numba: - max_ari_list[idx], max_part_idx_list[idx] = compute_ccc( - obji_parts, objj_parts, cdist_func - ) - else: - max_ari_list[idx], max_part_idx_list[idx] = compute_ccc_numba( - obji_parts, objj_parts - ) - - # compute p-value if requested - if pvalue_n_perms is not None and pvalue_n_perms > 0: - with ThreadPoolExecutor( - max_workers=pvalue_n_jobs - ) as executor_perms: - # select the variable that generated more partitions as the one - # to permute - obj_parts_sel_i = obji_parts - obj_parts_sel_j = objj_parts - if (obji_parts[:, 0] >= 0).sum() > ( - objj_parts[:, 0] >= 0 - ).sum(): - obj_parts_sel_i = objj_parts - obj_parts_sel_j = obji_parts - - # do not use cdist_parts_parallel here unless pvalue_n_jobs is 1 - # otherwise, there is no time gain - cdist_here = cdist_parts_basic - if pvalue_n_jobs == 1: - cdist_here = cdist_func - - if not use_ari_numba: - - def compute_permutations(_): - return compute_ccc_perms( - obj_parts_sel_i, obj_parts_sel_j, cdist_here - ) - - else: - - def compute_permutations(_): - return compute_ccc_perms_numba( - obj_parts_sel_i, obj_parts_sel_j - ) - - p_ccc_values = np.full(pvalue_n_perms, np.nan, dtype=float) - for p_idx, p_ccc_val in zip( + max_ari = np.nan + max_part_idx = np.zeros((1, 2), dtype=np.uint64) + pvalue = np.nan + + # for idx, data_idx in enumerate(idx_list): + i, j = get_coords_from_index(n_features, idx) + + # get partitions for the pair of objects + obji_parts, objj_parts = parts[i], parts[j] + + # compute ari only if partitions are not marked as "missing" + # (negative values), which is assigned when partitions have + # one cluster (usually when all data in the feature has the same + # value). 
+            if obji_parts[0, 0] == -2 or objj_parts[0, 0] == -2:
+                return max_ari, max_part_idx, pvalue
+
+            # compare all partitions of one object to all the partitions
+            # of the other object, and get the maximum ARI
+            max_ari, max_part_idx = compute_ccc_func(obji_parts, objj_parts)
+
+            # compute p-value if requested
+            if pvalue_n_perms is not None and pvalue_n_perms > 0:
+                with ThreadPoolExecutor(max_workers=pvalue_n_jobs) as executor_perms:
+                    # select the variable that generated more partitions as the one
+                    # to permute
+                    obj_parts_sel_i = obji_parts
+                    obj_parts_sel_j = objj_parts
+                    if (obji_parts[:, 0] >= 0).sum() > (objj_parts[:, 0] >= 0).sum():
+                        obj_parts_sel_i = objj_parts
+                        obj_parts_sel_j = obji_parts
+
+                    # do not use cdist_parts_parallel here unless pvalue_n_jobs is 1
+                    # otherwise, there is no time gain
+                    cdist_here = cdist_parts_basic
+                    if pvalue_n_jobs == 1:
+                        cdist_here = cdist_func
+
+                    def compute_permutations(_):
+                        return compute_ccc_perms_func(
+                            obj_parts_sel_i, obj_parts_sel_j, cdist_here
+                        )
+
+                    p_ccc_values = np.full(pvalue_n_perms, np.nan, dtype=float)
+                    for p_idx, p_ccc_val in zip(
+                        np.arange(pvalue_n_perms),
+                        executor_perms.map(
+                            compute_permutations,
                             np.arange(pvalue_n_perms),
-                        executor_perms.map(
-                            compute_permutations,
-                            np.arange(pvalue_n_perms),
-                        ),
-                    ):
-                        p_ccc_values[p_idx] = p_ccc_val
+                        ),
+                    ):
+                        p_ccc_values[p_idx] = p_ccc_val
 
-                        # compute p-value
-                        pvalues[idx] = (np.sum(p_ccc_values >= max_ari_list[idx]) + 1) / (
-                            pvalue_n_perms + 1
-                        )
+                    # compute p-value
+                    pvalue = (np.sum(p_ccc_values >= max_ari) + 1) / (pvalue_n_perms + 1)
 
-            return max_ari_list, max_part_idx_list, pvalues
+            return max_ari, max_part_idx, pvalue
 
-        # iterate over all chunks of object pairs and compute the coefficient
-        inputs = get_chunks(n_features_comp, default_n_threads, n_chunks_threads_ratio)
+        # iterate over all object pairs and compute the coefficient
+        inputs = np.arange(n_features_comp)
 
         for idx, (max_ari_list, max_part_idx_list, pvalues) in zip(
             inputs, map_func(compute_coef, inputs)
diff --git a/tests/test_coef.py b/tests/test_coef.py
index 60e5de47..745c3bb8 100644
--- a/tests/test_coef.py
+++ b/tests/test_coef.py
@@ -16,7 +16,6 @@
     get_coords_from_index,
     cdist_parts_basic,
     cdist_parts_parallel,
-    get_chunks,
 )
 
 
@@ -960,230 +959,6 @@ def test_cm_return_parts_with_matrix_as_input():
     np.testing.assert_array_equal(max_parts, np.array([0, 0]))
 
 
-def test_get_chunks_n_large():
-    # n = 100 (even)
-    n_comp = 100
-    n_threads = 2
-
-    cs = get_chunks(n_comp, n_threads)
-
-    assert len(cs) == 2
-    assert set(map(len, cs)) == {50}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n = 100 (even)
-    n_comp = 100
-    n_threads = 5
-
-    cs = get_chunks(n_comp, n_threads)
-
-    assert len(cs) == 5
-    assert set(map(len, cs)) == {20}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n = 100 (even) not equal groups
-    n_comp = 100
-    n_threads = 8
-
-    cs = get_chunks(n_comp, n_threads)
-
-    assert len(cs) == 8
-    assert set(map(len, cs)) == {9, 13}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n = 101 (odd)
-    n_comp = 101
-    n_threads = 2
-
-    cs = get_chunks(n_comp, n_threads)
-
-    assert len(cs) == 2
-    assert set(map(len, cs)) == {50, 51}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-
-def test_get_chunks_n_small():
-    # n = 10
-    n_comp = 10
-    n_threads = 2
-
-    cs = get_chunks(n_comp, n_threads)
-
-    # for this cases, an automatic ratio will avoid singleton chunks, and
-    # instead will generate n_threads groups
-    assert len(cs) == 2
-    assert set(map(len, cs)) == {5}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n = 10
-    n_comp = 10
-    n_threads = 5
-
-    cs = get_chunks(n_comp, n_threads)
-
-    assert len(cs) == 5
-    assert set(map(len, cs)) == {2}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n = 11
-    n_comp = 11
-    n_threads = 2
-
-    cs = get_chunks(n_comp, n_threads)
-
-    # for this case, it will avoid singleton chunks
-    assert len(cs) == 2
-    assert set(map(len, cs)) == {5, 6}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n = 10 and n_threads is odd
-    n_comp = 10
-    n_threads = 3
-
-    cs = get_chunks(n_comp, n_threads)
-
-    assert len(cs) == 3
-    assert set(map(len, cs)) == {4, 2}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-
-def test_get_chunks_n_small_n_threads_large():
-    # in all these cases, n_thread is larger than n_comp / 2, but always
-    # smaller than n_comp. The problem in this test is that we want to make sure
-    # that we are using all threads
-
-    # n_threads = 6
-    n_comp = 10
-    n_threads = 6
-    cs = get_chunks(n_comp, n_threads)
-    assert len(cs) == 6
-    assert set(map(len, cs)) == {1, 2}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n_threads = 7
-    n_comp = 10
-    n_threads = 7
-    cs = get_chunks(n_comp, n_threads)
-    assert len(cs) == 7
-    assert set(map(len, cs)) == {1, 2}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n_threads = 8
-    n_comp = 10
-    n_threads = 8
-    cs = get_chunks(n_comp, n_threads)
-    assert len(cs) == 8
-    assert set(map(len, cs)) == {1, 2}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-    # n_threads = 9
-    n_comp = 10
-    n_threads = 9
-    cs = get_chunks(n_comp, n_threads)
-    assert len(cs) == 9
-    assert set(map(len, cs)) == {1, 2}
-    assert {x for i in cs for x in i} == set(np.arange(n_comp))
-
-
-def test_get_chunks_n_small_n_threads_very_large():
-    # in all these cases, n_thread is at least as large as n_comp.
-    # The problem in this test is that we want to make sure
- # that we are using all threads - # n_threads = 10 - n_comp = 10 - n_threads = 10 - cs = get_chunks(n_comp, n_threads) - assert len(cs) == 10 - assert set(map(len, cs)) == {1} - assert {x for i in cs for x in i} == set(np.arange(n_comp)) - - # n_threads = 11 - n_comp = 10 - n_threads = 11 - cs = get_chunks(n_comp, n_threads) - assert len(cs) == 10 - assert set(map(len, cs)) == {1} - assert {x for i in cs for x in i} == set(np.arange(n_comp)) - - -def test_get_chunks_ratio_is_one(): - n_comp = 10 - n_threads = 2 - ratio = 1 - - cs = get_chunks(n_comp, n_threads, ratio) - - assert len(cs) == 2 - assert set(map(len, cs)) == {5} - assert {x for i in cs for x in i} == set(np.arange(n_comp)) - - -def test_get_chunks_ratio_is_two(): - n_comp = 10 - n_threads = 2 - ratio = 2 - - cs = get_chunks(n_comp, n_threads, ratio) - - assert len(cs) == 4 - assert set(map(len, cs)) == {1, 3} - assert {x for i in cs for x in i} == set(np.arange(n_comp)) - - -def test_get_chunks_ratio_is_large(): - # ratio = 4 - n_comp = 10 - n_threads = 2 - ratio = 4 - - cs = get_chunks(n_comp, n_threads, ratio) - - assert len(cs) == 8 - assert set(map(len, cs)) == {1, 2} - assert {x for i in cs for x in i} == set(np.arange(n_comp)) - - # ratio = 5 - n_comp = 10 - n_threads = 2 - ratio = 5 - - cs = get_chunks(n_comp, n_threads, ratio) - - assert len(cs) == 10 - assert set(map(len, cs)) == {1} - assert {x for i in cs for x in i} == set(np.arange(n_comp)) - - # ratio = 6 - n_comp = 10 - n_threads = 2 - ratio = 6 - - cs = get_chunks(n_comp, n_threads, ratio) - - assert len(cs) == 10 - assert set(map(len, cs)) == {1} - assert {x for i in cs for x in i} == set(np.arange(n_comp)) - - -def test_get_chunks_iterable(): - # here the first argument is an iterable, not an integer - - # n_threads = 2 - iterable = np.arange(10) - n_threads = 2 - cs = get_chunks(iterable, n_threads) - assert len(cs) == 2 - assert set(map(len, cs)) == {5} - assert {x for i in cs for x in i} == set(iterable) - - # n_threads = 10 - iterable = np.arange(10) - n_threads = 10 - cs = get_chunks(iterable, n_threads) - assert len(cs) == 10 - assert set(map(len, cs)) == {1} - assert {x for i in cs for x in i} == set(iterable) - - def test_cm_data_is_binary_evenly_distributed(): # Prepare np.random.seed(0)
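Note on the dispatch change (editor's sketch, not part of the patch): the diff retires the hand-rolled get_chunks() helper and instead maps the worker functions over individual indexes, leaving load balancing to the executor. The self-contained Python sketch below contrasts the two patterns; expensive(), n_items, and n_threads are illustrative placeholders standing in for per-feature work such as get_parts().

from concurrent.futures import ThreadPoolExecutor

import numpy as np


def expensive(i):
    # stand-in for per-feature work (e.g., computing one feature's partitions)
    return int(i) ** 2


n_items, n_threads = 10, 2

with ThreadPoolExecutor(max_workers=n_threads) as executor:
    # old pattern (removed by this diff): pre-split the indexes into chunks
    # and map the worker over whole chunks, then flatten the results
    chunks = np.array_split(np.arange(n_items), n_threads)
    old_result = [
        res
        for chunk_results in executor.map(
            lambda idxs: [expensive(i) for i in idxs], chunks
        )
        for res in chunk_results
    ]

    # new pattern (adopted by this diff): map the worker directly over the
    # indexes and let the executor schedule them across threads
    new_result = list(executor.map(expensive, np.arange(n_items)))

assert old_result == new_result == [expensive(i) for i in range(n_items)]

If batching ever becomes useful again (e.g., to amortize task overhead under a ProcessPoolExecutor), the standard library already covers it: Executor.map() accepts a chunksize argument that ProcessPoolExecutor honors (ThreadPoolExecutor ignores it), so a custom chunking helper is not required.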