ccc: remove get_chunks function and simplify parallelization
miltondp committed Sep 6, 2023
1 parent 61ebefd commit 4555e9d
Showing 2 changed files with 75 additions and 351 deletions.
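What the simplification amounts to: the old code split the work indices into chunks with get_chunks and had each worker loop over a chunk, while the new code maps a worker over every index directly and lets ThreadPoolExecutor schedule them. A minimal sketch of the two submission patterns, assuming a hypothetical per-index worker work_on (not a function from this repository):

from concurrent.futures import ThreadPoolExecutor
import numpy as np

def work_on(idx: int) -> float:
    # stand-in for per-index work such as get_parts or compute_coef
    return float(idx) ** 0.5

n_items, n_threads = 10, 2

# before: indices are grouped into chunks and each worker receives a whole chunk
chunks = np.array_split(np.arange(n_items), n_threads)  # rough stand-in for get_chunks
with ThreadPoolExecutor(max_workers=n_threads) as executor:
    per_chunk = executor.map(lambda chunk: [work_on(i) for i in chunk], chunks)
    chunked_results = [r for chunk_result in per_chunk for r in chunk_result]

# after: each index is submitted on its own and results come back in input order
with ThreadPoolExecutor(max_workers=n_threads) as executor:
    flat_results = list(executor.map(work_on, np.arange(n_items)))

assert chunked_results == flat_results

Either way the results line up with the input indices; the per-index form just removes the chunking bookkeeping.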
201 changes: 75 additions & 126 deletions libs/ccc/coef/impl.py
@@ -260,48 +260,6 @@ def get_coords_from_index(n_obj: int, idx: int) -> tuple[int]:
return int(x), int(y)


def get_chunks(
iterable: Union[int, Iterable], n_threads: int, ratio: float = 1
) -> Iterable[Iterable[int]]:
"""
    It splits elements of an iterable into chunks according to the number of
    CPU cores available for parallel processing.
    Args:
        iterable: an iterable to be split into chunks. If it is an integer, it
            will split the iterable given by np.arange(iterable).
        n_threads: number of threads available for parallelization.
        ratio: a ratio that allows increasing the number of splits given
            n_threads. For example, with ratio=1, the function will just split
            the iterable in n_threads chunks. If ratio is larger than 1, then
            it will split in n_threads * ratio chunks.
    Returns:
Another iterable with chunks according to the arguments given. For
example, if iterable is [0, 1, 2, 3, 4, 5] and n_threads is 2, it will
return [[0, 1, 2], [3, 4, 5]].
"""
if isinstance(iterable, int):
iterable = np.arange(iterable)

n = len(iterable)
expected_n_chunks = n_threads * ratio

res = list(chunker(iterable, int(np.ceil(n / expected_n_chunks))))

while len(res) < expected_n_chunks <= n:
# look for an element in res that can be split in two
idx = 0
while len(res[idx]) == 1:
idx = idx + 1

new_chunk = get_chunks(res[idx], 2)
res[idx] = new_chunk[0]
res.insert(idx + 1, new_chunk[1])

return res
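For reference, the behavior the removed helper documented, written out as calls; the expected values follow the docstring's own example, and the ratio case assumes chunker(iterable, size) yields consecutive slices of the given size:

# 6 items split across 2 threads (the docstring's example)
get_chunks([0, 1, 2, 3, 4, 5], 2)   # -> [[0, 1, 2], [3, 4, 5]]

# an integer is first expanded with np.arange, so chunks come back as arrays
get_chunks(6, 2)                    # -> [array([0, 1, 2]), array([3, 4, 5])]

# ratio=2 asks for 2 * 2 = 4 chunks, so small chunks get split further
get_chunks(6, 2, ratio=2)           # -> [array([0]), array([1]), array([2, 3]), array([4, 5])]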


def get_feature_type_and_encode(feature_data: NDArray) -> tuple[NDArray, bool]:
"""
Given the data of one feature as a 1d numpy array (it could also be a pandas.Series),
@@ -571,12 +529,10 @@ def ccc(

with ThreadPoolExecutor(max_workers=default_n_threads) as executor:
# pre-compute the internal partitions for each object in parallel
inputs = get_chunks(n_features, default_n_threads, n_chunks_threads_ratio)
inputs = np.arange(n_features)

def compute_parts(idxs):
return np.array(
[get_parts(X[i], range_n_clusters, X_numerical_type[i]) for i in idxs]
)
def compute_parts(idx):
return np.array(get_parts(X[idx], range_n_clusters, X_numerical_type[idx]))

for idx, ps in zip(inputs, executor.map(compute_parts, inputs)):
parts[idx] = ps
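The zip(inputs, executor.map(...)) pairing relies on a property worth noting: Executor.map yields results in the order of its input iterable, no matter which thread finishes first, so parts[idx] always receives the partitions computed for that idx. A small self-contained check of that property (slow_square is a hypothetical stand-in for get_parts):

import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(i: int) -> int:
    # later indices finish sooner, but results still come back in input order
    time.sleep(0.01 * (5 - i))
    return i * i

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(slow_square, range(5)))

assert results == [0, 1, 4, 9, 16]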
@@ -601,8 +557,24 @@ def cdist_func(x, y):
else:
cdist_func = cdist_parts_basic

compute_ccc_func = None
compute_ccc_perms_func = None
if not use_ari_numba:

def compute_ccc_func(p0, p1):
return compute_ccc(p0, p1, cdist_func)

def compute_ccc_perms_func(p0, p1, cdist_func):
return compute_ccc_perms(p0, p1, cdist_func)

else:
compute_ccc_func = compute_ccc_numba

def compute_ccc_perms_func(p0, p1, *args):
return compute_ccc_perms_numba(p0, p1)

# compute coefficients
def compute_coef(idx_list):
def compute_coef(idx):
"""
            Given a list of indexes, each representing a pair of
            objects/rows/genes, it computes the CCC coefficient for
@@ -618,89 +590,66 @@ def compute_coef(idx_list):
arrays returned by the main cm function (cm_values and
max_parts) but for a subset of the data.
"""
n_idxs = len(idx_list)
max_ari_list = np.full(n_idxs, np.nan, dtype=float)
max_part_idx_list = np.zeros((n_idxs, 2), dtype=np.uint64)
pvalues = np.full(n_idxs, np.nan, dtype=float)

for idx, data_idx in enumerate(idx_list):
i, j = get_coords_from_index(n_features, data_idx)

# get partitions for the pair of objects
obji_parts, objj_parts = parts[i], parts[j]

# compute ari only if partitions are not marked as "missing"
# (negative values), which is assigned when partitions have
# one cluster (usually when all data in the feature has the same
# value).
if obji_parts[0, 0] == -2 or objj_parts[0, 0] == -2:
continue

                # compare all partitions of one object to all the partitions
                # of the other object, and get the maximum ARI
if not use_ari_numba:
max_ari_list[idx], max_part_idx_list[idx] = compute_ccc(
obji_parts, objj_parts, cdist_func
)
else:
max_ari_list[idx], max_part_idx_list[idx] = compute_ccc_numba(
obji_parts, objj_parts
)

# compute p-value if requested
if pvalue_n_perms is not None and pvalue_n_perms > 0:
with ThreadPoolExecutor(
max_workers=pvalue_n_jobs
) as executor_perms:
# select the variable that generated more partitions as the one
# to permute
obj_parts_sel_i = obji_parts
obj_parts_sel_j = objj_parts
if (obji_parts[:, 0] >= 0).sum() > (
objj_parts[:, 0] >= 0
).sum():
obj_parts_sel_i = objj_parts
obj_parts_sel_j = obji_parts

# do not use cdist_parts_parallel here unless pvalue_n_jobs is 1
# otherwise, there is no time gain
cdist_here = cdist_parts_basic
if pvalue_n_jobs == 1:
cdist_here = cdist_func

if not use_ari_numba:

def compute_permutations(_):
return compute_ccc_perms(
obj_parts_sel_i, obj_parts_sel_j, cdist_here
)

else:

def compute_permutations(_):
return compute_ccc_perms_numba(
obj_parts_sel_i, obj_parts_sel_j
)

                        p_ccc_values = np.full(pvalue_n_perms, np.nan, dtype=float)
                        for p_idx, p_ccc_val in zip(
                            np.arange(pvalue_n_perms),
                            executor_perms.map(
                                compute_permutations,
                                np.arange(pvalue_n_perms),
                            ),
                        ):
                            p_ccc_values[p_idx] = p_ccc_val

                        # compute p-value
                        pvalues[idx] = (np.sum(p_ccc_values >= max_ari_list[idx]) + 1) / (
                            pvalue_n_perms + 1
                        )

            return max_ari_list, max_part_idx_list, pvalues
max_ari = np.nan
max_part_idx = np.zeros((1, 2), dtype=np.uint64)
pvalue = np.nan

# for idx, data_idx in enumerate(idx_list):
i, j = get_coords_from_index(n_features, idx)

# get partitions for the pair of objects
obji_parts, objj_parts = parts[i], parts[j]

# compute ari only if partitions are not marked as "missing"
# (negative values), which is assigned when partitions have
# one cluster (usually when all data in the feature has the same
# value).
if obji_parts[0, 0] == -2 or objj_parts[0, 0] == -2:
return max_ari, max_part_idx, pvalue

        # compare all partitions of one object to all the partitions
        # of the other object, and get the maximum ARI
max_ari, max_part_idx = compute_ccc_func(obji_parts, objj_parts)

# compute p-value if requested
if pvalue_n_perms is not None and pvalue_n_perms > 0:
with ThreadPoolExecutor(max_workers=pvalue_n_jobs) as executor_perms:
# select the variable that generated more partitions as the one
# to permute
obj_parts_sel_i = obji_parts
obj_parts_sel_j = objj_parts
if (obji_parts[:, 0] >= 0).sum() > (objj_parts[:, 0] >= 0).sum():
obj_parts_sel_i = objj_parts
obj_parts_sel_j = obji_parts

# do not use cdist_parts_parallel here unless pvalue_n_jobs is 1
# otherwise, there is no time gain
cdist_here = cdist_parts_basic
if pvalue_n_jobs == 1:
cdist_here = cdist_func

def compute_permutations(_):
return compute_ccc_perms_func(
obj_parts_sel_i, obj_parts_sel_j, cdist_here
)

                p_ccc_values = np.full(pvalue_n_perms, np.nan, dtype=float)
                for p_idx, p_ccc_val in zip(
                    np.arange(pvalue_n_perms),
                    executor_perms.map(
                        compute_permutations,
                        np.arange(pvalue_n_perms),
                    ),
                ):
                    p_ccc_values[p_idx] = p_ccc_val

            # compute p-value
            pvalue = (np.sum(p_ccc_values >= max_ari) + 1) / (pvalue_n_perms + 1)

        return max_ari, max_part_idx, pvalue
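The permutation p-value above is the usual add-one estimator: p = (number of permuted coefficients >= the observed max_ari, plus 1) / (pvalue_n_perms + 1). A small worked check with made-up numbers (the values below are illustrative only):

import numpy as np

max_ari = 0.35
# 100 permuted coefficients, 4 of which reach the observed value
p_ccc_values = np.array([0.40, 0.36, 0.35, 0.35] + [0.10] * 96)
pvalue = (np.sum(p_ccc_values >= max_ari) + 1) / (len(p_ccc_values) + 1)
assert np.isclose(pvalue, 5 / 101)  # ~0.0495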

# iterate over all chunks of object pairs and compute the coefficient
inputs = get_chunks(n_features_comp, default_n_threads, n_chunks_threads_ratio)
inputs = np.arange(n_features_comp)

for idx, (max_ari_list, max_part_idx_list, pvalues) in zip(
inputs, map_func(compute_coef, inputs)
