Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agg func is optional to support base pair level data for each input region #10

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 25 additions & 37 deletions src/genomicarrays/GenomicArrayDataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,8 @@ def __init__(
self._dataset_path = dataset_path
# TODO: Maybe switch to on-demand loading of these objects
self._matrix_tdb_tdb = tiledb.open(f"{dataset_path}/{matrix_tdb_uri}", "r")
self._feature_annotation_tdb = tiledb.open(
f"{dataset_path}/{feature_annotation_uri}", "r"
)
self._sample_metadata_tdb = tiledb.open(
f"{dataset_path}/{sample_metadata_uri}", "r"
)
self._feature_annotation_tdb = tiledb.open(f"{dataset_path}/{feature_annotation_uri}", "r")
self._sample_metadata_tdb = tiledb.open(f"{dataset_path}/{sample_metadata_uri}", "r")

def __del__(self):
self._matrix_tdb_tdb.close()
Expand Down Expand Up @@ -110,9 +106,7 @@ def get_feature_annotation_index(self) -> List[str]:
res = qtd.get_a_column(self._feature_annotation_tdb, "genarr_feature_index")
return res["genarr_feature_index"].tolist()

def get_feature_subset(
self, subset: Union[slice, List[str], tiledb.QueryCondition], columns=None
) -> pd.DataFrame:
def get_feature_subset(self, subset: Union[slice, List[str], tiledb.QueryCondition], columns=None) -> pd.DataFrame:
"""Slice the ``feature_annotation`` store.

Args:
Expand Down Expand Up @@ -147,16 +141,12 @@ def get_feature_subset(
_not_avail.append(col)

if len(_not_avail) > 0:
raise ValueError(
f"Columns '{', '.join(_not_avail)}' are not available."
)
raise ValueError(f"Columns '{', '.join(_not_avail)}' are not available.")

if qtd._is_list_strings(subset):
subset = self._get_indices_for_gene_list(subset)

return qtd.subset_frame(
self._feature_annotation_tdb, subset=subset, columns=columns
)
return qtd.subset_frame(self._feature_annotation_tdb, subset=subset, columns=columns)

####
## Subset methods for the `sample_metadata` TileDB file.
Expand All @@ -183,9 +173,7 @@ def get_sample_metadata_column(self, column_name: str) -> pd.DataFrame:
res = qtd.get_a_column(self._sample_metadata_tdb, column_name=column_name)
return res[column_name]

def get_sample_subset(
self, subset: Union[slice, tiledb.QueryCondition], columns=None
) -> pd.DataFrame:
def get_sample_subset(self, subset: Union[slice, tiledb.QueryCondition], columns=None) -> pd.DataFrame:
"""Slice the ``sample_metadata`` store.

Args:
Expand Down Expand Up @@ -216,13 +204,9 @@ def get_sample_subset(
_not_avail.append(col)

if len(_not_avail) > 0:
raise ValueError(
f"Columns '{', '.join(_not_avail)}' are not available."
)
raise ValueError(f"Columns '{', '.join(_not_avail)}' are not available.")

return qtd.subset_frame(
self._sample_metadata_tdb, subset=subset, columns=columns
)
return qtd.subset_frame(self._sample_metadata_tdb, subset=subset, columns=columns)

####
## Subset methods for the `matrix` TileDB file.
Expand Down Expand Up @@ -266,16 +250,14 @@ def get_matrix_subset(self, subset: Union[int, Sequence, tuple]) -> pd.DataFrame
shape=(len(subset[0]), len(subset[1])),
)
else:
raise ValueError(
f"`{type(self).__name__}` only supports 2-dimensional slicing."
)
raise ValueError(f"`{type(self).__name__}` only supports 2-dimensional slicing.")

####
## Subset methods by cell and gene dimensions.
####
def get_slice(
self,
feature_subset: Union[slice, tiledb.QueryCondition],
feature_subset: Union[slice, int],
sample_subset: Union[slice, List[str], tiledb.QueryCondition],
) -> GenomicArrayDatasetSlice:
"""Subset a ``GenomicArrayDataset``.
Expand All @@ -297,14 +279,24 @@ def get_slice(
_ssubset = self.get_sample_subset(sample_subset)
_sample_indices = _ssubset.index.tolist()

if not isinstance(feature_subset, (int, slice)):
raise TypeError("feature indices must be continous; either a 'slice' or 'int' index.")
_fsubset = self.get_feature_subset(feature_subset)
_feature_indices = _fsubset.index.tolist()
start_findex = _fsubset["genarr_feature_start_index"].astype(int).min()
end_findex = _fsubset["genarr_feature_end_index"].astype(int).max()

# expand intervals
final_rows = []
for row in _fsubset.itertuples():
for i, _ in enumerate(range(int(row.genarr_feature_start_index), int(row.genarr_feature_end_index))):
final_rows.append(row._replace(starts=i + row.starts, ends=i + row.starts + 1))
_feature_df = pd.DataFrame(final_rows)

_msubset = self.get_matrix_subset((_feature_indices, _sample_indices))
_msubset = self.get_matrix_subset((list(range(start_findex, end_findex)), _sample_indices))

return GenomicArrayDatasetSlice(
_ssubset,
_fsubset,
_feature_df,
_msubset,
)

Expand Down Expand Up @@ -351,13 +343,9 @@ def __getitem__(
elif len(args) == 2:
return self.get_slice(args[0], args[1])
else:
raise ValueError(
f"`{type(self).__name__}` only supports 2-dimensional slicing."
)
raise ValueError(f"`{type(self).__name__}` only supports 2-dimensional slicing.")

raise TypeError(
"args must be a sequence or a scalar integer or string or a tuple of atmost 2 values."
)
raise TypeError("args must be a sequence or a scalar integer or string or a tuple of atmost 2 values.")

####
## Misc methods.
Expand Down
68 changes: 56 additions & 12 deletions src/genomicarrays/build_genomicarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,29 @@ def build_genomicarray(
missing = required_cols - set(input_intervals.columns)
raise ValueError(f"Missing required columns: {missing}")
else:
raise TypeError("'input_intervals' is not an expected type (either 'str' or 'Dataframe').")
raise TypeError(
"'input_intervals' is not an expected type (either 'str' or 'Dataframe')."
)

# append start index for each interval
input_intervals["widths"] = input_intervals["ends"] - input_intervals["starts"]
total_length = None
if feature_annotation_options.aggregate_function is None:
total_length = int(input_intervals["widths"].sum())
counter = input_intervals["widths"].shift(1)
counter[0] = 0
input_intervals["genarr_feature_start_index"] = counter.cumsum().astype(int)
else:
counter = [1] * len(input_intervals)
total_length = len(input_intervals)
counter[0] = 0
input_intervals["genarr_feature_start_index"] = (
pd.Series(counter).cumsum().astype(int)
)

ends = input_intervals["genarr_feature_start_index"].shift(-1)
ends.iloc[-1] = total_length
input_intervals["genarr_feature_end_index"] = ends.astype(int)

if not feature_annotation_options.skip:

Expand All @@ -187,13 +209,19 @@ def build_genomicarray(

input_intervals["sequences"] = sequences

_col_types = utf.infer_column_types(input_intervals, feature_annotation_options.column_types)
_col_types = utf.infer_column_types(
input_intervals, feature_annotation_options.column_types
)

if "genarr_feature_index" not in input_intervals.columns:
input_intervals["genarr_feature_index"] = range(0, len(input_intervals))

_feature_output_uri = f"{output_path}/{feature_annotation_options.tiledb_store_name}"
utf.create_tiledb_frame_from_dataframe(_feature_output_uri, input_intervals, column_types=_col_types)
_feature_output_uri = (
f"{output_path}/{feature_annotation_options.tiledb_store_name}"
)
utf.create_tiledb_frame_from_dataframe(
_feature_output_uri, input_intervals, column_types=_col_types
)

if optimize_tiledb:
uta.optimize_tiledb_array(_feature_output_uri)
Expand Down Expand Up @@ -223,10 +251,16 @@ def build_genomicarray(
raise TypeError("'sample_metadata' is not an expected type.")

if not sample_metadata_options.skip:
_col_types = utf.infer_column_types(sample_metadata, sample_metadata_options.column_types)
_col_types = utf.infer_column_types(
sample_metadata, sample_metadata_options.column_types
)

_sample_output_uri = f"{output_path}/{sample_metadata_options.tiledb_store_name}"
utf.create_tiledb_frame_from_dataframe(_sample_output_uri, sample_metadata, column_types=_col_types)
_sample_output_uri = (
f"{output_path}/{sample_metadata_options.tiledb_store_name}"
)
utf.create_tiledb_frame_from_dataframe(
_sample_output_uri, sample_metadata, column_types=_col_types
)

if optimize_tiledb:
uta.optimize_tiledb_array(_sample_output_uri)
Expand All @@ -242,7 +276,7 @@ def build_genomicarray(
x_dim_dtype=feature_annotation_options.dtype,
y_dim_dtype=sample_metadata_options.dtype,
matrix_dim_dtype=matrix_options.dtype,
x_dim_length=len(input_intervals),
x_dim_length=total_length,
y_dim_length=len(files),
is_sparse=False,
)
Expand All @@ -254,6 +288,7 @@ def build_genomicarray(
bwpath,
idx,
feature_annotation_options.aggregate_function,
total_length,
)
for idx, bwpath in enumerate(files)
]
Expand All @@ -276,16 +311,25 @@ def build_genomicarray(
)


def _write_intervals_to_tiledb(outpath, intervals, bwpath, bwidx, agg_func):
def _write_intervals_to_tiledb(
outpath, intervals, bwpath, bwidx, agg_func, total_length
):
"""Wrapper to extract the data for the given intervals from the bigwig file and write the output to the tiledb
file."""
data = ubw.extract_bw_intervals_as_vec(bwpath, intervals, agg_func)
data = ubw.wrapper_extract_bw_values(
bw_path=bwpath,
intervals=intervals,
agg_func=agg_func,
total_length=total_length,
)

if data is not None and len(data) > 0:
uta.write_frame_intervals_to_tiledb(outpath, data=data, y_idx=bwidx)


def _wrapper_extract_bwinfo(args):
"""Wrapper for multiprocessing multiple files and intervals."""
counts_uri, input_intervals, bwpath, idx, agg_func = args
return _write_intervals_to_tiledb(counts_uri, input_intervals, bwpath, idx, agg_func)
counts_uri, input_intervals, bwpath, idx, agg_func, total_length = args
return _write_intervals_to_tiledb(
counts_uri, input_intervals, bwpath, idx, agg_func, total_length
)
6 changes: 3 additions & 3 deletions src/genomicarrays/build_options.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Dict, Literal
from typing import Dict, Literal, Optional, Callable

import numpy as np

Expand Down Expand Up @@ -122,14 +122,14 @@ class FeatureAnnotationOptions:

aggregate_function:
A callable to summarize the values in a given
interval. Defaults to NumPy's `nanmean`.
interval. Defaults to None.
"""

skip: bool = False
dtype: np.dtype = np.uint32
tiledb_store_name: str = "feature_annotation"
column_types: Dict[str, np.dtype] = None
aggregate_function: callable = np.nanmean
aggregate_function: Optional[Callable] = None

def __post_init__(self):
if self.column_types is None:
Expand Down
83 changes: 79 additions & 4 deletions src/genomicarrays/utils_bw.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple
from typing import Optional, Tuple

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -37,10 +37,28 @@ def extract_bw_values(
# )


def extract_bw_intervals_as_vec(
def wrapper_extract_bw_values(
bw_path: str,
intervals: pd.DataFrame,
agg_func: callable = np.nanmean,
agg_func: Optional[callable],
val_dtype: np.dtype = np.float32,
total_length: int = None,
) -> np.ndarray:
if total_length is None:
total_length = len(intervals)

if agg_func is not None:
return extract_bw_values_as_vec(bw_path=bw_path, intervals=intervals, agg_func=agg_func, val_dtype=val_dtype)
else:
return extract_bw_intervals_as_vec(
bw_path=bw_path, intervals=intervals, val_dtype=val_dtype, total_length=total_length
)


def extract_bw_values_as_vec(
bw_path: str,
intervals: pd.DataFrame,
agg_func: Optional[callable] = None,
val_dtype: np.dtype = np.float32,
) -> np.ndarray:
"""Extract data from BigWig for a given region and apply the aggregate function.
Expand All @@ -54,7 +72,7 @@ def extract_bw_intervals_as_vec(

agg_func:
Aggregate function to apply.
Defaults to np.nanmean.
Defaults to None.

val_dtype:
Dtype of the resulting array.
Expand All @@ -81,3 +99,60 @@ def extract_bw_intervals_as_vec(
results.append(np.nan)

return np.array(results, dtype=val_dtype)


def _get_empty_array(size, val_dtype):
out_array = np.empty(size, dtype=val_dtype)
out_array.fill(np.nan)

return out_array


def extract_bw_intervals_as_vec(
    bw_path: str,
    intervals: pd.DataFrame,
    total_length: int,
    val_dtype: np.dtype = np.float32,
) -> np.ndarray:
    """Extract base-pair level data from a BigWig file for the given regions.

    Args:
        bw_path:
            Path to the BigWig file.

        intervals:
            DataFrame of intervals to extract. Expected to contain the
            columns ``seqnames``, ``starts``, ``ends`` and
            ``genarr_feature_start_index`` (the running offset of each
            region within the output vector).

        total_length:
            Combined size of all the regions; length of the returned vector.

        val_dtype:
            Dtype of the resulting array.
            Defaults to ``np.float32``.

    Returns:
        A vector of length ``total_length`` holding per-base values for
        every region present in the file, and ``np.nan`` for positions
        that are not measured.
    """
    bwfile = bw.open(bw_path)

    out_array = _get_empty_array(total_length, val_dtype=val_dtype)

    try:
        for row in intervals.itertuples():
            if row.seqnames not in bwfile.chroms():
                continue

            try:
                data = bwfile.intervals(row.seqnames, row.starts, row.ends)
                width = row.ends - row.starts
                tmp_out = _get_empty_array(width, val_dtype=val_dtype)
                if data is not None and len(data) != 0:
                    for d in data:
                        # Clamp the measured interval to this region, in
                        # region-relative coordinates. The previous code
                        # clamped against the absolute coordinate
                        # `row.ends`, which only worked because numpy
                        # silently truncates out-of-range slice ends.
                        _strt = max(0, d[0] - row.starts)
                        _end = min(d[1] - row.starts, width)
                        tmp_out[_strt:_end] = d[2]

                # Column is named 'genarr_feature_start_index' when the
                # dataset is built; the previous attribute
                # 'genarr_feature_index_start' does not exist, and the
                # resulting AttributeError was swallowed below, leaving
                # the output all-NaN.
                offset = int(row.genarr_feature_start_index)
                out_array[offset : offset + width] = tmp_out
            except Exception as _:
                # Best-effort by design: a region that cannot be read
                # simply stays NaN.
                pass
    finally:
        # Close the BigWig handle; the original leaked it.
        bwfile.close()

    return out_array
Loading