From d12588e18015bde9c5a75b18fdbf5580ee478713 Mon Sep 17 00:00:00 2001 From: Jayaram Kancherla Date: Wed, 20 Nov 2024 08:13:54 -0800 Subject: [PATCH] Aggregate function is now optional to store base pair level data (#10) --- src/genomicarrays/GenomicArrayDataset.py | 62 ++++++--------- src/genomicarrays/build_genomicarray.py | 68 ++++++++++++++--- src/genomicarrays/build_options.py | 6 +- src/genomicarrays/utils_bw.py | 83 +++++++++++++++++++- tests/test_ingest.py | 32 +++++++- tests/test_query.py | 96 +++++++++++++++++++++++- 6 files changed, 286 insertions(+), 61 deletions(-) diff --git a/src/genomicarrays/GenomicArrayDataset.py b/src/genomicarrays/GenomicArrayDataset.py index 7bfcaf5..0488979 100644 --- a/src/genomicarrays/GenomicArrayDataset.py +++ b/src/genomicarrays/GenomicArrayDataset.py @@ -64,12 +64,8 @@ def __init__( self._dataset_path = dataset_path # TODO: Maybe switch to on-demand loading of these objects self._matrix_tdb_tdb = tiledb.open(f"{dataset_path}/{matrix_tdb_uri}", "r") - self._feature_annotation_tdb = tiledb.open( - f"{dataset_path}/{feature_annotation_uri}", "r" - ) - self._sample_metadata_tdb = tiledb.open( - f"{dataset_path}/{sample_metadata_uri}", "r" - ) + self._feature_annotation_tdb = tiledb.open(f"{dataset_path}/{feature_annotation_uri}", "r") + self._sample_metadata_tdb = tiledb.open(f"{dataset_path}/{sample_metadata_uri}", "r") def __del__(self): self._matrix_tdb_tdb.close() @@ -110,9 +106,7 @@ def get_feature_annotation_index(self) -> List[str]: res = qtd.get_a_column(self._feature_annotation_tdb, "genarr_feature_index") return res["genarr_feature_index"].tolist() - def get_feature_subset( - self, subset: Union[slice, List[str], tiledb.QueryCondition], columns=None - ) -> pd.DataFrame: + def get_feature_subset(self, subset: Union[slice, List[str], tiledb.QueryCondition], columns=None) -> pd.DataFrame: """Slice the ``feature_annotation`` store. Args: @@ -147,16 +141,12 @@ def get_feature_subset( _not_avail.append(col) if len(_not_avail) > 0: - raise ValueError( - f"Columns '{', '.join(_not_avail)}' are not available." - ) + raise ValueError(f"Columns '{', '.join(_not_avail)}' are not available.") if qtd._is_list_strings(subset): subset = self._get_indices_for_gene_list(subset) - return qtd.subset_frame( - self._feature_annotation_tdb, subset=subset, columns=columns - ) + return qtd.subset_frame(self._feature_annotation_tdb, subset=subset, columns=columns) #### ## Subset methods for the `sample_metadata` TileDB file. @@ -183,9 +173,7 @@ def get_sample_metadata_column(self, column_name: str) -> pd.DataFrame: res = qtd.get_a_column(self._sample_metadata_tdb, column_name=column_name) return res[column_name] - def get_sample_subset( - self, subset: Union[slice, tiledb.QueryCondition], columns=None - ) -> pd.DataFrame: + def get_sample_subset(self, subset: Union[slice, tiledb.QueryCondition], columns=None) -> pd.DataFrame: """Slice the ``sample_metadata`` store. Args: @@ -216,13 +204,9 @@ def get_sample_subset( _not_avail.append(col) if len(_not_avail) > 0: - raise ValueError( - f"Columns '{', '.join(_not_avail)}' are not available." - ) + raise ValueError(f"Columns '{', '.join(_not_avail)}' are not available.") - return qtd.subset_frame( - self._sample_metadata_tdb, subset=subset, columns=columns - ) + return qtd.subset_frame(self._sample_metadata_tdb, subset=subset, columns=columns) #### ## Subset methods for the `matrix` TileDB file. @@ -266,16 +250,14 @@ def get_matrix_subset(self, subset: Union[int, Sequence, tuple]) -> pd.DataFrame shape=(len(subset[0]), len(subset[1])), ) else: - raise ValueError( - f"`{type(self).__name__}` only supports 2-dimensional slicing." - ) + raise ValueError(f"`{type(self).__name__}` only supports 2-dimensional slicing.") #### ## Subset methods by cell and gene dimensions. #### def get_slice( self, - feature_subset: Union[slice, tiledb.QueryCondition], + feature_subset: Union[slice, int], sample_subset: Union[slice, List[str], tiledb.QueryCondition], ) -> GenomicArrayDatasetSlice: """Subset a ``GenomicArrayDataset``. @@ -297,14 +279,24 @@ def get_slice( _ssubset = self.get_sample_subset(sample_subset) _sample_indices = _ssubset.index.tolist() + if not isinstance(feature_subset, (int, slice)): + raise TypeError("feature indices must be continous; either a 'slice' or 'int' index.") _fsubset = self.get_feature_subset(feature_subset) - _feature_indices = _fsubset.index.tolist() + start_findex = _fsubset["genarr_feature_start_index"].astype(int).min() + end_findex = _fsubset["genarr_feature_end_index"].astype(int).max() + + # expand intervals + final_rows = [] + for row in _fsubset.itertuples(): + for i, _ in enumerate(range(int(row.genarr_feature_start_index), int(row.genarr_feature_end_index))): + final_rows.append(row._replace(starts=i + row.starts, ends=i + row.starts + 1)) + _feature_df = pd.DataFrame(final_rows) - _msubset = self.get_matrix_subset((_feature_indices, _sample_indices)) + _msubset = self.get_matrix_subset((list(range(start_findex, end_findex)), _sample_indices)) return GenomicArrayDatasetSlice( _ssubset, - _fsubset, + _feature_df, _msubset, ) @@ -351,13 +343,9 @@ def __getitem__( elif len(args) == 2: return self.get_slice(args[0], args[1]) else: - raise ValueError( - f"`{type(self).__name__}` only supports 2-dimensional slicing." - ) + raise ValueError(f"`{type(self).__name__}` only supports 2-dimensional slicing.") - raise TypeError( - "args must be a sequence or a scalar integer or string or a tuple of atmost 2 values." - ) + raise TypeError("args must be a sequence or a scalar integer or string or a tuple of atmost 2 values.") #### ## Misc methods. diff --git a/src/genomicarrays/build_genomicarray.py b/src/genomicarrays/build_genomicarray.py index fc69b8e..43d03e0 100644 --- a/src/genomicarrays/build_genomicarray.py +++ b/src/genomicarrays/build_genomicarray.py @@ -174,7 +174,29 @@ def build_genomicarray( missing = required_cols - set(input_intervals.columns) raise ValueError(f"Missing required columns: {missing}") else: - raise TypeError("'input_intervals' is not an expected type (either 'str' or 'Dataframe').") + raise TypeError( + "'input_intervals' is not an expected type (either 'str' or 'Dataframe')." + ) + + # append start index for each interval + input_intervals["widths"] = input_intervals["ends"] - input_intervals["starts"] + total_length = None + if feature_annotation_options.aggregate_function is None: + total_length = int(input_intervals["widths"].sum()) + counter = input_intervals["widths"].shift(1) + counter[0] = 0 + input_intervals["genarr_feature_start_index"] = counter.cumsum().astype(int) + else: + counter = [1] * len(input_intervals) + total_length = len(input_intervals) + counter[0] = 0 + input_intervals["genarr_feature_start_index"] = ( + pd.Series(counter).cumsum().astype(int) + ) + + ends = input_intervals["genarr_feature_start_index"].shift(-1) + ends.iloc[-1] = total_length + input_intervals["genarr_feature_end_index"] = ends.astype(int) if not feature_annotation_options.skip: @@ -187,13 +209,19 @@ def build_genomicarray( input_intervals["sequences"] = sequences - _col_types = utf.infer_column_types(input_intervals, feature_annotation_options.column_types) + _col_types = utf.infer_column_types( + input_intervals, feature_annotation_options.column_types + ) if "genarr_feature_index" not in input_intervals.columns: input_intervals["genarr_feature_index"] = range(0, len(input_intervals)) - _feature_output_uri = f"{output_path}/{feature_annotation_options.tiledb_store_name}" - utf.create_tiledb_frame_from_dataframe(_feature_output_uri, input_intervals, column_types=_col_types) + _feature_output_uri = ( + f"{output_path}/{feature_annotation_options.tiledb_store_name}" + ) + utf.create_tiledb_frame_from_dataframe( + _feature_output_uri, input_intervals, column_types=_col_types + ) if optimize_tiledb: uta.optimize_tiledb_array(_feature_output_uri) @@ -223,10 +251,16 @@ def build_genomicarray( raise TypeError("'sample_metadata' is not an expected type.") if not sample_metadata_options.skip: - _col_types = utf.infer_column_types(sample_metadata, sample_metadata_options.column_types) + _col_types = utf.infer_column_types( + sample_metadata, sample_metadata_options.column_types + ) - _sample_output_uri = f"{output_path}/{sample_metadata_options.tiledb_store_name}" - utf.create_tiledb_frame_from_dataframe(_sample_output_uri, sample_metadata, column_types=_col_types) + _sample_output_uri = ( + f"{output_path}/{sample_metadata_options.tiledb_store_name}" + ) + utf.create_tiledb_frame_from_dataframe( + _sample_output_uri, sample_metadata, column_types=_col_types + ) if optimize_tiledb: uta.optimize_tiledb_array(_sample_output_uri) @@ -242,7 +276,7 @@ def build_genomicarray( x_dim_dtype=feature_annotation_options.dtype, y_dim_dtype=sample_metadata_options.dtype, matrix_dim_dtype=matrix_options.dtype, - x_dim_length=len(input_intervals), + x_dim_length=total_length, y_dim_length=len(files), is_sparse=False, ) @@ -254,6 +288,7 @@ def build_genomicarray( bwpath, idx, feature_annotation_options.aggregate_function, + total_length, ) for idx, bwpath in enumerate(files) ] @@ -276,10 +311,17 @@ def build_genomicarray( ) -def _write_intervals_to_tiledb(outpath, intervals, bwpath, bwidx, agg_func): +def _write_intervals_to_tiledb( + outpath, intervals, bwpath, bwidx, agg_func, total_length +): """Wrapper to extract the data for the given intervals from the bigwig file and write the output to the tiledb file.""" - data = ubw.extract_bw_intervals_as_vec(bwpath, intervals, agg_func) + data = ubw.wrapper_extract_bw_values( + bw_path=bwpath, + intervals=intervals, + agg_func=agg_func, + total_length=total_length, + ) if data is not None and len(data) > 0: uta.write_frame_intervals_to_tiledb(outpath, data=data, y_idx=bwidx) @@ -287,5 +329,7 @@ def _write_intervals_to_tiledb(outpath, intervals, bwpath, bwidx, agg_func): def _wrapper_extract_bwinfo(args): """Wrapper for multiprocessing multiple files and intervals.""" - counts_uri, input_intervals, bwpath, idx, agg_func = args - return _write_intervals_to_tiledb(counts_uri, input_intervals, bwpath, idx, agg_func) + counts_uri, input_intervals, bwpath, idx, agg_func, total_length = args + return _write_intervals_to_tiledb( + counts_uri, input_intervals, bwpath, idx, agg_func, total_length + ) diff --git a/src/genomicarrays/build_options.py b/src/genomicarrays/build_options.py index b070591..491689c 100644 --- a/src/genomicarrays/build_options.py +++ b/src/genomicarrays/build_options.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Dict, Literal +from typing import Dict, Literal, Optional, Callable import numpy as np @@ -122,14 +122,14 @@ class FeatureAnnotationOptions: aggregate_function: A callable to summarize the values in a given - interval. Defaults to NumPy's `nanmean`. + interval. Defaults to None. """ skip: bool = False dtype: np.dtype = np.uint32 tiledb_store_name: str = "feature_annotation" column_types: Dict[str, np.dtype] = None - aggregate_function: callable = np.nanmean + aggregate_function: Optional[Callable] = None def __post_init__(self): if self.column_types is None: diff --git a/src/genomicarrays/utils_bw.py b/src/genomicarrays/utils_bw.py index 473f35b..c53de62 100644 --- a/src/genomicarrays/utils_bw.py +++ b/src/genomicarrays/utils_bw.py @@ -1,4 +1,4 @@ -from typing import Tuple +from typing import Optional, Tuple import numpy as np import pandas as pd @@ -37,10 +37,28 @@ def extract_bw_values( # ) -def extract_bw_intervals_as_vec( +def wrapper_extract_bw_values( bw_path: str, intervals: pd.DataFrame, - agg_func: callable = np.nanmean, + agg_func: Optional[callable], + val_dtype: np.dtype = np.float32, + total_length: int = None, +) -> np.ndarray: + if total_length is None: + total_length = len(intervals) + + if agg_func is not None: + return extract_bw_values_as_vec(bw_path=bw_path, intervals=intervals, agg_func=agg_func, val_dtype=val_dtype) + else: + return extract_bw_intervals_as_vec( + bw_path=bw_path, intervals=intervals, val_dtype=val_dtype, total_length=total_length + ) + + +def extract_bw_values_as_vec( + bw_path: str, + intervals: pd.DataFrame, + agg_func: Optional[callable] = None, val_dtype: np.dtype = np.float32, ) -> np.ndarray: """Extract data from BigWig for a given region and apply the aggregate function. @@ -54,7 +72,7 @@ def extract_bw_intervals_as_vec( agg_func: Aggregate function to apply. - Defaults to np.nanmean. + Defaults to None. val_dtype: Dtype of the resulting array. @@ -81,3 +99,60 @@ def extract_bw_intervals_as_vec( results.append(np.nan) return np.array(results, dtype=val_dtype) + + +def _get_empty_array(size, val_dtype): + out_array = np.empty(size, dtype=val_dtype) + out_array.fill(np.nan) + + return out_array + + +def extract_bw_intervals_as_vec( + bw_path: str, + intervals: pd.DataFrame, + total_length: int, + val_dtype: np.dtype = np.float32, +) -> np.ndarray: + """Extract data from BigWig for a given region. + + Args: + bw_path: + Path to the BigWig file. + + intervals: + List of intervals to extract. + + total_length: + Size of all the regions. + + val_dtype: + Dtype of the resulting array. + + Returns: + A vector with length as the number of intervals, + a value if the file contains the data for the corresponding + region or ``np.nan`` if the region is not measured. + """ + bwfile = bw.open(bw_path) + + out_array = _get_empty_array(total_length, val_dtype=val_dtype) + + for row in intervals.itertuples(): + if row.seqnames in bwfile.chroms(): + try: + data = bwfile.intervals(row.seqnames, row.starts, row.ends) + tmp_out = _get_empty_array(row.ends - row.starts, val_dtype=val_dtype) + if data is not None and len(data) != 0: + for d in data: + _strt = max(0, d[0] - row.starts) + _end = min(d[1] - row.starts, row.ends) + tmp_out[_strt:_end] = d[2] + + out_array[row.genarr_feature_index_start : row.genarr_feature_index_start + row.ends - row.starts] = ( + tmp_out + ) + except Exception as _: + pass + + return out_array diff --git a/tests/test_ingest.py b/tests/test_ingest.py index cb69f70..4b6e9a6 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -4,7 +4,7 @@ import pandas as pd import tiledb -from genomicarrays import build_genomicarray +from genomicarrays import build_genomicarray, FeatureAnnotationOptions __author__ = "Jayaram Kancherla" __copyright__ = "Jayaram Kancherla" @@ -33,6 +33,36 @@ def test_ingest_bigwigs(): assert len(features) == 15 + assert np.all(np.isnan(cfp.multi_index[:5, 0]["data"].flatten())) + assert np.all(np.isnan(cfp.multi_index[:5, 1]["data"].flatten())) + + sfp = tiledb.open(f"{tempdir}/sample_metadata", "r") + samples = sfp.df[:] + assert len(samples) == 2 + +def test_ingest_bigwigs_agg(): + tempdir = tempfile.mkdtemp() + + strts = np.arange(300, 600, 20) + features = pd.DataFrame( + {"seqnames": ["chr1"] * 15, "starts": strts, "ends": strts + 15} + ) + + build_genomicarray( + output_path=tempdir, + files=["tests/data/test1.bw", "tests/data/test2.bw"], + features=features, + genome_fasta="tests/data/test.fa", + feature_annotation_options=FeatureAnnotationOptions(aggregate_function=np.nanmean) + ) + + cfp = tiledb.open(f"{tempdir}/coverage", "r") + ffp = tiledb.open(f"{tempdir}/feature_annotation", "r") + + features = ffp.df[:] + + assert len(features) == 15 + assert np.allclose( cfp.multi_index[0:5, 0]["data"], np.repeat(1, 5), diff --git a/tests/test_query.py b/tests/test_query.py index c31dded..1f87c56 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -5,14 +5,18 @@ import pandas as pd import pytest import tiledb -from genomicarrays import GenomicArrayDataset, build_genomicarray, MatrixOptions +from genomicarrays import ( + GenomicArrayDataset, + build_genomicarray, + FeatureAnnotationOptions, +) __author__ = "Jayaram Kancherla" __copyright__ = "Jayaram Kancherla" __license__ = "MIT" -def test_query(): +def test_query_agg(): tempdir = tempfile.mkdtemp() strts = np.arange(300, 600, 20) @@ -24,7 +28,10 @@ def test_query(): output_path=tempdir, files=["tests/data/test1.bw", "tests/data/test2.bw"], features=features, - genome_fasta="tests/data/test.fa" + genome_fasta="tests/data/test.fa", + feature_annotation_options=FeatureAnnotationOptions( + aggregate_function=np.nanmean + ), ) assert dataset is not None @@ -70,7 +77,16 @@ def test_query(): assert len(cd.get_sample_subset("genarr_sample == 'sample_1'")) == 1 assert sorted(cd.get_feature_annotation_columns()) == sorted( - ["seqnames", "starts", "ends", "genarr_feature_index", "sequences"] + [ + "ends", + "genarr_feature_index", + "genarr_feature_start_index", + "genarr_feature_end_index", + "seqnames", + "sequences", + "starts", + "widths", + ] ) assert len(cd.get_feature_annotation_column("genarr_feature_index")) == 15 assert len(cd.get_feature_subset("genarr_feature_index == 1")) == 1 @@ -81,3 +97,75 @@ def test_query(): assert result1.to_anndata() is not None assert result1.to_rangedsummarizedexperiment() is not None + + +def test_query_noagg(): + tempdir = tempfile.mkdtemp() + + strts = np.arange(300, 600, 20) + features = pd.DataFrame( + {"seqnames": ["chr1"] * 15, "starts": strts, "ends": strts + 15} + ) + + dataset = build_genomicarray( + output_path=tempdir, + files=["tests/data/test1.bw", "tests/data/test2.bw"], + features=features, + genome_fasta="tests/data/test.fa", + ) + + assert dataset is not None + assert isinstance(dataset, GenomicArrayDataset) + + cd = GenomicArrayDataset(dataset_path=tempdir) + + ffp = tiledb.open(f"{tempdir}/feature_annotation", "r") + + features_rt = ffp.df[:] + assert np.allclose(np.array(features["starts"]), np.array(features_rt["starts"])) + assert np.allclose(np.array(features["ends"]), np.array(features_rt["ends"])) + assert all([a == b for a, b in zip(features["seqnames"], features_rt["seqnames"])]) + assert "genarr_feature_index" in features_rt.columns + + assert cd.shape == (15, 2) + assert len(cd) == 15 + + result1 = cd[:, 0] + + assert result1 is not None + assert result1.matrix.shape == (225, 1) + + assert np.all(np.isnan(result1.matrix.flatten())) + + result2 = cd[:, 1] + assert result2.matrix.shape == (225, 1) + + assert np.all(np.isnan(result2.matrix.flatten())) + + assert cd.get_sample_metadata_columns() == [ + "genarr_sample", + ] + assert len(cd.get_sample_metadata_column("genarr_sample")) == 2 + assert len(cd.get_sample_subset("genarr_sample == 'sample_1'")) == 1 + + assert sorted(cd.get_feature_annotation_columns()) == sorted( + [ + "ends", + "genarr_feature_index", + "genarr_feature_start_index", + "genarr_feature_end_index", + "seqnames", + "sequences", + "starts", + "widths", + ] + ) + assert len(cd.get_feature_annotation_column("genarr_feature_index")) == 15 + assert len(cd.get_feature_subset("genarr_feature_index == 1")) == 1 + + result1 = cd.get_slice(slice(0, 5), slice(None)) + assert result1 is not None + assert result1.matrix.shape == (90, 2) + + assert result1.to_anndata() is not None + assert result1.to_rangedsummarizedexperiment() is not None