Aggregate function is now optional to store base pair level data (#10)
jkanche authored Nov 20, 2024
1 parent 5dc31dd commit d12588e
Showing 6 changed files with 286 additions and 61 deletions.
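In practice, the new default means one stored value per base pair instead of one summarized value per interval. A minimal sketch of the builder call — assuming the keyword names visible in this diff (`input_intervals`, `feature_annotation_options`) and that `files`/`output_path` match the signature; paths and coordinates here are hypothetical:

```python
import pandas as pd

from genomicarrays.build_genomicarray import build_genomicarray
from genomicarrays.build_options import FeatureAnnotationOptions

# Intervals to extract from each BigWig file. The "seqnames"/"starts"/"ends"
# columns are the ones this diff validates and indexes.
intervals = pd.DataFrame(
    {
        "seqnames": ["chr1", "chr1"],
        "starts": [100, 500],
        "ends": [110, 520],
    }
)

# aggregate_function=None is the new default: instead of one summarized
# value per interval, the matrix stores one value per base pair.
build_genomicarray(
    files=["sample1.bw", "sample2.bw"],  # hypothetical BigWig paths
    output_path="gen_array_store",       # hypothetical output directory
    input_intervals=intervals,
    feature_annotation_options=FeatureAnnotationOptions(aggregate_function=None),
)
```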
62 changes: 25 additions & 37 deletions src/genomicarrays/GenomicArrayDataset.py
@@ -64,12 +64,8 @@ def __init__(
self._dataset_path = dataset_path
# TODO: Maybe switch to on-demand loading of these objects
self._matrix_tdb_tdb = tiledb.open(f"{dataset_path}/{matrix_tdb_uri}", "r")
self._feature_annotation_tdb = tiledb.open(
f"{dataset_path}/{feature_annotation_uri}", "r"
)
self._sample_metadata_tdb = tiledb.open(
f"{dataset_path}/{sample_metadata_uri}", "r"
)
self._feature_annotation_tdb = tiledb.open(f"{dataset_path}/{feature_annotation_uri}", "r")
self._sample_metadata_tdb = tiledb.open(f"{dataset_path}/{sample_metadata_uri}", "r")

def __del__(self):
self._matrix_tdb_tdb.close()
@@ -110,9 +106,7 @@ def get_feature_annotation_index(self) -> List[str]:
res = qtd.get_a_column(self._feature_annotation_tdb, "genarr_feature_index")
return res["genarr_feature_index"].tolist()

def get_feature_subset(
self, subset: Union[slice, List[str], tiledb.QueryCondition], columns=None
) -> pd.DataFrame:
def get_feature_subset(self, subset: Union[slice, List[str], tiledb.QueryCondition], columns=None) -> pd.DataFrame:
"""Slice the ``feature_annotation`` store.
Args:
@@ -147,16 +141,12 @@ def get_feature_subset(
_not_avail.append(col)

if len(_not_avail) > 0:
raise ValueError(
f"Columns '{', '.join(_not_avail)}' are not available."
)
raise ValueError(f"Columns '{', '.join(_not_avail)}' are not available.")

if qtd._is_list_strings(subset):
subset = self._get_indices_for_gene_list(subset)

return qtd.subset_frame(
self._feature_annotation_tdb, subset=subset, columns=columns
)
return qtd.subset_frame(self._feature_annotation_tdb, subset=subset, columns=columns)

####
## Subset methods for the `sample_metadata` TileDB file.
Expand All @@ -183,9 +173,7 @@ def get_sample_metadata_column(self, column_name: str) -> pd.DataFrame:
res = qtd.get_a_column(self._sample_metadata_tdb, column_name=column_name)
return res[column_name]

def get_sample_subset(
self, subset: Union[slice, tiledb.QueryCondition], columns=None
) -> pd.DataFrame:
def get_sample_subset(self, subset: Union[slice, tiledb.QueryCondition], columns=None) -> pd.DataFrame:
"""Slice the ``sample_metadata`` store.
Args:
@@ -216,13 +204,9 @@ def get_sample_subset(
_not_avail.append(col)

if len(_not_avail) > 0:
raise ValueError(
f"Columns '{', '.join(_not_avail)}' are not available."
)
raise ValueError(f"Columns '{', '.join(_not_avail)}' are not available.")

return qtd.subset_frame(
self._sample_metadata_tdb, subset=subset, columns=columns
)
return qtd.subset_frame(self._sample_metadata_tdb, subset=subset, columns=columns)

####
## Subset methods for the `matrix` TileDB file.
@@ -266,16 +250,14 @@ def get_matrix_subset(self, subset: Union[int, Sequence, tuple]) -> pd.DataFrame
shape=(len(subset[0]), len(subset[1])),
)
else:
raise ValueError(
f"`{type(self).__name__}` only supports 2-dimensional slicing."
)
raise ValueError(f"`{type(self).__name__}` only supports 2-dimensional slicing.")

####
## Subset methods by feature and sample dimensions.
####
def get_slice(
self,
feature_subset: Union[slice, tiledb.QueryCondition],
feature_subset: Union[slice, int],
sample_subset: Union[slice, List[str], tiledb.QueryCondition],
) -> GenomicArrayDatasetSlice:
"""Subset a ``GenomicArrayDataset``.
@@ -297,14 +279,24 @@ def get_slice(
_ssubset = self.get_sample_subset(sample_subset)
_sample_indices = _ssubset.index.tolist()

if not isinstance(feature_subset, (int, slice)):
raise TypeError("feature indices must be continous; either a 'slice' or 'int' index.")
_fsubset = self.get_feature_subset(feature_subset)
_feature_indices = _fsubset.index.tolist()
start_findex = _fsubset["genarr_feature_start_index"].astype(int).min()
end_findex = _fsubset["genarr_feature_end_index"].astype(int).max()

# expand intervals
final_rows = []
for row in _fsubset.itertuples():
for i, _ in enumerate(range(int(row.genarr_feature_start_index), int(row.genarr_feature_end_index))):
final_rows.append(row._replace(starts=i + row.starts, ends=i + row.starts + 1))
_feature_df = pd.DataFrame(final_rows)

_msubset = self.get_matrix_subset((_feature_indices, _sample_indices))
_msubset = self.get_matrix_subset((list(range(start_findex, end_findex)), _sample_indices))

return GenomicArrayDatasetSlice(
_ssubset,
_fsubset,
_feature_df,
_msubset,
)

@@ -351,13 +343,9 @@ def __getitem__(
elif len(args) == 2:
return self.get_slice(args[0], args[1])
else:
raise ValueError(
f"`{type(self).__name__}` only supports 2-dimensional slicing."
)
raise ValueError(f"`{type(self).__name__}` only supports 2-dimensional slicing.")

raise TypeError(
"args must be a sequence or a scalar integer or string or a tuple of atmost 2 values."
)
raise TypeError("args must be a sequence or a scalar integer or string or a tuple of atmost 2 values.")

####
## Misc methods.
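The reworked `get_slice` above expands every selected interval into per-base rows before pulling the matching rows from the matrix. That expansion step can be run standalone with plain pandas — a sketch using made-up intervals whose `genarr_feature_*_index` columns mimic what the builder writes:

```python
import pandas as pd

# Two intervals of widths 3 and 2; start/end indices into the
# concatenated base-pair matrix are precomputed at build time.
fsubset = pd.DataFrame(
    {
        "seqnames": ["chr1", "chr1"],
        "starts": [100, 200],
        "ends": [103, 202],
        "genarr_feature_start_index": [0, 3],
        "genarr_feature_end_index": [3, 5],
    }
)

# Mirror of the expansion loop in get_slice: one row per base pair.
final_rows = []
for row in fsubset.itertuples():
    for i, _ in enumerate(range(int(row.genarr_feature_start_index), int(row.genarr_feature_end_index))):
        final_rows.append(row._replace(starts=i + row.starts, ends=i + row.starts + 1))

feature_df = pd.DataFrame(final_rows)
print(feature_df[["starts", "ends"]])
# starts: 100, 101, 102, 200, 201; each end is one past its start
```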
68 changes: 56 additions & 12 deletions src/genomicarrays/build_genomicarray.py
@@ -174,7 +174,29 @@ def build_genomicarray(
missing = required_cols - set(input_intervals.columns)
raise ValueError(f"Missing required columns: {missing}")
else:
raise TypeError("'input_intervals' is not an expected type (either 'str' or 'Dataframe').")
raise TypeError(
"'input_intervals' is not an expected type (either 'str' or 'Dataframe')."
)

# append start index for each interval
input_intervals["widths"] = input_intervals["ends"] - input_intervals["starts"]
total_length = None
if feature_annotation_options.aggregate_function is None:
total_length = int(input_intervals["widths"].sum())
counter = input_intervals["widths"].shift(1)
counter[0] = 0
input_intervals["genarr_feature_start_index"] = counter.cumsum().astype(int)
else:
counter = [1] * len(input_intervals)
total_length = len(input_intervals)
counter[0] = 0
input_intervals["genarr_feature_start_index"] = (
pd.Series(counter).cumsum().astype(int)
)

ends = input_intervals["genarr_feature_start_index"].shift(-1)
ends.iloc[-1] = total_length
input_intervals["genarr_feature_end_index"] = ends.astype(int)

if not feature_annotation_options.skip:

@@ -187,13 +209,19 @@ def build_genomicarray(

input_intervals["sequences"] = sequences

_col_types = utf.infer_column_types(input_intervals, feature_annotation_options.column_types)
_col_types = utf.infer_column_types(
input_intervals, feature_annotation_options.column_types
)

if "genarr_feature_index" not in input_intervals.columns:
input_intervals["genarr_feature_index"] = range(0, len(input_intervals))

_feature_output_uri = f"{output_path}/{feature_annotation_options.tiledb_store_name}"
utf.create_tiledb_frame_from_dataframe(_feature_output_uri, input_intervals, column_types=_col_types)
_feature_output_uri = (
f"{output_path}/{feature_annotation_options.tiledb_store_name}"
)
utf.create_tiledb_frame_from_dataframe(
_feature_output_uri, input_intervals, column_types=_col_types
)

if optimize_tiledb:
uta.optimize_tiledb_array(_feature_output_uri)
@@ -223,10 +251,16 @@ def build_genomicarray(
raise TypeError("'sample_metadata' is not an expected type.")

if not sample_metadata_options.skip:
_col_types = utf.infer_column_types(sample_metadata, sample_metadata_options.column_types)
_col_types = utf.infer_column_types(
sample_metadata, sample_metadata_options.column_types
)

_sample_output_uri = f"{output_path}/{sample_metadata_options.tiledb_store_name}"
utf.create_tiledb_frame_from_dataframe(_sample_output_uri, sample_metadata, column_types=_col_types)
_sample_output_uri = (
f"{output_path}/{sample_metadata_options.tiledb_store_name}"
)
utf.create_tiledb_frame_from_dataframe(
_sample_output_uri, sample_metadata, column_types=_col_types
)

if optimize_tiledb:
uta.optimize_tiledb_array(_sample_output_uri)
@@ -242,7 +276,7 @@ def build_genomicarray(
x_dim_dtype=feature_annotation_options.dtype,
y_dim_dtype=sample_metadata_options.dtype,
matrix_dim_dtype=matrix_options.dtype,
x_dim_length=len(input_intervals),
x_dim_length=total_length,
y_dim_length=len(files),
is_sparse=False,
)
@@ -254,6 +288,7 @@
bwpath,
idx,
feature_annotation_options.aggregate_function,
total_length,
)
for idx, bwpath in enumerate(files)
]
@@ -276,16 +311,25 @@
)


def _write_intervals_to_tiledb(outpath, intervals, bwpath, bwidx, agg_func):
def _write_intervals_to_tiledb(
outpath, intervals, bwpath, bwidx, agg_func, total_length
):
"""Wrapper to extract the data for the given intervals from the bigwig file and write the output to the tiledb
file."""
data = ubw.extract_bw_intervals_as_vec(bwpath, intervals, agg_func)
data = ubw.wrapper_extract_bw_values(
bw_path=bwpath,
intervals=intervals,
agg_func=agg_func,
total_length=total_length,
)

if data is not None and len(data) > 0:
uta.write_frame_intervals_to_tiledb(outpath, data=data, y_idx=bwidx)


def _wrapper_extract_bwinfo(args):
"""Wrapper for multiprocessing multiple files and intervals."""
counts_uri, input_intervals, bwpath, idx, agg_func = args
return _write_intervals_to_tiledb(counts_uri, input_intervals, bwpath, idx, agg_func)
counts_uri, input_intervals, bwpath, idx, agg_func, total_length = args
return _write_intervals_to_tiledb(
counts_uri, input_intervals, bwpath, idx, agg_func, total_length
)
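To see what the new index bookkeeping in `build_genomicarray` computes, here is the non-aggregating branch in isolation — plain pandas with made-up coordinates:

```python
import pandas as pd

intervals = pd.DataFrame({"starts": [100, 500, 900], "ends": [110, 520, 903]})
intervals["widths"] = intervals["ends"] - intervals["starts"]  # 10, 20, 3

# No aggregation: each interval occupies `width` rows of the matrix.
counter = intervals["widths"].shift(1)
counter[0] = 0
intervals["genarr_feature_start_index"] = counter.cumsum().astype(int)  # 0, 10, 30

ends = intervals["genarr_feature_start_index"].shift(-1)
ends.iloc[-1] = int(intervals["widths"].sum())  # total_length = 33
intervals["genarr_feature_end_index"] = ends.astype(int)  # 10, 30, 33
```

With an aggregate function set, the same columns collapse to 0, 1, 2, ... and `total_length` is simply the number of intervals, matching the `else` branch above.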
6 changes: 3 additions & 3 deletions src/genomicarrays/build_options.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Dict, Literal
from typing import Dict, Literal, Optional, Callable

import numpy as np

@@ -122,14 +122,14 @@ class FeatureAnnotationOptions:
aggregate_function:
A callable to summarize the values in a given
interval. Defaults to NumPy's `nanmean`.
interval. Defaults to None.
"""

skip: bool = False
dtype: np.dtype = np.uint32
tiledb_store_name: str = "feature_annotation"
column_types: Dict[str, np.dtype] = None
aggregate_function: callable = np.nanmean
aggregate_function: Optional[Callable] = None

def __post_init__(self):
if self.column_types is None:
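With the dataclass change above, the pre-#10 summarized behavior becomes opt-in — a quick sketch:

```python
import numpy as np

from genomicarrays.build_options import FeatureAnnotationOptions

# New default: no aggregation, so base-pair level values are stored.
bp_level = FeatureAnnotationOptions()
assert bp_level.aggregate_function is None

# Restoring the old behavior now requires passing the callable explicitly.
summarized = FeatureAnnotationOptions(aggregate_function=np.nanmean)
```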
83 changes: 79 additions & 4 deletions src/genomicarrays/utils_bw.py
@@ -1,4 +1,4 @@
from typing import Tuple
from typing import Optional, Tuple

import numpy as np
import pandas as pd
@@ -37,10 +37,28 @@ def extract_bw_values(
# )


def extract_bw_intervals_as_vec(
def wrapper_extract_bw_values(
bw_path: str,
intervals: pd.DataFrame,
agg_func: callable = np.nanmean,
agg_func: Optional[callable],
val_dtype: np.dtype = np.float32,
total_length: int = None,
) -> np.ndarray:
if total_length is None:
total_length = len(intervals)

if agg_func is not None:
return extract_bw_values_as_vec(bw_path=bw_path, intervals=intervals, agg_func=agg_func, val_dtype=val_dtype)
else:
return extract_bw_intervals_as_vec(
bw_path=bw_path, intervals=intervals, val_dtype=val_dtype, total_length=total_length
)


def extract_bw_values_as_vec(
bw_path: str,
intervals: pd.DataFrame,
agg_func: Optional[callable] = None,
val_dtype: np.dtype = np.float32,
) -> np.ndarray:
"""Extract data from BigWig for a given region and apply the aggregate function.
@@ -54,7 +72,7 @@ def extract_bw_intervals_as_vec(
agg_func:
Aggregate function to apply.
Defaults to np.nanmean.
Defaults to None.
val_dtype:
Dtype of the resulting array.
@@ -81,3 +99,60 @@ def extract_bw_intervals_as_vec(
results.append(np.nan)

return np.array(results, dtype=val_dtype)


def _get_empty_array(size, val_dtype):
out_array = np.empty(size, dtype=val_dtype)
out_array.fill(np.nan)

return out_array


def extract_bw_intervals_as_vec(
bw_path: str,
intervals: pd.DataFrame,
total_length: int,
val_dtype: np.dtype = np.float32,
) -> np.ndarray:
"""Extract data from BigWig for a given region.
Args:
bw_path:
Path to the BigWig file.
intervals:
            DataFrame of intervals to extract.
total_length:
            Combined length (in base pairs) of all the regions.
val_dtype:
Dtype of the resulting array.
Returns:
        A vector of length ``total_length``, with per-base values where
        the file contains data for the corresponding region and ``np.nan``
        where a region is not measured.
"""
bwfile = bw.open(bw_path)

out_array = _get_empty_array(total_length, val_dtype=val_dtype)

for row in intervals.itertuples():
if row.seqnames in bwfile.chroms():
try:
data = bwfile.intervals(row.seqnames, row.starts, row.ends)
tmp_out = _get_empty_array(row.ends - row.starts, val_dtype=val_dtype)
if data is not None and len(data) != 0:
for d in data:
_strt = max(0, d[0] - row.starts)
_end = min(d[1] - row.starts, row.ends)
tmp_out[_strt:_end] = d[2]

                out_array[row.genarr_feature_start_index : row.genarr_feature_start_index + row.ends - row.starts] = (
tmp_out
)
except Exception as _:
pass

return out_array
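The fill loop in `extract_bw_intervals_as_vec` can be exercised without pyBigWig: the tuples below stand in for what `bwfile.intervals()` returns, i.e. `(start, end, value)`. This is a simplified re-implementation, not the committed code, and it clamps the write window to the buffer width, which is what the slice arithmetic intends:

```python
import numpy as np

def fill_region(data, start, end, val_dtype=np.float32):
    """NaN-initialized buffer spanning the region, overwritten wherever
    the (start, end, value) tuples report coverage."""
    tmp_out = np.full(end - start, np.nan, dtype=val_dtype)
    for d_start, d_end, value in data:
        _strt = max(0, d_start - start)
        _end = min(d_end - start, end - start)
        tmp_out[_strt:_end] = value
    return tmp_out

# Region chr1:100-110 with coverage reported only for bases 103-106.
print(fill_region([(103, 106, 2.5)], 100, 110))
# -> [nan nan nan 2.5 2.5 2.5 nan nan nan nan]
```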
