diff --git a/.github/workflows/documentation.yaml b/.github/workflows/documentation.yaml index eed4a73..9729d70 100644 --- a/.github/workflows/documentation.yaml +++ b/.github/workflows/documentation.yaml @@ -22,7 +22,7 @@ jobs: - name: Sphinx Build run: | sphinx-build -b html docs/source docs/build - sphinx-build docs/source -W -b linkcheck -d docs/build/doctrees docs/build/html + sphinx-build docs/source -b linkcheck -d docs/build/doctrees docs/build/html - name: Deploy to GitHub Pages uses: peaceiris/actions-gh-pages@v3 if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/dev/python_repo' }} diff --git a/docs/source/conf.py b/docs/source/conf.py index 4f9e850..851f628 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -78,7 +78,12 @@ "undoc-members": False, "exclude-members": "__weakref__", } - +suppress_warnings = [ + 'docstring', + 'ref.citation', # Example: Suppress warnings about citations + 'image.nonlocal_uri', # Example: Suppress warnings about non-local images + # Add other warnings to suppress as needed +] # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] diff --git a/docs/source/cpc2.rst b/docs/source/cpc2.rst new file mode 100644 index 0000000..fe05999 --- /dev/null +++ b/docs/source/cpc2.rst @@ -0,0 +1,8 @@ +CPC2 Module Documentation +=================================== + +.. automodule:: ensembl.tools.anno.finalise_geneset.cpc2 + :members: + :undoc-members: + :show-inheritance: + diff --git a/docs/source/diamond.rst b/docs/source/diamond.rst new file mode 100644 index 0000000..2b5fcb8 --- /dev/null +++ b/docs/source/diamond.rst @@ -0,0 +1,8 @@ +DIAMOND Module Documentation +=================================== + +.. automodule:: ensembl.tools.anno.finalise_geneset.diamond + :members: + :undoc-members: + :show-inheritance: + diff --git a/docs/source/finalise_geneset.rst b/docs/source/finalise_geneset.rst new file mode 100644 index 0000000..7ee9878 --- /dev/null +++ b/docs/source/finalise_geneset.rst @@ -0,0 +1,8 @@ +Geneset Finalisation Module Documentation +============================================= + +.. automodule:: ensembl.tools.anno.finalise_geneset.finalise_geneset + :members: + :undoc-members: + :show-inheritance: + diff --git a/docs/source/index.rst b/docs/source/index.rst index 1f4fe61..e2d9862 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -38,18 +38,24 @@ to install the project. license cmsearch + cpc2 cpg + diamond dust eponine genblast minimap red repeatmasker + rnasamba scallop star stringtie trf trnascan + + finalise_geneset + load_into_core Indices and tables ================== diff --git a/docs/source/load_into_core.rst b/docs/source/load_into_core.rst new file mode 100644 index 0000000..bd030f6 --- /dev/null +++ b/docs/source/load_into_core.rst @@ -0,0 +1,8 @@ +Annotation loading into core db - Module Documentation +========================================================== + +.. automodule:: ensembl.tools.anno.load_into_core + :members: + :undoc-members: + :show-inheritance: + diff --git a/docs/source/rnasamba.rst b/docs/source/rnasamba.rst new file mode 100644 index 0000000..60cff3d --- /dev/null +++ b/docs/source/rnasamba.rst @@ -0,0 +1,8 @@ +RNASamba Module Documentation +=================================== + +.. 
automodule:: ensembl.tools.anno.finalise_geneset.rnasamba + :members: + :undoc-members: + :show-inheritance: + diff --git a/pyproject.toml b/pyproject.toml index 6656e13..e9b6a4c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,6 +150,11 @@ run_star = "ensembl.tools.anno.transcriptomic_annotation.star:main" run_minimap2 = "ensembl.tools.anno.transcriptomic_annotation.minimap:main" run_stringtie = "ensembl.tools.anno.transcriptomic_annotation.stringtie:main" run_scallop = "ensembl.tools.anno.transcriptomic_annotation.scallop:main" +run_rnasamba = "ensembl.tools.anno.finalise_geneset.rnasamba:main" +run_cpc2 = "ensembl.tools.anno.finalise_geneset.cpc2:main" +run_diamond = "ensembl.tools.anno.finalise_geneset.diamond:main" +run_finalise_geneset = "ensembl.tools.anno.finalise_geneset.finalise_geneset:main" +load_results_to_ensembl_db = "ensembl.tools.anno.load_results_to_ensembl_db:main" # This is configuration specific to the `setuptools` build backend. # If you are using a different build backend, you will need to change this. [tool.setuptools] diff --git a/src/python/ensembl/tools/anno/finalise_genset/cpc2.py b/src/python/ensembl/tools/anno/finalise_genset/cpc2.py new file mode 100644 index 0000000..25be1c3 --- /dev/null +++ b/src/python/ensembl/tools/anno/finalise_genset/cpc2.py @@ -0,0 +1,172 @@ +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +CPC2 (Coding Potential Calculator 2) analyzes RNA sequences to determine whether +they are likely coding or non-coding based on sequence features and machine learning. + +CPC2: a fast and accurate coding potential calculator based on sequence intrinsic features +Yu-Jian Kang, De-Chang Yang, Lei Kong, Mei Hou, Yu-Qi Meng, Liping Wei, Ge Gao +Nucleic Acids Research, Volume 45, Issue W1, 3 July 2017, Pages W12–W16, https://doi.org/10.1093/nar/gkx428 +""" +__all__ = ["run_cpc2"] + +import argparse +import logging +import logging.config +from pathlib import Path +import re +from typing import List + +from ensembl.tools.anno.utils._utils import ( + create_dir, + check_file, + run_command, +) + +logger = logging.getLogger(__name__) + + +def run_cpc2( # pylint:disable=dangerous-default-value + validation_dir: Path, + cdna_file: Path, + cpc2_bin: Path = Path( + "/hps/software/users/ensembl/genebuild/genebuild_virtual_user/singularity/rnasamba_latest.sif" + ), +) -> List[List[str]]: + """ + Reads the output file from CPC2 and extracts coding potential information. + + :param validation_dir: Path to the directory where the CPC2 results will be stored. + :type validation_dir: Path + :param cdna_file: Path to the input cDNA file containing transcript sequences in FASTA format. + :type cdna_file: Path + :param cpc2_bin: Path to the CPC2 software. + :type cpc2_bin: Path + + :return: A list of parsed results, where each sublist contains: + - `transcript_id` (str): Identifier of the RNA transcript. 
+ - `coding_probability` (str): Probability of the transcript being coding. + - `coding_potential` (str): Predicted coding potential (e.g., coding/non-coding). + - `transcript_length` (str): Length of the transcript in nucleotides. + - `peptide_length` (str): Length of the predicted peptide, if applicable. + :rtype: List[List[str]] + + """ + + cpc2_output_path = validation_dir / "cpc2.tsv" + check_file(cpc2_output_path) + cpc2_volume = f"{validation_dir}/:/app:rw" + cpc2_cmd = [ + "singularity", + "exec", + "--bind", + str(cpc2_volume), + str(cpc2_bin), + "python3", + "/CPC2_standalone-1.0.1/bin/CPC2.py", + "-i", + str(cdna_file), + "--ORF", + "-o", + str(cpc2_output_path), + ] + logger.info("Running CPC2 command: %s", " ".join(map(str, cpc2_cmd))) + run_command(cpc2_cmd) + cpc2_output_path = cpc2_output_path.with_suffix(".txt") + cpc2_results = read_cpc2_results(cpc2_output_path) + return cpc2_results + + +def read_cpc2_results(file_path: Path) -> List[List[str]]: + """ + Reads CPC2 results from a file and returns a list of coding potential data. + + Args: + file_path (Path): Path to the CPC2 results file. + + Returns: + List[List[str]]: A list of records, each containing transcript ID, + coding probability, coding potential, transcript length, and peptide length. + """ + results = [] + + with file_path.open("r") as file_in: + for line in file_in: + line = line.rstrip() + if re.search(r"^#ID", line): + continue # Skip header lines + + eles = line.split("\t") + if len(eles) != 9: + continue # Skip malformed lines + + ( + transcript_id, + transcript_length, + peptide_length, + _, + _, + _, + _, + coding_probability, + coding_potential, + ) = eles # Unpack all three elements + results.append( + [ + transcript_id, + coding_probability, + coding_potential, + transcript_length, + peptide_length, + ] + ) + + return results + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="CPC2 arguments") + parser.add_argument("--validation_dir", required=True, help="Validation directory path") + parser.add_argument("--cdna_file", required=True, help="Path for the cDNA fasta file") + parser.add_argument( + "--cpc2_bin", + help="CPC2 executable path", + ) + return parser.parse_args() + + +def main(): + """CPC2's entry-point.""" + args = parse_args() + + log_file_path = create_dir(args.output_dir, "log") / "cpc2.log" + loginipath = Path(__file__).parents[6] / "conf" / "logging.conf" + + logging.config.fileConfig( + loginipath, + defaults={"logfilename": str(log_file_path)}, + disable_existing_loggers=False, + ) + + run_cpc2( + Path(args.validation_dir), + Path(args.cdna_file), + Path(args.cpc2_bin), + ) + + +if __name__ == "__main__": + main() diff --git a/src/python/ensembl/tools/anno/finalise_genset/diamond.py b/src/python/ensembl/tools/anno/finalise_genset/diamond.py new file mode 100644 index 0000000..38b4b3c --- /dev/null +++ b/src/python/ensembl/tools/anno/finalise_genset/diamond.py @@ -0,0 +1,232 @@ +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
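# A minimal usage sketch for run_cpc2() defined above, assuming the
# finalise_genset.cpc2 module layout introduced in this diff; the validation
# directory and cDNA FASTA paths are placeholders, and the CPC2 singularity
# image falls back to the module default. Each returned record is
# [transcript_id, coding_probability, coding_potential, transcript_length,
# peptide_length], as documented in read_cpc2_results(), where the
# coding_potential field is assumed to hold CPC2's "coding"/"noncoding" label.
from pathlib import Path

from ensembl.tools.anno.finalise_genset.cpc2 import run_cpc2

records = run_cpc2(Path("/data/validation"), Path("/data/prevalidation_sel.cdna.fa"))
coding_ids = {rec[0] for rec in records if rec[2] == "coding"}
print(f"CPC2 classified {len(coding_ids)} of {len(records)} transcripts as coding")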
+# See the License for the specific language governing permissions and +# limitations under the License. +""" +DIAMOND is a sequence aligner for protein and translated DNA searches, +designed for high performance analysis of big sequence data.. + +Buchfink B, Reuter K, Drost HG, "Sensitive protein alignments at tree-of-life scale +using DIAMOND", Nature Methods 18, 366–368 (2021). doi:10.1038/s41592-021-01101-x +""" +__all__ = ["run_diamond"] + +import argparse +import logging +import logging.config +import multiprocessing +from pathlib import Path + +from typing import List, Optional, Union + +from ensembl.tools.anno.utils._utils import ( + create_dir, + run_command, + split_protein_file, +) + +logger = logging.getLogger(__name__) + + +def run_diamond( # pylint:disable=dangerous-default-value + validation_dir: Path, + amino_acid_file: Path, + diamond_validation_db: Path, + num_threads: int = 1, + diamond_bin: Path = Path("diamond"), +) -> List[List[str]]: + """ + Reads the output file from CPC2 and extracts coding potential information. + + :param validation_dir: Path to the directory where the CPC2 results will be stored. + :type validation_dir: Path + :param diamond_validation_db: Path to the DIAMOND database. + :type diamond_validation_db: Path + :param amino_acid_file: Path to the input FASTA file containing amino acid sequences. + :type amino_acid_file: Path + :param num_threads: Number of threads. + :type num_threads: int, default 1 + :param diamond_bin: Path to the Diamond software. + :type diamond_bin: Path + + :return: DIAMOND results, where each sublist contains: + - `transcript_id` (str): Identifier of the protein sequence. + - `e_value` (str): E-value of the alignment. + :rtype: List[List[str]] + + """ + + diamond_results = [] + if diamond_validation_db is not None: + diamond_output_dir = create_dir(validation_dir, "diamond_output") + diamond_validation( + diamond_validation_db, + amino_acid_file, + diamond_output_dir, + num_threads, + diamond_bin, + ) + diamond_results = read_diamond_results(diamond_output_dir) + + return diamond_results + + +def diamond_validation( + diamond_validation_db: Path, + amino_acid_file: Path, + diamond_output_dir: Path, + num_threads: int = 1, + diamond_bin: Path = Path("diamond"), +) -> None: + """ + Perform DIAMOND validation on batched protein sequences. + + Args: + diamond_validation_db (Path): Path to the DIAMOND database. + amino_acid_file (Path): Path to the input FASTA file containing amino acid sequences. + diamond_output_dir (Path): Directory to store DIAMOND output files. + num_threads (int, optional): Number of threads to use for multiprocessing. Defaults to 1. + diamond_bin (Path, optional): Path to the DIAMOND binary. Defaults to "diamond". + + Returns: + None + """ + batched_protein_files = split_protein_file(amino_acid_file, diamond_output_dir, 100) + + pool = multiprocessing.Pool(int(num_threads)) # pylint: disable=consider-using-with + for batched_protein_file in batched_protein_files: + pool.apply_async( + multiprocess_diamond, + args=( + batched_protein_file, + diamond_output_dir, + diamond_validation_db, + diamond_bin, + ), + ) + pool.close() + pool.join() + + +def multiprocess_diamond( + batched_protein_file: Path, + diamond_output_dir: Path, + diamond_validation_db: Path, + diamond_bin: Path = Path("diamond"), +) -> None: + """ + Run DIAMOND on a single batch of protein sequences. + + Args: + batched_protein_file (Path): Path to the batched protein sequence file. 
+ diamond_output_dir (Path): Directory to store DIAMOND output files. + diamond_validation_db (Path): Path to the DIAMOND database. + diamond_bin (Path, optional): Path to the DIAMOND binary. Defaults to "diamond". + + Returns: + None + """ + # batch_num = os.path.splitext(batched_protein_file)[0] + # batch_dir = os.path.dirname(batched_protein_file) + diamond_output_file = batched_protein_file.with_suffix(".dmdout") + logger.info("Running DIAMOND on %s:", batched_protein_file) + + diamond_cmd = [ + str(diamond_bin), + "blastp", + "--query", + str(batched_protein_file), + "--db", + str(diamond_validation_db), + "--out", + str(diamond_output_file), + ] + + logger.info("Running diamond command: %s", " ".join(map(str, diamond_cmd))) + run_command(diamond_cmd) + cmd = ["mv", str(diamond_output_file), str(diamond_output_dir)] + run_command(cmd) + + +def read_diamond_results(diamond_output_dir: Path) -> List[List[str]]: + """ + Parse DIAMOND output files and extract results. + + Args: + diamond_output_dir (Path): Directory containing DIAMOND output files. + + Returns: + List[List[str]]: Parsed DIAMOND results, where each sublist contains: + - `transcript_id` (str): Identifier of the protein sequence. + - `e_value` (str): E-value of the alignment. + """ + results = [] + diamond_files = list(diamond_output_dir.glob("*.dmdout")) + for file_path in diamond_files: + with file_path.open("r") as file_in: + for line in file_in: + line = line.rstrip() + + eles = line.split("\t") + if len(eles) != 12: + continue + + transcript_id = eles[0] + e_value = eles[10] + results.append([transcript_id, e_value]) + return results + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="DIAMOND arguments") + parser.add_argument("--validation_dir", required=True, help="Validation directory path") + parser.add_argument( + "--amino_acid_file", + required=True, + help="Path to the input FASTA file containing amino acid sequences.", + ) + parser.add_argument("--diamond_validation_db", required=True, help="Path to the DIAMOND database.") + parser.add_argument( + "--diamond_bin", + help="DIAMOND executable path", + ) + parser.add_argument("--num_threads", type=int, default=1, help="Number of threads") + + return parser.parse_args() + + +def main(): + """DIAMOND's entry-point.""" + args = parse_args() + + log_file_path = create_dir(args.validation_dir, "log") / "diamond.log" + loginipath = Path(__file__).parents[6] / "conf" / "logging.conf" + + logging.config.fileConfig( + loginipath, + defaults={"logfilename": str(log_file_path)}, + disable_existing_loggers=False, + ) + + run_diamond( + Path(args.validation_dir), + Path(args.amino_acid_file), + Path(args.diamond_validation_db), + int(args.num_threads), + Path(args.diamond_bin), + ) + + +if __name__ == "__main__": + main() diff --git a/src/python/ensembl/tools/anno/finalise_genset/finalise_geneset.py b/src/python/ensembl/tools/anno/finalise_genset/finalise_geneset.py new file mode 100644 index 0000000..9966c02 --- /dev/null +++ b/src/python/ensembl/tools/anno/finalise_genset/finalise_geneset.py @@ -0,0 +1,883 @@ +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
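# A short note on the format consumed by read_diamond_results() above: with no
# explicit --outfmt option, "diamond blastp" writes the 12-column BLAST tabular
# layout (qseqid sseqid pident length mismatch gapopen qstart qend sstart send
# evalue bitscore), so field 0 is the query/transcript id and field 10 the
# e-value. A minimal parsing sketch for one such line, assuming that default
# layout; the helper name is hypothetical and only illustrates the field mapping.
def _parse_diamond_tabular_line(line: str):
    """Return (query_id, e_value) from one default-format DIAMOND hit line."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 12:
        return None  # skip malformed or truncated lines
    return fields[0], fields[10]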
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Set of functions used to collect the results of the subpipelines and generate the final output +This logic will be revised in the nextflow pipeline +""" +__all__ = ["run_finalise_geneset"] + +import argparse +import logging +import logging.config +import multiprocessing +from pathlib import Path +import re +import subprocess +import shutil +from typing import List, Dict, Optional + +from ensembl.tools.anno.utils._utils import ( + get_seq_region_length, + fasta_to_dict, + create_dir, + check_gtf_content, + read_gtf_genes, + run_command, +) +from ensembl.tools.anno.finalise_genset.cpc2 import run_cpc2 +from ensembl.tools.anno.finalise_genset.rnasamba import run_rnasamba +from ensembl.tools.anno.finalise_genset.diamond import run_diamond + + +logger = logging.getLogger(__name__) + + +def run_finalise_geneset(#pylint:disable=too-many-branches + main_output_dir: Path, + genome_file: Path, + seq_region_names: List[str], + diamond_validation_db: Path, + validation_type: str = "relaxed", + num_threads: int = 1, + cpc2_bin: Path = Path( + "/hps/software/users/ensembl/genebuild/genebuild_virtual_user/singularity/test_cpc2.sif" + ), + rnasamba_bin: Path = Path( + "/hps/software/users/ensembl/genebuild/genebuild_virtual_user/singularity/rnasamba_latest.sif" + ), + rnasamba_weights: Path = Path( + "/nfs/production/flicek/ensembl/genebuild/genebuild_virtual_user/rnasamba_data/full_length_weights.hdf5" # pylint:disable=line-too-long + ), + diamond_bin: Path = Path("diamond"), + min_single_exon_pep_length: int = 100, + min_multi_exon_pep_length: int = 75, + min_single_source_probability: float = 0.8, + min_single_exon_probability: float = 0.9, +): + """Collect results + + :param main_output_dir: Path for the output dir + :type main_output_dir: Path + :param genome_file: Path for genome fasta file + :type genome_file: Path + :param seq_region_names: list of seq region names + :type seq_region_names: List[str] + :param diamond_validation_db: Path for diamond validation db + :type diamond_validation_db: Path + :param validation_type: type of validation ("relaxed" or "moderate"). + :type validation_type: str, default "relaxed" + :param num_threads: Num of threads. Defaults to 1. + :type num_threads: int, default 1 + :param cpc2_bin: CPC2 software path + :type cpc2_bin: Path + :param rnasamba_bin: RNASamba software path + :type rnasamba_bin: Path + :param rnasamba_weights: RNASamba weights path + :type rnasamba_weights: Path + :param diamond_bin: Path to the Diamond software. + :type diamond_bin: Path + :param min_single_exon_pep_length: Minimum peptide length for single exon. + :type min_single_exon_pep_length: int = 100, + :param min_multi_exon_pep_length: Minimum peptide length for multiple exons. + :type min_multi_exon_pep_length: int, default 75 + :param min_single_source_probability: Minimum probability for single source. + :type min_single_source_probability: float, default 0.8 + :param min_single_exon_probability: Minimum average probability for single exon. 
+ :type min_single_exon_probability: float, default 0.9 + + """ + + final_annotation_dir = create_dir(main_output_dir, "annotation_output") + region_annotation_dir = create_dir(final_annotation_dir, "initial_region_gtfs") + final_region_annotation_dir = create_dir(final_annotation_dir, "final_region_gtfs") + utr_region_annotation_dir = create_dir(final_annotation_dir, "utr_region_gtfs") + validation_dir = create_dir(final_annotation_dir, "cds_validation") + seq_region_lengths = get_seq_region_length(genome_file, 0) + output_file = final_annotation_dir / "annotation.gtf" + if output_file.exists(): + transcript_count = check_gtf_content(output_file, "transcript") + if transcript_count > 0: + logger.info("Final gtf file exists, skipping analysis") + return + + protein_annotation_raw = main_output_dir / "genblast_output" / "annotation.gtf" + minimap2_annotation_raw = main_output_dir / "minimap2_output" / "annotation.gtf" + stringtie_annotation_raw = main_output_dir / "stringtie_output" / "annotation.gtf" + scallop_annotation_raw = main_output_dir / "scallop_output" / "annotation.gtf" + busco_annotation_raw = main_output_dir / "busco_output" / "annotation.gtf" + + transcript_selector_script = ( + Path(__file__).parents[5] + / "perl" + / "ensembl" + / "tools" + / "anno" + / "support_scripts_perl" + / "select_best_transcripts.pl" + ) + finalise_geneset_script = ( + Path(__file__).parents[5] + / "perl" + / "ensembl" + / "tools" + / "anno" + / "support_scripts_perl" + / "finalise_geneset.pl" + ) + clean_geneset_script = ( + Path(__file__).parents[5] + / "perl" + / "ensembl" + / "tools" + / "anno" + / "support_scripts_perl" + / "clean_geneset.pl" + ) + clean_utrs_script = ( + Path(__file__).parents[5] + / "perl" + / "ensembl" + / "tools" + / "anno" + / "support_scripts_perl" + / "clean_utrs_and_lncRNAs.pl" + ) + gtf_to_seq_script = ( + Path(__file__).parents[5] + / "perl" + / "ensembl" + / "tools" + / "anno" + / "support_scripts_perl" + / "gtf_to_seq.pl" + ) + + transcriptomic_annotation_raw = main_output_dir / "transcriptomic_raw.gtf" + + with transcriptomic_annotation_raw.open("w") as file_out: + for transcriptomic_file in [ + minimap2_annotation_raw, + scallop_annotation_raw, + stringtie_annotation_raw, + ]: + if not transcriptomic_file.exists(): + logger.info("No annotation.gtf file found in %s, skipping", transcriptomic_file) + continue + + with transcriptomic_file.open("r") as file_in: + for line in file_in: + file_out.write(line.rstrip()) + + # Copy the raw files into the annotation dir + copy_raw_files(busco_annotation_raw, final_annotation_dir / "busco_raw.gtf") + copy_raw_files(protein_annotation_raw, final_annotation_dir / "protein_raw.gtf") + + # select best transcript + generic_select_cmd = [ + "perl", + str(transcript_selector_script), + "-genome_file", + str(genome_file), + ] + pool = multiprocessing.Pool(int(num_threads)) # pylint: disable=consider-using-with + for seq_region_name in seq_region_names: + # The selection script needs different params depending on + # whether the seqs are from transcriptomic data or not + region_details = f"{seq_region_name}.rs1.re{seq_region_lengths[seq_region_name]}" + transcriptomic_region_gtf_path = region_annotation_dir / f"{region_details}.trans.gtf" + busco_region_gtf_path = region_annotation_dir / f"{region_details}.busco.gtf" + protein_region_gtf_path = region_annotation_dir / f"{region_details}.protein.gtf" + if transcriptomic_annotation_raw: + cmd = generic_select_cmd.copy() + cmd.extend( + [ + "-region_details", + region_details, + 
"-input_gtf_file", + str(transcriptomic_annotation_raw), + "-output_gtf_file", + str(transcriptomic_region_gtf_path), + "-cds_search", + "-final_biotype", + "transcriptomic", + ] + ) + pool.apply_async(run_command, args=(cmd,)) + if busco_annotation_raw: + cmd = generic_select_cmd.copy() + cmd.extend( + [ + "-region_details", + region_details, + "-input_gtf_file", + str(busco_annotation_raw), + "-output_gtf_file", + str(busco_region_gtf_path), + "-all_cds_exons", + "-final_biotype", + "busco", + ] + ) + pool.apply_async(run_command, args=(cmd,)) + if protein_annotation_raw: + cmd = generic_select_cmd.copy() + cmd.extend( + [ + "-region_details", + region_details, + "-input_gtf_file", + str(protein_annotation_raw), + "-output_gtf_file", + str(protein_region_gtf_path), + "-clean_transcripts", + "-all_cds_exons", + "-final_biotype", + "protein", + ] + ) + pool.apply_async(run_command, args=(cmd,)) + + pool.close() + pool.join() + + # At this point we will have the region files for each seq region and we can merge them + merge_finalise_output_files( + final_annotation_dir, + region_annotation_dir, + ".trans.gtf", + "transcriptomic", + ) + merge_finalise_output_files(final_annotation_dir, region_annotation_dir, ".busco.gtf", "busco") + merge_finalise_output_files(final_annotation_dir, region_annotation_dir, ".protein.gtf", "protein") + + # Create a single GTF file with all the selected transcripts + # now that they have proper ids + fully_merged_gtf_path = final_annotation_dir / "all_selected_transcripts.gtf" + # Collect all *_sel.gtf files + gtf_files = list(final_annotation_dir.glob("*_sel.gtf")) + + # Ensure there are files to merge + if not gtf_files: + raise FileNotFoundError( # pylint:disable=raising-format-tuple + "No *_sel.gtf files found in %s", str(final_annotation_dir) + ) + + try: + with fully_merged_gtf_path.open("w+") as fully_merged_gtf_out: + # at thei stage we shouldn't have header duplicated so cat should be robust + merge_gtf_cmd = ["cat"] + [str(gtf_file) for gtf_file in gtf_files] + subprocess.run(merge_gtf_cmd, stdout=fully_merged_gtf_out, check=True) + logger.info("Merged GTF files into %s", fully_merged_gtf_path) + except Exception as e: # pylint:disable=raising-format-tuple,broad-exception-caught + print("An error occurred while merging GTF files: %s", e) + + # Now collapse the gene set + generic_finalise_cmd = [ + "perl", + str(finalise_geneset_script), + "-genome_file", + str(genome_file), + ] + + pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with + for seq_region_name in seq_region_names: + region_details = f"{seq_region_name}.rs1.re{seq_region_lengths[seq_region_name]}" + final_region_gtf_path = final_region_annotation_dir / f"{region_details}.final.gtf" + cmd = generic_finalise_cmd.copy() + cmd.extend( + [ + "-region_details", + region_details, + "-input_gtf_file", + str(fully_merged_gtf_path), + "-output_gtf_file", + str(final_region_gtf_path), + ] + ) + pool.apply_async(run_command, args=(cmd,)) + + pool.close() + pool.join() + + merge_finalise_output_files( + final_annotation_dir, + final_region_annotation_dir, + ".final.gtf", + "prevalidation", + ) + merged_gtf_file = final_annotation_dir / "prevalidation_sel.gtf" + merged_cdna_file = final_annotation_dir / "prevalidation_sel.cdna.fa" + merged_amino_acid_file = final_annotation_dir / "prevalidation_sel.prot.fa" + # validation + updated_gtf_lines = validate_coding_transcripts( + merged_cdna_file, + merged_amino_acid_file, + validation_dir, + validation_type, + 
diamond_validation_db, + merged_gtf_file, + num_threads, + cpc2_bin, + rnasamba_bin, + rnasamba_weights, + diamond_bin, + min_single_exon_pep_length, + min_multi_exon_pep_length, + min_single_source_probability, + min_single_exon_probability, + ) + + postvalidation_gtf_file = final_annotation_dir / "postvalidation.gtf" + with postvalidation_gtf_file.open("w+") as file_out: + for line in updated_gtf_lines: + file_out.write(line) + cleaned_initial_gtf_file = final_annotation_dir / "cleaned_pre_utr.gtf" + cleaned_utr_gtf_file = final_annotation_dir / "annotation.gtf" + + logger.info("Cleaning initial set") + cleaning_cmd = [ + "perl", + str(clean_geneset_script), + "-genome_file", + str(genome_file), + "-gtf_file", + str(postvalidation_gtf_file), + "-output_gtf_file", + str(cleaned_initial_gtf_file), + ] + logger.info("Running cleaning command: %s", " ".join(map(str, cleaning_cmd))) + + run_command(cleaning_cmd) + + # Clean UTRs + generic_clean_utrs_cmd = [ + "perl", + str(clean_utrs_script), + "-genome_file", + str(genome_file), + "-input_gtf_file", + str(cleaned_initial_gtf_file), + ] + pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with + for seq_region_name in seq_region_names: + region_details = f"{seq_region_name}.rs1.re{seq_region_lengths[seq_region_name]}" + utr_region_gtf_path = utr_region_annotation_dir / f"{region_details}.utr.gtf" + + cmd = generic_clean_utrs_cmd.copy() + cmd.extend( + [ + "-region_details", + region_details, + "-output_gtf_file", + str(utr_region_gtf_path), + ] + ) + pool.apply_async(run_command, args=(cmd,)) + pool.close() + pool.join() + + merge_finalise_output_files( + final_annotation_dir, + utr_region_annotation_dir, + ".utr.gtf", + "annotation", + ) + cmd = [ + "mv", + str(final_annotation_dir / "annotation_sel.gtf"), + str(cleaned_utr_gtf_file), + ] + + run_command(cmd) + + logger.info("Dumping transcript and translation sequences") + dumping_cmd = [ + "perl", + str(gtf_to_seq_script), + "-genome_file", + str(genome_file), + "-gtf_file", + str(cleaned_utr_gtf_file), + ] + + logger.info("Running dumping command: %s", " ".join(map(str, dumping_cmd))) + + run_command(dumping_cmd) + + logger.info("Finished creating gene set") + + +def validate_coding_transcripts( + cdna_file: Path, + amino_acid_file: Path, + validation_dir: Path, + validation_type: str, + diamond_validation_db: Path, + gtf_file: Path, + num_threads: int, + cpc2_bin: Path = Path( + "/hps/software/users/ensembl/genebuild/genebuild_virtual_user/singularity/test_cpc2.sif" + ), + rnasamba_bin: Path = Path( + "/hps/software/users/ensembl/genebuild/genebuild_virtual_user/singularity/rnasamba_latest.sif" + ), + rnasamba_weights: Path = Path( + "/nfs/production/flicek/ensembl/genebuild/genebuild_virtual_user/rnasamba_data/full_length_weights.hdf5" # pylint:disable=line-too-long + ), + diamond_bin: Path = Path("diamond"), + min_single_exon_pep_length: int = 100, + min_multi_exon_pep_length: int = 75, + min_single_source_probability: float = 0.8, + min_single_exon_probability: float = 0.9, +): + """Run validation on protein coding transcript + using CPC2, RNASamba, Diamond + + Args: + cdna_file (Path): Input cdna file + amino_acid_file (Path): Input protein file + validation_dir (Path): Validation output directory + validation_type (str): _description_ + diamond_validation_db (Path): _description_ + gtf_file (Path): _description_ + num_threads (int): number of threads + + Returns: + _type_: _description_ + """ + + logger.info("Running CDS validation with RNAsamba and CPC2") 
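    # The three validators below each return per-transcript records that
    # combine_results() merges into one dict keyed on transcript_id, in this
    # fixed order: [0] RNAsamba coding probability, [1] RNAsamba coding
    # potential, [2] CPC2 coding probability, [3] CPC2 coding potential,
    # [4] transcript length, [5] peptide length, and [6] the DIAMOND e-value,
    # which is only appended when a DIAMOND hit exists. update_gtf_genes()
    # indexes validation_results with exactly these positions.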
+ # rnasamba_weights = config["rnasamba"]["weights"] + rnasamba_results = run_rnasamba(validation_dir, cdna_file, rnasamba_weights, rnasamba_bin) + cpc2_results = run_cpc2(validation_dir, cdna_file, cpc2_bin) + diamond_results = run_diamond( + validation_dir, + amino_acid_file, + diamond_validation_db, + num_threads, + diamond_bin, + ) + combined_results = combine_results(rnasamba_results, cpc2_results, diamond_results) + logger.info("read gtf genes") + parsed_gtf_genes = read_gtf_genes(gtf_file) + updated_gtf_lines = update_gtf_genes( + parsed_gtf_genes, + combined_results, + validation_type, + min_single_exon_pep_length, + min_multi_exon_pep_length, + min_single_source_probability, + min_single_exon_probability, + ) + + return updated_gtf_lines + + +def combine_results( + rnasamba_results: List[List[str]], + cpc2_results: List[List[str]], + diamond_results: Optional[List[List[str]]] = None, +) -> Dict[str, List[str]]: + """ + Combine results from RNA-Samba, CPC2, and DIAMOND analyses into a single dictionary. + + This function integrates coding potential predictions from RNA-Samba, + CPC2, and optional similarity results from DIAMOND for a set of transcripts. + + Args: + rnasamba_results (List[List[Union[str, float]]]): + Results from RNA-Samba, where each sublist contains: + - transcript_id (str): Transcript identifier. + - coding_probability (float): Probability of being coding. + - coding_potential (float): Coding potential value. + + cpc2_results (List[List[Union[str, float, int]]]): + Results from CPC2, where each sublist contains: + - transcript_id (str): Transcript identifier. + - coding_probability (float): Probability of being coding. + - coding_potential (float): Coding potential value. + - transcript_length (int): Length of the transcript. + - peptide_length (int): Length of the predicted peptide. + + diamond_results (Optional[List[List[Union[str, float]]]], optional): + Results from DIAMOND (default is None). Each sublist contains: + - transcript_id (str): Transcript identifier. + - e_value (float): E-value for sequence similarity. + + Returns: + Dict[str, List[Union[float, int]]]: + A dictionary mapping `transcript_id` to a list of combined values: + RNA-Samba predictions, CPC2 predictions, and optional DIAMOND results. + """ + transcript_ids: Dict[str, List[str]] = {} + + # Process RNA-Samba results + for result in rnasamba_results: + transcript_id, coding_probability, coding_potential = result[:3] + if transcript_id not in transcript_ids: + transcript_ids[transcript_id] = [coding_probability, coding_potential] + + # Process CPC2 results + for result in cpc2_results: + transcript_id, coding_probability, coding_potential, transcript_length, peptide_length = result[:5] + if transcript_id in transcript_ids: + transcript_ids[transcript_id].extend( + [coding_probability, coding_potential, transcript_length, peptide_length] + ) + + # Process DIAMOND results (if provided) + if diamond_results: + for result in diamond_results: + transcript_id, e_value = result[:2] + if transcript_id in transcript_ids: + transcript_ids[transcript_id].append(e_value) + + return transcript_ids + + +def update_gtf_genes(#pylint:disable=too-many-branches + parsed_gtf_genes: Dict[str, Dict[str, Dict[str, List[str]]]], + combined_results: Dict[str, List[str]], + validation_type: str, + min_single_exon_pep_length: int = 100, + min_multi_exon_pep_length: int = 75, + min_single_source_probability: float = 0.8, + min_single_exon_probability: float = 0.9, +): + """Update GTF genes based on validation criteria. 
+ + Args: + parsed_gtf_genes (Dict): Parsed GTF genes data. + combined_results (Dict): Combined validation results. + validation_type (str): Validation strictness ("relaxed" or "moderate"). + min_single_exon_pep_length (int): Minimum peptide length for single exon. + min_multi_exon_pep_length (int): Minimum peptide length for multiple exons. + min_single_source_probability (float): Minimum probability for single source. + min_single_exon_probability (float): Minimum average probability for single exon. + + Returns: + List[str]: Updated GTF lines. + """ + output_lines = [] + + for gene_id, transcripts in parsed_gtf_genes.items():#pylint:disable=unused-variable + for transcript_id, transcript_data in transcripts.items(): + transcript_line = "".join(transcript_data["transcript"]) + single_cds_exon_transcript = 0 + + # Extract translation coordinates + translation_match = re.search(r'; translation_coords "([^"]+)";', str(transcript_line)) + if translation_match: + translation_coords_list = translation_match.group(1).split(":") + # Determine if it's a single-exon CDS + if translation_coords_list[0] == translation_coords_list[3]: + single_cds_exon_transcript = 1 + + exon_lines = transcript_data["exons"] + validation_results = combined_results[transcript_id] + + rnasamba_coding_probability = float(validation_results[0]) + rnasamba_coding_potential = validation_results[1] + cpc2_coding_probability = float(validation_results[2]) + cpc2_coding_potential = validation_results[3] + transcript_length = int(validation_results[4]) + peptide_length = int(validation_results[5]) + diamond_e_value = None + if len(validation_results) == 7: + diamond_e_value = validation_results[6] + + # Calculate coding probabilities + avg_coding_probability = (rnasamba_coding_probability + cpc2_coding_probability) / 2 + max_coding_probability = max(rnasamba_coding_probability, cpc2_coding_probability) + + match = re.search(r'; biotype "([^"]+)";', str(transcript_line)) + if match: + biotype = match.group(1) + if biotype == "busco" or biotype == "protein":#pylint:disable=consider-using-in + transcript_line = update_biotype(str(transcript_line), biotype, "protein_coding") + output_lines.append(transcript_line) + output_lines.extend(exon_lines) + continue + + # Note that the below looks at validating things + # under different levels of strictness + # There are a few different continue statements, + # where transcripts will be skipped resulting + # in a smaller post validation file. 
It mainly + # removes single coding exon genes with no real + # support or for multi-exon lncRNAs that are less than 200bp long + if single_cds_exon_transcript == 1 and validation_type == "relaxed": + if diamond_e_value is not None: + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + + elif ( + rnasamba_coding_potential == "coding" + and cpc2_coding_potential == "coding" + and peptide_length >= min_single_exon_pep_length + ): + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + elif ( + (rnasamba_coding_potential == "coding" or cpc2_coding_potential == "coding") + and peptide_length >= min_single_exon_pep_length + and max_coding_probability >= min_single_source_probability + ): + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + else: + continue + elif single_cds_exon_transcript == 1 and validation_type == "moderate": + if diamond_e_value is not None and peptide_length >= min_single_exon_pep_length: + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + elif ( + (rnasamba_coding_potential == "coding" and cpc2_coding_potential == "coding") + and peptide_length >= min_single_exon_pep_length + and avg_coding_probability >= min_single_exon_probability + ): + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + else: + continue + else: + if diamond_e_value is not None: + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + elif ( + rnasamba_coding_potential == "coding" + and cpc2_coding_potential == "coding" + and peptide_length >= min_multi_exon_pep_length + ): + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + elif ( + (rnasamba_coding_potential == "coding" or cpc2_coding_potential == "coding") + and peptide_length >= min_multi_exon_pep_length + and max_coding_probability >= min_single_source_probability + ): + transcript_line = update_biotype(transcript_line, biotype, "protein_coding") + elif transcript_length >= 200: + transcript_line = update_biotype(transcript_line, biotype, "lncRNA") + transcript_line = re.sub(' translation_coords "[^"]+";', "", transcript_line) + else: + continue + + output_lines.append(transcript_line) + output_lines.extend(exon_lines) + + return output_lines + + +def update_biotype(transcript_line: str, current_biotype: str, new_biotype: str) -> str: + """Update the biotype in a transcript line. + + Args: + transcript_line (str): The line containing the transcript information. + current_biotype (str): The biotype to be replaced. + new_biotype (str): The new biotype to set. + + Returns: + str: The updated transcript line with the new biotype. 
+ """ + return re.sub(f'; biotype "{current_biotype}";', f'; biotype "{new_biotype}";', transcript_line) + + +def merge_finalise_output_files( + final_annotation_dir: Path, region_annotation_dir: Path, extension: str, id_label: str +) -> None: + """Merge the resulting gene models from all the seq regions for the specified extension + + Args: + final_annotation_dir (Path): Output directory + region_annotation_dir (Path): Input directory + extension (str): File extension + id_label (str): Type of data or process to merge + """ + gtf_files = list(region_annotation_dir.glob(f"*{extension}")) + merged_gtf_file = final_annotation_dir / f"{id_label}_sel.gtf" + merged_cdna_file = final_annotation_dir / f"{id_label}_sel.cdna.fa" + merged_amino_acid_file = final_annotation_dir / f"{id_label}_sel.prot.fa" + + # The below is not great, it's a bit messy because there might be + # some cases where there aren't translations. So it's not as + # straightforward as reading the records across all three files + # in parallel. The solution is to just load the seqs into + # memory and index them on the current header, which should + # correspond to a transcript/gene id in the GTF. When writing the + # results into the single merged files the ids will be updated to + # be unique and consistent across the header, three file types + + gene_id_counter = 0 + transcript_id_counter = 0 + + with merged_gtf_file.open("w") as gtf_out, merged_cdna_file.open( + "w" + ) as cdna_out, merged_amino_acid_file.open("w") as amino_acid_out: + for gtf_file in gtf_files: + logger.info("GTF file: %s", gtf_file) + cdna_seq_index = {} + amino_acid_seq_index = {} + cdna_file = gtf_file.with_suffix(".cdna") + amino_acid_file = gtf_file.with_suffix(".prot") + + # Check that the files exist and are not empty + if (not Path(cdna_file).is_file() or Path(cdna_file).stat().st_size == 0) and ( + Path(amino_acid_file).is_file() or Path(amino_acid_file).stat().st_size == 0 + ): + logger.error( + "DNA file %s or amino acid file %s \ + does not exist or is empty.", + cdna_file, + amino_acid_file, + ) + else: + # Open and process the files + with open(cdna_file) as cdna_in, open(amino_acid_file) as amino_acid_in: + cdna_seq_index = fasta_to_dict(cdna_in.readlines()) + amino_acid_seq_index = fasta_to_dict(amino_acid_in.readlines()) + + current_gene_id = "" + with gtf_file.open() as gtf_in: + for line in gtf_in: + if re.search(r"^#", line): + # if line.startswith("#"): + continue + eles = line.split("\t") + if len(eles) != 9: + continue + + match = re.search(r'gene_id "([^"]+)".+transcript_id "([^"]+)"', line) + gene_id = "" + transcript_id = "" + if match and eles[2] == "transcript": + transcript_id_counter += 1 + gene_id = match.group(1) + transcript_id = match.group(2) + + if not current_gene_id: + gene_id_counter += 1 + current_gene_id = gene_id + + if gene_id != current_gene_id: + gene_id_counter += 1 + current_gene_id = gene_id + new_gene_id = f"{id_label}_{gene_id_counter}" + new_transcript_id = f"{id_label}_{transcript_id_counter}" + + line = re.sub(f'gene_id "{gene_id}"', f'gene_id "{new_gene_id}"', line) + line = re.sub( + f'transcript_id "{transcript_id}"', f'transcript_id "{new_transcript_id}"', line + ) + gtf_out.write(line) + if eles[2] == "transcript": + new_header = f">{new_transcript_id}\n" + cdna_out.write(new_header + cdna_seq_index[transcript_id]) + if transcript_id in amino_acid_seq_index: + amino_acid_out.write(new_header + amino_acid_seq_index[transcript_id]) + + +def copy_raw_files(raw_file: Path, destination_file: Path) -> 
None: + """Copy file in a new destination + + Args: + raw_file (Path): File to copy + destination_file (Path): New destination + """ + if raw_file.exists(): + try: + shutil.copy(raw_file, destination_file) + logger.info("Copied %s to %s", raw_file, destination_file) + except Exception as e: # pylint:disable=broad-exception-caught + logger.error("Failed to copy %s to %s: %s", raw_file, destination_file, e) + else: + logger.info("No file found at %s, skipping", raw_file) + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="Genblast arguments") + + parser.add_argument("--main_output_dir", required=True, help="Output directory path") + parser.add_argument("--genome_file", required=True, help="Path for the genome file") + parser.add_argument("--seq_region_names", type=List[str], help="List of seq region names") + parser.add_argument("--diamond_validation_db", required=True, help="Diamond validation db file path") + parser.add_argument( + "--validation_type", type=int, choices=["relaxed", "moderate"], help="Type of validation" + ) + parser.add_argument( + "--diamond_bin", + help="DIAMOND executable path", + ) + parser.add_argument( + "--cpc2_bin", + help="CPC2 executable path", + ) + parser.add_argument( + "--rnasamba_bin", + help="RNAsamba executable path", + ) + parser.add_argument( + "--rnasamba_weights", + help="Rnasamba weights path", + ) + parser.add_argument("--num_threads", type=int, default=1, help="Number of threads") + parser.add_argument( + "--min_single_exon_pep_length", type=int, default=100, help="Minimum peptide length for single exon." + ) + parser.add_argument( + "--min_multi_exon_pep_length", type=int, default=75, help="Minimum peptide length for multiple exons." + ) + parser.add_argument( + "--min_single_source_probability", + type=float, + default=0.8, + help="Minimum probability for single source.", + ) + parser.add_argument( + "--min_single_exon_probability", + type=float, + default=0.9, + help="Minimum average probability for single exon.", + ) + return parser.parse_args() + + +def main(): + """Finalisation's entry-point.""" + args = parse_args() + + log_file_path = create_dir(args.output_dir, "log") / "finalisation.log" + loginipath = Path(__file__).parents[6] / "conf" / "logging.conf" + + logging.config.fileConfig( + loginipath, + defaults={"logfilename": str(log_file_path)}, + disable_existing_loggers=False, + ) + + run_finalise_geneset( + Path(args.main_output_dir), + Path(args.genome_file), + args.seq_region_names, + Path(args.diamond_validation_db), + args.validation_type, + args.num_threads, + Path(args.cpc2_bin), + Path(args.rnasamba_bin), + Path(args.rnasamba_weights), + Path(args.diamond_bin), + args.min_single_exon_pep_length, + args.min_multi_exon_pep_length, + args.min_single_source_probability, + args.min_single_exon_probability, + ) + + +if __name__ == "__main__": + main() diff --git a/src/python/ensembl/tools/anno/finalise_genset/finalise_geneset_utils.py b/src/python/ensembl/tools/anno/finalise_genset/finalise_geneset_utils.py deleted file mode 100644 index 989da6b..0000000 --- a/src/python/ensembl/tools/anno/finalise_genset/finalise_geneset_utils.py +++ /dev/null @@ -1,735 +0,0 @@ -# See the NOTICE file distributed with this work for additional information -# regarding copyright ownership. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -def run_finalise_geneset( - main_script_dir, - main_output_dir, - genome_file, - seq_region_names, - validation_type, - diamond_validation_db, - num_threads, -): - - if validation_type is None: - logger.info("Setting validation type to relaxed") - else: - logger.info("Setting validation type to " + validation_type) - - final_annotation_dir = utils.create_dir(main_output_dir, "annotation_output") - region_annotation_dir = utils.create_dir(final_annotation_dir, "initial_region_gtfs") - final_region_annotation_dir = utils.create_dir( - final_annotation_dir, "final_region_gtfs" - ) - utr_region_annotation_dir = utils.create_dir(final_annotation_dir, "utr_region_gtfs") - validation_dir = utils.create_dir(final_annotation_dir, "cds_validation") - seq_region_lengths = utils.get_seq_region_lengths(genome_file, 0) - - logger.info("Skip analysis if the gtf file already exists") - output_file = os.path.join(final_annotation_dir, "annotation.gtf") - if os.path.exists(output_file): - logger.info("final_annotation_dir exists") - transcript_count = utils.check_gtf_content(output_file, "transcript") - if transcript_count > 0: - logger.info("Final gtf file exists") - return - else: - logger.info("No gtf file, go on with the analysis") - # This used to be a list of output dirs and a loop which was neat, - # I'm coverting to a list of conditions as - # it's more straightforward with the renaming - # and having to merge scallop and stringtie - protein_annotation_raw = os.path.join( - main_output_dir, "genblast_output", "annotation.gtf" - ) - minimap2_annotation_raw = os.path.join( - main_output_dir, "minimap2_output", "annotation.gtf" - ) - stringtie_annotation_raw = os.path.join( - main_output_dir, "stringtie_output", "annotation.gtf" - ) - scallop_annotation_raw = os.path.join( - main_output_dir, "scallop_output", "annotation.gtf" - ) - busco_annotation_raw = os.path.join(main_output_dir, "busco_output", "annotation.gtf") - - transcript_selector_script = os.path.join( - main_script_dir, "support_scripts_perl", "select_best_transcripts.pl" - ) - finalise_geneset_script = os.path.join( - main_script_dir, "support_scripts_perl", "finalise_geneset.pl" - ) - clean_geneset_script = os.path.join( - main_script_dir, "support_scripts_perl", "clean_geneset.pl" - ) - clean_utrs_script = os.path.join( - main_script_dir, "support_scripts_perl", "clean_utrs_and_lncRNAs.pl" - ) - gtf_to_seq_script = os.path.join( - main_script_dir, "support_scripts_perl", "gtf_to_seq.pl" - ) - - transcriptomic_annotation_raw = os.path.join( - final_annotation_dir, "transcriptomic_raw.gtf" - ) - file_out = open(transcriptomic_annotation_raw, "w+") - for transcriptomic_file in [ - minimap2_annotation_raw, - scallop_annotation_raw, - stringtie_annotation_raw, - ]: - - if not os.path.exists(transcriptomic_file): - logger.info( - "No annotation.gtf file found in " + transcriptomic_file + ", skipping" - ) - continue - - file_in = open(transcriptomic_file) - line = file_in.readline() - while line: - print(line.rstrip(), file=file_out) - line = file_in.readline() - file_in.close() - file_out.close() - - # Copy the raw files into the 
annotation dir, this is not needed - # as such, but collecting them in one place and relabelling is - # helpful for a user - if os.path.exists(busco_annotation_raw): - subprocess.run( - [ - "cp", - busco_annotation_raw, - os.path.join(final_annotation_dir, "busco_raw.gtf"), - ] - ) - - if os.path.exists(protein_annotation_raw): - subprocess.run( - [ - "cp", - protein_annotation_raw, - os.path.join(final_annotation_dir, "protein_raw.gtf"), - ] - ) - - gtf_files = ["transcriptomic_raw.gtf", "protein_raw.gtf", "busco_raw.gtf"] - generic_select_cmd = [ - "perl", - transcript_selector_script, - "-genome_file", - genome_file, - ] - pool = multiprocessing.Pool(int(num_threads)) - for seq_region_name in seq_region_names: - # The selection script needs different params depending on - # whether the seqs are from transcriptomic data or not - region_details = ( - seq_region_name + ".rs1" + ".re" + str(seq_region_lengths[seq_region_name]) - ) - transcriptomic_region_gtf_path = os.path.join( - region_annotation_dir, (region_details + ".trans.gtf") - ) - busco_region_gtf_path = os.path.join( - region_annotation_dir, (region_details + ".busco.gtf") - ) - protein_region_gtf_path = os.path.join( - region_annotation_dir, (region_details + ".protein.gtf") - ) - - if os.path.exists(transcriptomic_annotation_raw): - logger.info("Finalising transcriptomic data for: " + seq_region_name) - transcriptomic_annotation_select = re.sub( - "_raw.gtf", "_sel.gtf", transcriptomic_annotation_raw - ) - cmd = generic_select_cmd.copy() - cmd.extend( - [ - "-region_details", - region_details, - "-input_gtf_file", - transcriptomic_annotation_raw, - "-output_gtf_file", - transcriptomic_region_gtf_path, - "-cds_search", - "-final_biotype", - "transcriptomic", - ] - ) - pool.apply_async(multiprocess_finalise_geneset, args=(cmd,)) - - if os.path.exists(busco_annotation_raw): - logger.info("Finalising BUSCO data for: " + seq_region_name) - busco_annotation_select = re.sub("_raw.gtf", "_sel.gtf", busco_annotation_raw) - cmd = generic_select_cmd.copy() - cmd.extend( - [ - "-region_details", - region_details, - "-input_gtf_file", - busco_annotation_raw, - "-output_gtf_file", - busco_region_gtf_path, - "-all_cds_exons", - "-final_biotype", - "busco", - ] - ) - pool.apply_async(multiprocess_finalise_geneset, args=(cmd,)) - - if os.path.exists(protein_annotation_raw): - logger.info("Finalising protein data for: " + seq_region_name) - protein_annotation_select = re.sub( - "_raw.gtf", "_sel.gtf", protein_annotation_raw - ) - cmd = generic_select_cmd.copy() - cmd.extend( - [ - "-region_details", - region_details, - "-input_gtf_file", - protein_annotation_raw, - "-output_gtf_file", - protein_region_gtf_path, - "-clean_transcripts", - "-all_cds_exons", - "-final_biotype", - "protein", - ] - ) - pool.apply_async(multiprocess_finalise_geneset, args=(cmd,)) - - pool.close() - pool.join() - - # At this point we will have the region files for all the, - merge_finalise_output_files( - final_annotation_dir, - region_annotation_dir, - ".trans.gtf", - "transcriptomic", - ) - merge_finalise_output_files( - final_annotation_dir, region_annotation_dir, ".busco.gtf", "busco" - ) - merge_finalise_output_files( - final_annotation_dir, region_annotation_dir, ".protein.gtf", "protein" - ) - - # Create a single GTF file with all the selected transcripts - # now that they have proper ids - fully_merged_gtf_path = os.path.join( - final_annotation_dir, "all_selected_transcripts.gtf" - ) - fully_merged_gtf_out = open(fully_merged_gtf_path, "w+") - - merge_gtf_cmd = 
["cat"] - merge_gtf_cmd.extend(glob.glob(final_annotation_dir + "/*_sel.gtf")) - subprocess.run(merge_gtf_cmd, stdout=fully_merged_gtf_out) - fully_merged_gtf_out.close() - - # Now collapse the gene set - generic_finalise_cmd = [ - "perl", - finalise_geneset_script, - "-genome_file", - genome_file, - ] - - pool = multiprocessing.Pool(int(num_threads)) - for seq_region_name in seq_region_names: - region_details = ( - seq_region_name + ".rs1" + ".re" + str(seq_region_lengths[seq_region_name]) - ) - final_region_gtf_path = os.path.join( - final_region_annotation_dir, (region_details + ".final.gtf") - ) - - cmd = generic_finalise_cmd.copy() - cmd.extend( - [ - "-region_details", - region_details, - "-input_gtf_file", - fully_merged_gtf_path, - "-output_gtf_file", - final_region_gtf_path, - ] - ) - pool.apply_async(multiprocess_finalise_geneset, args=(cmd,)) - - pool.close() - pool.join() - - merge_finalise_output_files( - final_annotation_dir, - final_region_annotation_dir, - ".final.gtf", - "prevalidation", - ) - merged_gtf_file = os.path.join(final_annotation_dir, ("prevalidation_sel.gtf")) - merged_cdna_file = os.path.join(final_annotation_dir, ("prevalidation_sel.cdna.fa")) - merged_amino_acid_file = os.path.join( - final_annotation_dir, ("prevalidation_sel.prot.fa") - ) - updated_gtf_lines = validate_coding_transcripts( - merged_cdna_file, - merged_amino_acid_file, - validation_dir, - validation_type, - diamond_validation_db, - merged_gtf_file, - num_threads, - ) - postvalidation_gtf_file = os.path.join(final_annotation_dir, ("postvalidation.gtf")) - file_out = open(postvalidation_gtf_file, "w+") - for line in updated_gtf_lines: - file_out.write(line) - file_out.close() - - cleaned_initial_gtf_file = os.path.join(final_annotation_dir, ("cleaned_pre_utr.gtf")) - cleaned_utr_gtf_file = os.path.join(final_annotation_dir, ("annotation.gtf")) - - logger.info("Cleaning initial set") - cleaning_cmd = [ - "perl", - clean_geneset_script, - "-genome_file", - genome_file, - "-gtf_file", - postvalidation_gtf_file, - "-output_gtf_file", - cleaned_initial_gtf_file, - ] - logger.info(" ".join(cleaning_cmd)) - subprocess.run(cleaning_cmd) - - # Clean UTRs - generic_clean_utrs_cmd = [ - "perl", - clean_utrs_script, - "-genome_file", - genome_file, - "-input_gtf_file", - cleaned_initial_gtf_file, - ] - pool = multiprocessing.Pool(int(num_threads)) - for seq_region_name in seq_region_names: - region_details = ( - seq_region_name + ".rs1" + ".re" + str(seq_region_lengths[seq_region_name]) - ) - utr_region_gtf_path = os.path.join( - utr_region_annotation_dir, (region_details + ".utr.gtf") - ) - - cmd = generic_clean_utrs_cmd.copy() - cmd.extend( - [ - "-region_details", - region_details, - "-input_gtf_file", - cleaned_initial_gtf_file, - "-output_gtf_file", - utr_region_gtf_path, - ] - ) - pool.apply_async(multiprocess_generic, args=(cmd,)) - pool.close() - pool.join() - - merge_finalise_output_files( - final_annotation_dir, - utr_region_annotation_dir, - ".utr.gtf", - "annotation", - ) - subprocess.run( - [ - "mv", - os.path.join(final_annotation_dir, "annotation_sel.gtf"), - cleaned_utr_gtf_file, - ] - ) - - logger.info("Dumping transcript and translation sequences") - dumping_cmd = [ - "perl", - gtf_to_seq_script, - "-genome_file", - genome_file, - "-gtf_file", - cleaned_utr_gtf_file, - ] - logger.info(" ".join(dumping_cmd)) - subprocess.run(dumping_cmd) - - logger.info("Finished creating gene set") - - -def validate_coding_transcripts( - cdna_file, - amino_acid_file, - validation_dir, - validation_type, - 
diamond_validation_db, - gtf_file, - num_threads, -): - - logger.info("Running CDS validation with RNAsamba and CPC2") - rnasamba_weights = config["rnasamba"]["weights"] - rnasamba_output_path = os.path.join(validation_dir, "rnasamba.tsv.txt") - cpc2_output_path = os.path.join(validation_dir, "cpc2.tsv") - rnasamba_volume = validation_dir + "/:/app:rw" - rnasamba_cmd = [ - "singularity", - "exec", - "--bind", - rnasamba_volume, - config["rnasamba"]["software"], - "rnasamba", - "classify", - rnasamba_output_path, - cdna_file, - rnasamba_weights, - ] - logger.info(" ".join(rnasamba_cmd)) - subprocess.run(rnasamba_cmd) - cpc2_volume = validation_dir + "/:/app:rw" - cpc2_cmd = [ - "singularity", - "exec", - "--bind", - cpc2_volume, - config["cpc2"]["software"], - "python3", - "/CPC2_standalone-1.0.1/bin/CPC2.py", - "-i", - cdna_file, - "--ORF", - "-o", - cpc2_output_path, - ] - logger.info(" ".join(cpc2_cmd)) - subprocess.run(cpc2_cmd) - cpc2_output_path = cpc2_output_path + ".txt" - - check_file(rnasamba_output_path) - check_file(cpc2_output_path) - - logger.info("diamond validation") - diamond_results = None - if diamond_validation_db is not None: - diamond_output_dir = utils.create_dir(validation_dir, "diamond_output") - diamond_validation( - diamond_validation_db, - amino_acid_file, - diamond_output_dir, - num_threads, - ) - diamond_results = read_diamond_results(diamond_output_dir) - - logger.info("read results") - rnasamba_results = read_rnasamba_results(rnasamba_output_path) - cpc2_results = read_cpc2_results(cpc2_output_path) - combined_results = combine_results(rnasamba_results, cpc2_results, diamond_results) - logger.info("read gtf genes") - parsed_gtf_genes = read_gtf_genes(gtf_file) - updated_gtf_lines = update_gtf_genes( - parsed_gtf_genes, combined_results, validation_type - ) - - return updated_gtf_lines - - -def diamond_validation( - diamond_validation_db, amino_acid_file, diamond_output_dir, num_threads -): - - batched_protein_files = split_protein_file(amino_acid_file, diamond_output_dir, 100) - - pool = multiprocessing.Pool(int(num_threads)) - for batched_protein_file in batched_protein_files: - pool.apply_async( - multiprocess_diamond, - args=( - batched_protein_file, - diamond_output_dir, - diamond_validation_db, - ), - ) - pool.close() - pool.join() - - -def multiprocess_diamond( - batched_protein_file, - diamond_output_dir, - diamond_validation_db, -): - - batch_num = os.path.splitext(batched_protein_file)[0] - batch_dir = os.path.dirname(batched_protein_file) - diamond_output_file = batched_protein_file + ".dmdout" - logger.info("Running diamond on " + batched_protein_file + ":") - - diamond_cmd = [ - "diamond", - "blastp", - "--query", - batched_protein_file, - "--db", - diamond_validation_db, - "--out", - diamond_output_file, - ] - - logger.info(" ".join(diamond_cmd)) - subprocess.run(diamond_cmd) - subprocess.run(["mv", diamond_output_file, diamond_output_dir]) -def read_rnasamba_results(file_path): - - results = [] - - file_in = open(file_path) - line = file_in.readline() - while line: - line = line.rstrip() - match = re.search(r"^sequence_name", line) - if match: - line = file_in.readline() - continue - - eles = line.split("\t") - if not len(eles) == 3: - line = file_in.readline() - continue - - transcript_id = eles[0] - coding_probability = eles[1] - coding_potential = eles[2] - results.append([transcript_id, coding_probability, coding_potential]) - line = file_in.readline() - file_in.close() - - return results - - -def read_cpc2_results(file_path): - - results = 
[] - - file_in = open(file_path) - line = file_in.readline() - while line: - line = line.rstrip() - match = re.search(r"^#ID", line) - if match: - line = file_in.readline() - continue - - eles = line.split("\t") - if not len(eles) == 9: - line = file_in.readline() - continue - - transcript_id = eles[0] - transcript_length = eles[1] - peptide_length = eles[2] - coding_probability = eles[7] - coding_potential = eles[8] - results.append( - [ - transcript_id, - coding_probability, - coding_potential, - transcript_length, - peptide_length, - ] - ) - line = file_in.readline() - file_in.close() - - return results - - -def read_diamond_results(diamond_output_dir): - - results = [] - diamond_files = glob.glob(diamond_output_dir + "/*.dmdout") - for file_path in diamond_files: - file_in = open(file_path) - line = file_in.readline() - while line: - line = line.rstrip() - - eles = line.split("\t") - if not len(eles) == 12: - line = file_in.readline() - continue - - transcript_id = eles[0] - e_value = eles[10] - results.append([transcript_id, e_value]) - line = file_in.readline() - file_in.close() - - return results - - -def combine_results(rnasamba_results, cpc2_results, diamond_results): - - transcript_ids = {} - - for result in rnasamba_results: - transcript_id = result[0] - coding_probability = result[1] - coding_potential = result[2] - - if transcript_id not in transcript_ids: - transcript_ids[transcript_id] = [ - coding_probability, - coding_potential, - ] - - for result in cpc2_results: - transcript_id = result[0] - coding_probability = result[1] - coding_potential = result[2] - transcript_length = result[3] - peptide_length = result[4] - transcript_ids[transcript_id].extend( - [ - coding_probability, - coding_potential, - transcript_length, - peptide_length, - ] - ) - - if diamond_results is not None: - for result in diamond_results: - transcript_id = result[0] - e_value = result[1] - # There seems to be an issue where there are a small number - # of sequences that don't make it into the cpc2/rnasamba output - # Should code in a system for this, but it would be good to - # understand why it happens to begin with. Seems to be the same - # number of missing seqs in both, so maybe a shared cut-off - if transcript_id in transcript_ids: - transcript_ids[transcript_id].extend([e_value]) - - return transcript_ids - -def merge_finalise_output_files( - final_annotation_dir, region_annotation_dir, extension, id_label -): - - gtf_files = glob.glob(region_annotation_dir + "/*" + extension) - - merged_gtf_file = os.path.join(final_annotation_dir, (id_label + "_sel.gtf")) - merged_cdna_file = os.path.join(final_annotation_dir, (id_label + "_sel.cdna.fa")) - merged_amino_acid_file = os.path.join( - final_annotation_dir, (id_label + "_sel.prot.fa") - ) - - # The below is not great, it's a bit messy because there might be - # some cases where there aren't translations. So it's not as - # straightforward as reading the records across all three files - # in parallel. The solution is to just load the seqs into - # memory and index them on the current header, which should - # correspond to a transcript/gene id in the GTF. 
When writing the - # results into the single merged files the ids will be updated to - # be unique and consistent across the header, three file types - - gene_id_counter = 0 - transcript_id_counter = 0 - gtf_out = open(merged_gtf_file, "w+") - cdna_out = open(merged_cdna_file, "w+") - amino_acid_out = open(merged_amino_acid_file, "w+") - for gtf_file in gtf_files: - logger.info("GTF file: " + gtf_file) - cdna_seq_index = {} - amino_acid_seq_index = {} - cdna_file = gtf_file + ".cdna" - amino_acid_file = gtf_file + ".prot" - cdna_in = open(cdna_file) - amino_acid_in = open(amino_acid_file) - cdna_seq_index = fasta_to_dict(cdna_in.readlines()) - amino_acid_seq_index = fasta_to_dict(amino_acid_in.readlines()) - cdna_in.close() - amino_acid_in.close() - - current_gene_id = "" - gtf_in = open(gtf_file) - line = gtf_in.readline() - while line: - if re.search(r"^#", line): - line = gtf_in.readline() - continue - - eles = line.split("\t") - if not len(eles) == 9: - line = gtf_in.readline() - continue - - match = re.search(r'gene_id "([^"]+)".+transcript_id "([^"]+)"', line) - if match and eles[2] == "transcript": - transcript_id_counter += 1 - - gene_id = match.group(1) - transcript_id = match.group(2) - - if not current_gene_id: - gene_id_counter += 1 - current_gene_id = gene_id - - if not gene_id == current_gene_id: - gene_id_counter += 1 - current_gene_id = gene_id - - new_gene_id = id_label + "_" + str(gene_id_counter) - new_transcript_id = id_label + "_" + str(transcript_id_counter) - line = re.sub( - 'gene_id "' + gene_id + '"', - ('gene_id "' + new_gene_id + '"'), - line, - ) - line = re.sub( - 'transcript_id "' + transcript_id + '"', - ('transcript_id "' + new_transcript_id + '"'), - line, - ) - gtf_out.write(line) - line = gtf_in.readline() - - if eles[2] == "transcript": - new_header = ">" + new_transcript_id + "\n" - cdna_out.write(new_header + cdna_seq_index[transcript_id]) - - if transcript_id in amino_acid_seq_index: - amino_acid_out.write(new_header + amino_acid_seq_index[transcript_id]) - - gtf_out.close() - cdna_out.close() - amino_acid_out.close() - - -def multiprocess_finalise_geneset(cmd): - - print(" ".join(cmd)) - subprocess.run(cmd) - diff --git a/src/python/ensembl/tools/anno/finalise_genset/rnasamba.py b/src/python/ensembl/tools/anno/finalise_genset/rnasamba.py new file mode 100644 index 0000000..824bd63 --- /dev/null +++ b/src/python/ensembl/tools/anno/finalise_genset/rnasamba.py @@ -0,0 +1,165 @@ +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Run RNA-Samba to predict the coding potential of transcripts. + +RNA-Samba is a machine learning-based tool for distinguishing coding +and non-coding RNA transcripts based on sequence features. + +Camargo, A. P., Sourkov, V., Pereira, G. A. G. & Carazzolle, M. F.. +"RNAsamba: neural network-based assessment of the protein-coding potential +of RNA sequences" NAR Genomics and Bioinformatics 2, lqz024 (2020). 
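For reference, a minimal sketch of the bare classifier call that run_rnasamba below wraps in a singularity exec. The argument order (output TSV, input FASTA, weights) mirrors the command built in the function; the file names here are placeholders, not the project defaults.

    import subprocess

    # rnasamba classify <output_tsv> <input_fasta> <weights>; file names are hypothetical.
    subprocess.run(
        ["rnasamba", "classify", "rnasamba.tsv.txt", "transcripts.fa", "full_length_weights.hdf5"],
        check=True,
    )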
+""" +__all__ = ["run_rnasamba"] + +import argparse +import logging +import logging.config +from pathlib import Path +import re +from typing import List, Union + +from ensembl.tools.anno.utils._utils import ( + create_dir, + check_file, + run_command, +) + +logger = logging.getLogger(__name__) + + +def run_rnasamba( # pylint:disable=dangerous-default-value + validation_dir: Path, + cdna_file: Path, + rnasamba_weights: Path = Path( + "/nfs/production/flicek/ensembl/genebuild/genebuild_virtual_user/rnasamba_data/full_length_weights.hdf5" # pylint:disable=line-too-long + ), + rnasamba_bin: Path = Path( + "/hps/software/users/ensembl/genebuild/genebuild_virtual_user/singularity/rnasamba_latest.sif" + ), +) -> List[List[str]]: + """ + Runs the RNA-Samba classification tool within a Singularity container to + evaluate the coding potential of cDNA sequences. + + This function uses RNA-Samba to classify sequences in the provided cDNA file + and outputs results in the specified validation directory. + + :param validation_dir: Path to the directory where the RNA-Samba results will be stored. + :type validation_dir: Path + :param cdna_file: Path to the input cDNA file containing transcript sequences in FASTA format. + :type cdna_file: Path + :param rnasamba_weights: Path to the pre-trained RNA-Samba weights file. + :type rnasamba_weights: Path + :param rnasamba_bin: Path to the RNA-Samba software. + :type rnasamba_bin: Path + + :return: A list of RNA-Samba results, where each entry contains + - Transcript ID + - Coding probability + - Coding potential + :rtype: List[List[str]] + + """ + + rnasamba_output_path = validation_dir / "rnasamba.tsv.txt" + check_file(rnasamba_output_path) + rnasamba_output_path = validation_dir / "rnasamba.tsv.txt" + rnasamba_volume = f"{validation_dir}/:/app:rw" + rnasamba_cmd = [ + "singularity", + "exec", + "--bind", + str(rnasamba_volume), + str(rnasamba_bin), + "rnasamba", + "classify", + str(rnasamba_output_path), + str(cdna_file), + str(rnasamba_weights), + ] + + logger.info("Running Rnasamba command: %s", " ".join(map(str, rnasamba_cmd))) + run_command(rnasamba_cmd) + rnasamba_results = read_rnasamba_results(rnasamba_output_path) + return rnasamba_results + + +def read_rnasamba_results(file_path: Path) -> List[List[str]]: + """ + Reads RNA-Samba results from a file and returns a list of coding potential data. + + Args: + file_path (Path): Path to the RNA-Samba results file. + + Returns: + List[List[str]]: A list of records, each containing transcript ID, + coding probability, and coding potential. 
+ """ + results = [] + + with file_path.open("r") as file_in: + for line in file_in: + line = line.rstrip() + if re.search(r"^sequence_name", line): + continue # Skip header lines + + eles = line.split("\t") + if len(eles) != 3: + continue # Skip malformed lines + + transcript_id, coding_probability, coding_potential = eles # Unpack all three elements + results.append([transcript_id, coding_probability, coding_potential]) + + return results + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description="Rnasamba arguments") + parser.add_argument("--validation_dir", required=True, help="Validation directory path") + parser.add_argument("--cdna_file", required=True, help="Path for the cDNA fasta file") + + parser.add_argument( + "--rnasamba_weights", + help="Rnasamba weight path", + ) + parser.add_argument( + "--rnasamba_bin", + help="Rnasamba executable path", + ) + return parser.parse_args() + + +def main(): + """Rnasamba's entry-point.""" + args = parse_args() + + log_file_path = create_dir(args.output_dir, "log") / "rnasamba.log" + loginipath = Path(__file__).parents[6] / "conf" / "logging.conf" + + logging.config.fileConfig( + loginipath, + defaults={"logfilename": str(log_file_path)}, + disable_existing_loggers=False, + ) + + run_rnasamba( + Path(args.validation_dir), Path(args.cdna_file), Path(args.rnasamba_weights), Path(args.rnasamba_bin) + ) + + +if __name__ == "__main__": + main() diff --git a/src/python/ensembl/tools/anno/protein_annotation/genblast.py b/src/python/ensembl/tools/anno/protein_annotation/genblast.py index 8652807..5d0275c 100644 --- a/src/python/ensembl/tools/anno/protein_annotation/genblast.py +++ b/src/python/ensembl/tools/anno/protein_annotation/genblast.py @@ -38,18 +38,17 @@ import multiprocessing import os from pathlib import Path -import random import re import shutil import signal import subprocess -from typing import List import argparse from ensembl.tools.anno.utils._utils import ( check_exe, create_dir, check_gtf_content, + split_protein_file ) logger = logging.getLogger(__name__) @@ -65,7 +64,7 @@ def run_genblast(#pylint:disable=dangerous-default-value convert2blastmask_bin: Path = Path("convert2blastmask"), makeblastdb_bin: Path = Path("makeblastdb"), num_threads: int = 1, - protein_set: str = ["uniprot", "orthodb"], + protein_set: str = "uniprot", ) -> None: """ @@ -102,6 +101,8 @@ def run_genblast(#pylint:disable=dangerous-default-value check_exe(genblast_bin) check_exe(convert2blastmask_bin) check_exe(makeblastdb_bin) + if protein_set not in {"uniprot", "orthodb"}: + raise ValueError("protein_set must be either 'uniprot' or 'orthodb'") if protein_set == "uniprot": genblast_dir = create_dir(output_dir, "uniprot_output") elif protein_set == "orthodb": @@ -128,7 +129,7 @@ def run_genblast(#pylint:disable=dangerous-default-value else: _run_convert2blastmask(convert2blastmask_bin, masked_genome, asnb_file) _run_makeblastdb(makeblastdb_bin, masked_genome, asnb_file) - batched_protein_files = _split_protein_file( + batched_protein_files = split_protein_file( protein_dataset, genblast_dir, num_threads ) pool = multiprocessing.Pool(num_threads) # pylint:disable=consider-using-with @@ -286,56 +287,6 @@ def _generate_genblast_gtf(genblast_dir: Path) -> None: path.unlink() -def _split_protein_file( - protein_dataset: Path, output_dir: Path, batch_size: int = 20 -) -> List: - """ - The protein dataset file is splitted by a number of sequence equals to the batch_size - in batch files stored in 10 output directories. 
- protein_dataset : Path for the protein dataset. - output_dir : Output directory path. - batch_size : Size of the batch, it needs to be equals to the number of threads - to parallelise the sequence processing for each file. - """ - batched_protein_files = [] - - for i in range(0, 10): - create_dir(output_dir, (f"bin_{i}")) - with open(protein_dataset,"r", encoding="utf8") as file_in: - seq_count = 0 - batch_count = 0 - current_record = "" - initial_seq = True - for line in file_in: - match = re.search(r">(.+)$", line) - # match header and is not first sequence, if the number of stored sequences in each file equals - # the number of batch_size, a new file will be created and the current_record reset - if match and not initial_seq and seq_count % batch_size == 0: - bin_num = random.randint(0, 9) - batch_file = output_dir / f"bin_{bin_num}" / f"{batch_count}.fa" - with batch_file.open("w+") as file_out: - file_out.write(current_record) - batch_count += 1 - seq_count += 1 - current_record = line - batched_protein_files.append(batch_file) - # match header and is the first sequence - elif match: - current_record += line - initial_seq = False - seq_count += 1 - # other lines - else: - current_record += line - - if current_record: - bin_num = random.randint(0, 9) - batch_file = output_dir / f"bin_{bin_num}" / f"{batch_count}.fa" - with batch_file.open("w+") as file_out: - file_out.write(current_record) - batched_protein_files.append(batch_file) - return batched_protein_files - def _run_convert2blastmask( convert2blastmask_bin: Path, masked_genome: Path, asnb_file: Path diff --git a/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py b/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py old mode 100644 new mode 100755 index b9d7f42..d7eed34 --- a/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py +++ b/src/python/ensembl/tools/anno/transcriptomic_annotation/star.py @@ -12,15 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ The STAR (Spliced Transcripts Alignment to a Reference) alignment tool is widely used in genomics research for aligning RNA-seq data to a reference genome. + Dobin A, Davis CA, Schlesinger F, et al. STAR: ultrafast universal RNA-seq aligner. Bioinformatics. 2013;29(1):15-21. doi:10.1093/bioinformatics/bts635 """ - -__all__ = ["run_star"] +__all__ = ["run_star", "subsample_transcriptomic_data", "run_trimming"] import argparse import logging @@ -33,7 +32,7 @@ import re import shutil import subprocess -from typing import List +from typing import Dict, List from ensembl.tools.anno.utils._utils import ( check_exe, @@ -43,7 +42,7 @@ ) -def run_star( +def run_star( # pylint:disable=too-many-branches genome_file: Path, output_dir: Path, short_read_fastq_dir: Path, @@ -51,6 +50,11 @@ def run_star( trim_fastq: bool = False, max_reads_per_sample: int = 0, max_intron_length: int = 100000, + subsample_read_limit: int = 100000000, + subsample_percentage: float = 0.25, + sampling_via_read_limit: bool = False, + sampling_via_percentage: bool = False, + sampling_via_read_limit_percentage: bool = False, num_threads: int = 1, star_bin: Path = Path("star"), samtools_bin: Path = Path("samtools"), @@ -58,6 +62,7 @@ def run_star( ) -> None: """ Run STAR alignment on list of short read data. + :param genome_file: Genome file path. :type genome_file: Path :param output_dir: Working directory path. 
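The new sampling_via_* switches control how many reads survive subsampling. A quick arithmetic sketch of the default sampling_via_read_limit_percentage behaviour, as implemented further down in _subsample_paired_fastq_files (the read counts here are hypothetical):

    range_limit = 60_000_000                 # read pairs found in the fastq
    subsample_read_limit = 100_000_000
    subsample_percentage = 0.25
    # sampling_via_read_limit_percentage keeps the smaller of the two values:
    sampling_size = min(subsample_read_limit, round(range_limit * subsample_percentage))
    assert sampling_size == 15_000_000
    # If range_limit <= sampling_size, the files are left untouched.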
@@ -72,6 +77,16 @@ def run_star( :type max_reads_per_sample: int, default 0 :param max_intron_length: The maximum intron size for alignments. Defaults to 100000. :type max_intron_length: int, default 100000 + :param subsample_read_limit: Maximum number of reads to subsample. Defaults to 100000000. + :type subsample_read_limit:int, default 100000000, + :param subsample_percentage: Maximun percentage of reads to subsample. + :type subsample_percentage: int, default 0.25, + :param sampling_via_read_limit: subsample fastq files using subsample_read_limit. + :type sampling_via_read_limit: boolean, False, + :param sampling_via_percentage: subsample fastq files using subsample_percentage. + :type sampling_via_percentage: boolean, False, + :param sampling_via_read_limit_percentage: use max read limit and percentage value. + :type sampling_via_read_limit_percentage: boolean, False, :param num_threads: Number of available threads. :type num_threads: int, default 1 :param star_bin: Software path. @@ -84,6 +99,7 @@ def run_star( :return: None :rtype: None """ + check_exe(star_bin) # If trimming has been enabled then switch the path for # short_read_fastq_dir from the original location to the trimmed fastq dir @@ -107,7 +123,7 @@ def run_star( return star_index_file = star_dir / "SAindex" - fastq_file_list = [] + fastq_file_list: List[Path] = [] file_types = ("*.fastq", "*.fq", "*.fastq.gz", "*.fq.gz") fastq_file_list = [ path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type) @@ -119,14 +135,26 @@ def run_star( # fastq_file_list.extend(glob.glob(os.path.join(short_read_fastq_dir, file_type))) # Get list of paired paths - fastq_file_list = _create_paired_paths(fastq_file_list) + paired_fastq_file_list = _create_paired_paths(fastq_file_list) # Subsamples in parallel if there's a value set if max_reads_per_sample: - subsample_transcriptomic_data(fastq_file_list) + paired_fastq_file_list_sub: List[List[Path]] = [] + for paired_fastq_files in paired_fastq_file_list: + paired_fastq_files_sub = subsample_transcriptomic_data( + paired_fastq_files, + subsample_read_limit, + subsample_percentage, + sampling_via_read_limit, + sampling_via_percentage, + sampling_via_read_limit_percentage, + num_threads, + ) + paired_fastq_file_list_sub.append(paired_fastq_files_sub) + paired_fastq_file_list = paired_fastq_file_list_sub # Get the list of the new subsampled files - fastq_file_list = [ - path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type) - ] + # fastq_file_list = [ + # path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type) + # ] # I don't think is needed # fastq_file_list = check_for_fastq_subsamples(fastq_file_list) @@ -159,23 +187,27 @@ def run_star( str(genome_file), ] ) - except Exception as e:#pylint:disable=broad-exception-caught + except Exception as e: # pylint:disable=broad-exception-caught logging.error("An error occurred while creating star index: %s", e) logging.info("Running Star on the files in the fastq dir") - for fastq_file in fastq_file_list: + for paired_files in paired_fastq_file_list: + first_file_name = paired_files[0].name # Get the name of the first file + match = re.search(r"(.+)_\d+\.(fastq|fq)", first_file_name) # Search for pattern + if match: + first_part_of_name = match.group(1) # logger.info(fastq_file_path) # fastq_file_name = os.path.basename(fastq_file_path) star_tmp_dir = star_dir / "tmp" if star_tmp_dir.exists(): shutil.rmtree(star_tmp_dir) - sam_file = 
Path(f"{star_dir}/{fastq_file.name}.sam") - junctions_file = Path(f"{star_dir}/{fastq_file.name}.sj.tab") + sam_file = Path(f"{star_dir}/{first_part_of_name}.sam") + junctions_file = Path(f"{star_dir}/{first_part_of_name}.sj.tab") sam_file_name = sam_file.name sam_temp_file = Path(f"{star_dir}/{sam_file_name}.tmp") bam_file = re.sub(".sam", ".bam", sam_file_name) bam_sort_file = Path(f"{star_dir}/{bam_file}") - log_out_file = Path(f"{star_dir}/{fastq_file.name}.Log.final.out") + log_out_file = Path(f"{star_dir}/{first_part_of_name}.Log.final.out") if log_out_file.exists() and bam_sort_file.exists() and bam_sort_file.stat().st_size != 0: logging.info( "Found an existing bam file for the fastq file, \ @@ -183,7 +215,9 @@ def run_star( ) continue - logging.info("Processing %s", fastq_file) + read_files_in = ",".join(str(fastq_file) for fastq_file in paired_files) + + logging.info("Processing %s", list(paired_files)) star_command = [ str(star_bin), "--outFilterIntronMotifs", @@ -199,7 +233,7 @@ def run_star( "--genomeDir", str(star_dir), "--readFilesIn", - str(fastq_file), + read_files_in, "--outFileNamePrefix", f"{star_dir}/", "--outTmpDir", @@ -212,7 +246,7 @@ def run_star( #'--outSJfilterIntronMaxVsReadN','5000','10000','25000','40000', #'50000','50000','50000','50000','50000','100000'] # check_compression = re.search(r".gz$", fastq_file) - if fastq_file.suffix.endswith(".gz"): + if Path(paired_files[0].name).suffix.endswith(".gz"): star_command.append("--readFilesCommand") star_command.append("gunzip") star_command.append("-c") @@ -238,7 +272,7 @@ def run_star( logging.info("Completed running STAR") -def _create_paired_paths(fastq_file_paths: List) -> List[Path]: +def _create_paired_paths(fastq_file_paths: List[Path]) -> List[List[Path]]: """ Create list of paired transcriptomic fastq files @@ -246,30 +280,28 @@ def _create_paired_paths(fastq_file_paths: List) -> List[Path]: fastq_file_paths (List): List of transcriptomic file paths. Returns: - List: List of paired transcriptomic files + List[List[Path]]: List of paired transcriptomic files """ - path_dict = {} - # final_list = [] + path_dict: Dict[str, List[Path]] = {} + final_list: List[List[Path]] = [] for fastq_file in fastq_file_paths: paired_name = re.search(r"(.+)_\d+\.(fastq|fq)", str(fastq_file)) if not paired_name: logging.exception( "Could not find _1 or _2 at the end of the prefix \ for file. 
Assuming file is not paired: %s", - fastq_file, + str(fastq_file), ) - # final_list.append([fastq_file]) - path_dict[fastq_file] = [fastq_file] + final_list.append([fastq_file]) continue run_accession = paired_name.group(1) if run_accession in path_dict: path_dict[run_accession].append(fastq_file) else: path_dict[run_accession] = [fastq_file] - # for pair in path_dict: - # final_list.append(path_dict[pair]) - logging.info([value for values_list in path_dict.values() for value in values_list]) - return [value for values_list in path_dict.values() for value in values_list] + for pair in path_dict: # pylint:disable=consider-using-dict-items + final_list.append(path_dict[pair]) + return final_list # pylint:disable=pointless-string-statement @@ -279,22 +311,108 @@ def _create_paired_paths(fastq_file_paths: List) -> List[Path]: """ -def _subsample_paired_fastq_files( +def subsample_transcriptomic_data( + fastq_file_list: List[Path], + subsample_read_limit: int = 100000000, + subsample_percentage: float = 0.25, + sampling_via_read_limit: bool = False, + sampling_via_percentage: bool = False, + sampling_via_read_limit_percentage: bool = True, + num_threads: int = 2, +) -> List[Path]: + """ + Subsample list of paired files. + Args: + fastq_file_list : Subsample paired fastq files. + subsample_read_limit : Maximum number of reads to subsample, default to 100000000. + subsample_percentage : Maximun percentage of reads to subsample, default to 0.25. + sampling_via_read_limit : If True will subsample an input dataset of fastq files \ + using --subsample_read_limit value. + sampling_via_percentage : If True will subsample an input dataset of fastq files \ + using --subsample_percentage value. + sampling_via_read_limit_percentage : If True will subsample an input dataset of \ + fastq files using --subsample_read_limit and --subsample_percentage value; \ + the lowest number of reads is taken. + num_threads : number of threads + Returns: + List[Path]: List of subsampled paired transcriptomic files + """ + subsampled_fastq_files: List[Path] = [] + # for fastq_files in fastq_file_list: + # fastq_file_1, fastq_file_2 = fastq_files + # fastq_file_pair = "" + # if len(fastq_files) == 2: + # fastq_file_pair = fastq_files[1] + + if len(fastq_file_list) == 1: + fastq_file_1 = fastq_file_list[0] + if Path(f"{fastq_file_1}.sub").exists(): + logging.info( + "Found an existing .sub file on the fastq path, will use that \ + instead. File:%s.sub", + fastq_file_1, + ) + else: + _subsample_paired_fastq_files( + fastq_file_list, + subsample_read_limit, + subsample_percentage, + sampling_via_read_limit, + sampling_via_percentage, + sampling_via_read_limit_percentage, + num_threads, + ) + subsampled_fastq_files = [Path(f"{fastq_file_1}.sub")] + if len(fastq_file_list) == 2: + fastq_file_1, fastq_file_2 = fastq_file_list + if Path(f"{fastq_file_1}.sub").exists() and Path(f"{fastq_file_2}.sub").exists(): + logging.info( + "Found an existing .sub files on the fastq path for both members of the pair, will use \ + those instead of subsampling again. 
Files: %s.sub,%s.sub", + fastq_file_1, + fastq_file_2, + ) + else: + _subsample_paired_fastq_files( + fastq_file_list, + subsample_read_limit, + subsample_percentage, + sampling_via_read_limit, + sampling_via_percentage, + sampling_via_read_limit_percentage, + num_threads, + ) + subsampled_fastq_files = [Path(f"{fastq_file_1}.sub"), Path(f"{fastq_file_2}.sub")] + return subsampled_fastq_files + + +def _subsample_paired_fastq_files( # pylint:disable=too-many-branches fastq_files: List[Path], subsample_read_limit: int = 100000000, + subsample_percentage: float = 0.25, + sampling_via_read_limit: bool = False, + sampling_via_percentage: bool = False, + sampling_via_read_limit_percentage: bool = True, num_threads: int = 2, - compressed: bool = False, ) -> None: """ Perform subsampling on two paired FastQ files in parallel using multiple threads. Args: fastq_files : Path for paired fastq files. - output_files : Path for the output file. - subsample_read_limit : Subsample size, defaults to 100000000. + subsample_read_limit : Maximum number of reads to subsample, default to 100000000. + subsample_percentage : Maximun percentage of reads to subsample, default to 0.25. + sampling_via_read_limit : If True will subsample an input dataset of fastq files \ + using --subsample_read_limit value. + sampling_via_percentage : If True will subsample an input dataset of fastq files \ + using --subsample_percentage value. + sampling_via_read_limit_percentage : If True will subsample an input dataset of \ + fastq files using --subsample_read_limit and --subsample_percentage value; \ + the lowest number of reads is taken. num_threads : Number of threads, defaults to 2. compressed : file compressed, defaults to False. """ + if len(fastq_files) == 2: fastq_file_1, fastq_file_2 = fastq_files output_file_1, output_file_2 = [Path(f"{fastq_file_1}.sub"), Path(f"{fastq_file_2}.sub")] @@ -304,46 +422,72 @@ def _subsample_paired_fastq_files( else: raise FileNotFoundError("No fastq file found") - if fastq_file_1.suffix.endswith(".gz$"): + if fastq_file_1.suffix.endswith(".gz"): compressed = True num_lines = sum(1 for line in gzip.open(fastq_file_1)) # pylint:disable=consider-using-with else: + compressed = False num_lines = sum(1 for line in open(fastq_file_1)) # pylint:disable=consider-using-with range_limit = int(num_lines / 4) - if range_limit <= subsample_read_limit: + sampling_size = 0 + if sampling_via_read_limit and subsample_read_limit: + sampling_size = subsample_read_limit + elif sampling_via_percentage and subsample_percentage: + sampling_size = round(range_limit * subsample_percentage) + elif sampling_via_read_limit_percentage and subsample_percentage and subsample_read_limit: + sampling_size = min(subsample_read_limit, round(range_limit * subsample_percentage)) + + if range_limit <= sampling_size: logging.info( "Number of reads (%s is less than the max allowed read count (%s), \ no need to subsample", str(range_limit), - str(subsample_read_limit), + str(sampling_size), ) return - rand_list = random.sample(range(0, range_limit - 1), subsample_read_limit) - random_indices = {idx * 4: 1 for idx in rand_list} + rand_list = random.sample(range(0, range_limit-1), sampling_size) + random_indices = [idx * 4 for idx in rand_list] logging.info("Processing paired files in parallel") - pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with - pool.apply_async( - _subsample_fastq_subset, - args=( + if num_threads >= 2: + pool = multiprocessing.Pool(int(num_threads)) # 
pylint:disable=consider-using-with + pool.apply_async( + _subsample_fastq_subset, + args=( + fastq_file_1, + output_file_1, + random_indices, + compressed, + ), + ) + if len(fastq_files) == 2: + pool.apply_async( + _subsample_fastq_subset, + args=( + fastq_file_2, + output_file_2, + random_indices, + compressed, + ), + ) + + pool.close() + pool.join() + else: + _subsample_fastq_subset( fastq_file_1, output_file_1, random_indices, compressed, - ), - ) - pool.apply_async( - _subsample_fastq_subset, - args=( - fastq_file_2, - output_file_2, - random_indices, - compressed, - ), - ) - pool.close() - pool.join() + ) + if len(fastq_files) == 2: + _subsample_fastq_subset( + fastq_file_2, + output_file_2, + random_indices, + compressed, + ) def _subsample_fastq_subset( @@ -351,7 +495,6 @@ def _subsample_fastq_subset( ) -> None: """ Selecting specific sets of four lines from an input FastQ file and writing them to an output file. - Args: fastq_file : Path for the fastq file. output_file : Path for the output file. @@ -359,6 +502,29 @@ def _subsample_fastq_subset( compressed : the files is compressed """ line_index = 0 + read_block = [] + with gzip.open(fastq_file, "rt") if compressed else open(fastq_file) as file_in, open( + output_file, "w+" + ) as file_out: + for line in file_in: + read_block.append(line) + if len(read_block) == 4: + if line_index in random_indices: + file_out.writelines(read_block) + read_block = [] + line_index += 4 + #lines = [file_in.readline() for _ in range(4)] + """ + while lines[3]: + #lines = [file_in.readline() for _ in range(4)] + # Break if the end of the file is reached + if len(lines) < 4 : # No more lines to read + break + # Write to output if current index is in random_indices + if line_index in random_indices: + file_out.writelines(lines) + line_index += 4 + lines = [file_in.readline() for _ in range(4)] with gzip.open(fastq_file, "rt") if compressed else open(fastq_file) as file_in, open( output_file, "w+" @@ -369,44 +535,7 @@ def _subsample_fastq_subset( file_out.writelines(lines) line_index += 4 lines = [file_in.readline() for _ in range(4)] - - -def subsample_transcriptomic_data(fastq_file_list: List, num_threads: int = 2) -> None: """ - Subsample paired fastq files. - - Args: - fastq_file_list : List of fastq file path to process. - num_threads : number of threads - """ - for fastq_files in fastq_file_list: - fastq_file_1, fastq_file_2 = fastq_files - # fastq_file_pair = "" - # if len(fastq_files) == 2: - # fastq_file_pair = fastq_files[1] - - if len(fastq_files) == 1: - fastq_file_1 = fastq_files[0] - if Path(f"{fastq_file_1}.sub").exists(): - logging.info( - "Found an existing .sub file on the fastq path, will use that instead. File:%s.sub", - fastq_file_1, - ) - else: - _subsample_paired_fastq_files(fastq_files, compressed=True, num_threads=num_threads) - - elif len(fastq_files) == 2: - fastq_file_1, fastq_file_2 = fastq_files - if Path(f"{fastq_file_1}.sub").exists() and Path(f"{fastq_file_2}.sub").exists(): - logging.info( - "Found an existing .sub files on the fastq path for both members of the pair, will use \ - those instead of subsampling again. 
Files: %s.sub,%s.sub", - fastq_file_1, - fastq_file_2, - ) - elif Path(f"{fastq_file_2}.sub").exists(): - _subsample_paired_fastq_files(fastq_files, compressed=True, num_threads=num_threads) - def run_trimming( output_dir: Path, @@ -427,12 +556,12 @@ def run_trimming( check_exe(trim_galore_bin) trim_dir = create_dir(output_dir, "trim_galore_output") - fastq_file_list = [] + fastq_file_list: List[Path] = [] file_types = ("*.fastq", "*.fq", "*.fastq.gz", "*.fq.gz") fastq_file_list = [ path for file_type in file_types for path in Path(short_read_fastq_dir).rglob(file_type) ] - fastq_file_list = _create_paired_paths(fastq_file_list) + paired_fastq_file_list = _create_paired_paths(fastq_file_list) trim_galore_cmd = [ str(trim_galore_bin), @@ -446,9 +575,11 @@ def run_trimming( ] pool = multiprocessing.Pool(int(num_threads)) # pylint:disable=consider-using-with - for fastq_file in fastq_file_list: + for fastq_file in paired_fastq_file_list: + file1, file2 = fastq_file if delete_pre_trim_fastq: - fastq_file.unlink() + file1.unlink() + file2.unlink() pool.apply_async( multiprocess_trim_galore, args=( @@ -505,9 +636,9 @@ def multiprocess_trim_galore(trim_galore_cmd: List, fastq_paired_files: List[Pat def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description="STAR's arguments") - parser.add_argument("--genome_file", required=True, help="Genome file path") - parser.add_argument("--output_dir", required=True, help="Output directory path") - parser.add_argument("--short_read_fastq_dir", required=True, help="Short read directory path") + parser.add_argument("--genome_file", help="Genome file path") + parser.add_argument("--output_dir", help="Output directory path") + parser.add_argument("--short_read_fastq_dir", help="Short read directory path") parser.add_argument( "--delete_pre_trim_fastq", action="store_true", @@ -527,6 +658,74 @@ def parse_args(): parser.add_argument("--star_bin", default="STAR", help="Star software path") parser.add_argument("--samtools_bin", default="samtools", help="Samtools software path") parser.add_argument("--trim_galore_bin", default="trim_galore", help="Trim Galore software path") + parser.add_argument( + "--subsample_read_limit", + type=int, + default=100000000, + help="Maximum number of reads to subsample. 
Default 1 hundred million reads", + required=False, + ) + parser.add_argument( + "--subsample_percentage", + type=float, + default=0.25, + help="Maximun percentage of reads to subsample (0 to 1)", + required=False, + ) + parser.add_argument( + "--sampling_via_read_limit", + type=bool, + default=False, + help="If True will subsample an input dataset of fastq files using --subsample_read_limit value.", + required=False, + ) + parser.add_argument( + "--sampling_via_percentage", + type=bool, + default=False, + help="If True will subsample an input dataset of fastq files using --subsample_percentage value.", + required=False, + ) + parser.add_argument( + "--sampling_via_read_limit_percentage", + type=bool, + default=True, + help="If True will subsample an input dataset of fastq files using --subsample_read_limit \ + and --subsample_percentage value; the lowest number of reads is taken.", + required=False, + ) + parser.add_argument( + "--paired_file_1", + help="Path for single or paired fastq file; used when --run_subsampling \ + or --run_trimming are enabled.", + required=False, + ) + parser.add_argument( + "--paired_file_2", + help="Path for single or paired fastq file; used when --run_subsampling \ + or --run_trimming are enabled.", + required=False, + ) + parser.add_argument( + "--run_star", + type=bool, + help="If True will run STAR alignment given an input dataset of fastq files.", + required=False, + ) + parser.add_argument( + "--run_subsampling", + type=bool, + default=False, + help="If True will subsample an input dataset of fastq files.", + required=False, + ) + parser.add_argument( + "--run_trimming", + type=bool, + default=False, + help="If True will trim input dataset of fastq files using TrimGalore suite.", + required=False, + ) return parser.parse_args() @@ -542,20 +741,44 @@ def main(): defaults={"logfilename": str(log_file_path)}, disable_existing_loggers=False, ) - - run_star( - args.genome_file, - args.output_dir, - args.short_read_fastq_dir, - args.delete_pre_trim_fastq, - args.trim_fastq, - args.max_reads_per_sample, - args.max_intron_length, - args.num_threads, - args.star_bin, - args.samtools_bin, - args.trim_galore_bin, - ) + if args.run_star: + run_star( + args.genome_file, + args.output_dir, + args.short_read_fastq_dir, + args.delete_pre_trim_fastq, + args.trim_fastq, + args.max_reads_per_sample, + args.max_intron_length, + args.subsample_read_limit, + args.subsample_percentage, + args.sampling_via_read_limit, + args.sampling_via_percentage, + args.sampling_via_read_limit_percentage, + args.num_threads, + args.star_bin, + args.samtools_bin, + args.trim_galore_bin, + ) + if args.run_subsampling: + fastq_file_list = [Path(args.paired_file_1), Path(args.paired_file_2)] + subsample_transcriptomic_data( + fastq_file_list, + args.subsample_read_limit, + args.subsample_percentage, + args.sampling_via_read_limit, + args.sampling_via_percentage, + args.sampling_via_read_limit_percentage, + args.num_threads, + ) + if args.run_trimming: + run_trimming( + args.output_dir, + args.short_read_fastq_dir, + args.delete_pre_trim_fastq, + args.num_threads, + args.trim_galore_bin, + ) if __name__ == "__main__": diff --git a/src/python/ensembl/tools/anno/utils/_utils.py b/src/python/ensembl/tools/anno/utils/_utils.py index dfcb38b..4915d37 100644 --- a/src/python/ensembl/tools/anno/utils/_utils.py +++ b/src/python/ensembl/tools/anno/utils/_utils.py @@ -1403,4 +1403,4 @@ def check_transcriptomic_output(main_output_dir): "Found " + str(total_lines) + " total lines across the transcriptomic 
files. Checks passed" - ) + ) \ No newline at end of file diff --git a/src/python/ensembl/tools/anno/utils/_utils_copy.py b/src/python/ensembl/tools/anno/utils/_utils_copy.py new file mode 100644 index 0000000..b4bbc72 --- /dev/null +++ b/src/python/ensembl/tools/anno/utils/_utils_copy.py @@ -0,0 +1,899 @@ +# pylint: disable=missing-module-docstring +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import errno +import io +import logging +import os +from os import PathLike +from pathlib import Path +import random +import re +import subprocess +import shutil +import tempfile +from typing import Dict, List, Optional + +logger = logging.getLogger("__name__." + __name__) + +# with open(Path(__file__).parents[6] / "conf/config.json", "r", encoding="utf8") as f: +# config = json.load(f) + + +def create_dir(input_dir: Path, dir_name: str) -> Path: + """ + Create directory or subdirectory and log operations. + Args: + main_output_dir: str main output directory path + dir_name: str optional subdirectory to be created + Returns: + str Path to the created directory + """ + if dir_name: + target_dir = Path(input_dir) / str(dir_name) + else: + target_dir = input_dir + + if os.path.exists(target_dir): + logger.warning("Directory already exists, will not create again") + return target_dir + + logger.info("Attempting to create target dir: %s", target_dir) + + try: + os.mkdir(target_dir) + except OSError: + logger.error("Creation of the dir failed, path used: %s", target_dir) + else: + logger.info("Successfully created the dir on the following path: %s", target_dir) + return target_dir + + +def check_exe(exe_bin: PathLike) -> None: + """ + Check executable path + Args: + exe_bin: Executable path + + Raises: + OSError: Executable path does not exist + """ + if not shutil.which(exe_bin): + raise OSError(f"Exe does not exist. Path checked: {exe_bin}") + + +def check_gtf_content(gtf_file: PathLike, content_obj: str) -> int: + """ + Check number of transcript lines in the GTF + + Arg: + gtf_file: GTF file path + content_obj: Object to detect and count in the gtf i.e transcript, repeat + + Return: Number of occurences + """ + obj_count = 0 + with open(gtf_file) as gtf_in: + for line in gtf_in: + gtf_raw = line.split("\t") + if not len(gtf_raw) == 9: + continue + if gtf_raw[2] == content_obj: + obj_count += 1 + logger.info("%s; Number of %s detected: %d", gtf_file, content_obj, int(obj_count)) + return obj_count + + +def get_seq_region_length(genome_file: PathLike, min_seq_length: int = 0) -> Dict: + """ + Split the genome file according to the header and store in a dictionary + all the sequences whose length is greater than min_seq_length. + Args: + genome_file: Genome file path. + min_seq_length: Minimum slice length. 
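A toy usage sketch of get_seq_region_length (hypothetical sequences), showing that regions at or below min_seq_length are dropped from the returned mapping:

    from pathlib import Path
    import tempfile

    # Hypothetical two-record FASTA: chr1 (1200 bp) is kept, chrM (300 bp) is dropped.
    with tempfile.NamedTemporaryFile("w", suffix=".fa", delete=False) as tmp:
        tmp.write(">chr1\n" + "A" * 1200 + "\n>chrM\n" + "A" * 300 + "\n")

    lengths = get_seq_region_length(Path(tmp.name), min_seq_length=500)
    assert lengths == {"chr1": 1200}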
+ Return: Dictionary of sequence headers and the corresponding sequence length + """ + current_header = "" + current_seq = "" + + seq_region_to_length = {} + with open(genome_file) as file_in: + for line in file_in: + match = re.search(r">(.+)$", line) + if match and current_header: + if len(current_seq) > min_seq_length: + seq_region_to_length[current_header] = len(current_seq) + + current_seq = "" + current_header = match.group(1) + elif match: + current_header = match.group(1) + else: + current_seq += line.rstrip() + + if len(current_seq) > min_seq_length: + seq_region_to_length[current_header] = len(current_seq) + # logging.info("length of the seq region %s", seq_region_to_length) + return seq_region_to_length + + +def get_slice_id( + seq_region_to_length: Dict, + slice_size: int = 1000000, + overlap: int = 0, + min_length: int = 0, +) -> List: + """ + Get list of ids for a genomic slice + Arg: + seq_region_to_length: Dictionary with the sequence headers as keys and the sequence lengths as values + slice_size: Size of the slice + overlap: Overlap length between two slices + min_length: Min length of the slice + Return: List of IDs for each genomic slice + """ + + slice_ids_per_region = [] + for seq_region in seq_region_to_length: + seq_region_length = int(seq_region_to_length[seq_region]) + if seq_region_length < min_length: + continue + + if seq_region_length <= slice_size: + slice_ids_per_region.append([seq_region, 1, seq_region_length]) + continue + + start = 1 + end = start + slice_size - 1 + while end < seq_region_length: + start = start - overlap + start = max(start, 1) + end = start + slice_size - 1 + end = min(end, seq_region_length) + if (end - start + 1) >= min_length: + slice_ids_per_region.append([seq_region, start, end]) + start = end + 1 + # logging.info("list of slice ids %s", str(slice_ids_per_region)) + return slice_ids_per_region + + +def update_string(text, lookup_text, new_text) -> str: + """ + Update substring + Args: + text : Text. + lookup_text : String to look for. + new_text : String to substitute. + Return + Text with string substitution + """ + text = re.sub( + lookup_text, + new_text, + text, + ) + return text + + +# slice_output_to_gtf(repeatmasker_dir, "repeat_id", "repeatmask", True, ".rm.gtf") +def slice_output_to_gtf( # pylint: disable=too-many-branches + output_dir: Path, + feature_id_label: str = "", + new_id_prefix: str = "", + unique_ids: bool = True, + file_extension: str = ".gtf", +) -> None: + """ + Collect all the gtf files per file extension and merge them in a final gtf file assigning unique ids. + + This holds keys of the current slice details with the gene id to form unique keys. + Each time a new key is added the overall gene counter is incremented + and the value of the key is set to the new gene id. Any subsequent + lines with the same region/gene id key will then just get + the new id without incrementing the counter. + + Args: + output_dir : Output directory. + feature_id_label : Feature identifier. + new_id_prefix : New feature identifier. + unique_ids : If True assign unique ids for the same feature type. + file_extension : Input file extension. 
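Slice output files encode their genomic start and end in the file name (….rs<start>.re<end>….), and the loop below shifts every feature from slice-local back to genomic coordinates. A small worked example with a hypothetical file name:

    import re

    # Hypothetical slice file produced for chr1:1000001-2000000.
    name = "chr1.rs1000001.re2000000.rm.gtf"
    start_offset = int(re.search(r"\.rs(\d+)\.re(\d+)\.", name).group(1))

    # A feature at slice-local coordinates 500-900 maps back to the genome as:
    local_start, local_end = 500, 900
    genome_start = local_start + (start_offset - 1)   # 1000500
    genome_end = local_end + (start_offset - 1)       # 1000900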
+ """ + feature_types = ["exon", "transcript", "repeat", "simple_feature"] + # Initialise gene and feature counter + gene_counter = 1 + feature_counter = 1 + # Initialise dictionaries that will store the list of gene and transcript indexes + gene_id_collection = {} + gene_transcript_id_collection = {} + transcript_id_count_gene_id = {} # one gene can have multiple transcripts + input_files = output_dir.glob(str("*" + file_extension)) + with open(output_dir / "annotation.gtf", "w+") as output_file: + for input_file in input_files: + if os.stat(input_file).st_size == 0: + logger.info("File is empty, will skip %s", input_file.name) + continue + match = re.search(r"\.rs(\d+)\.re(\d+)\.", input_file.name) + assert match is not None + start_offset = int(match.group(1)) + with open(input_file, "r") as gtf_in: + for line in gtf_in: + values = line.split("\t") + if len(values) == 9 and (values[2] in feature_types) and unique_ids: + # each slice start from 0 so we need to add the offset to get the real coordinates + values[3] = str(int(values[3]) + (start_offset - 1)) + values[4] = str(int(values[4]) + (start_offset - 1)) + # Unique id based on gene and transcript ids + attribs = values[8] + # Get the slice details, gene id and transcript id. + match_gene_type = re.search( + r'(gene_id +"([^"]+)").+(transcript_id +"([^"]+)")', + line, + ) + # gene_id "1"; transcript_id "1"; biotype "tRNA_pseudogene"; + if match_gene_type: + full_gene_id_string = match_gene_type.group(1) + current_gene_id = match_gene_type.group(2) + full_transcript_id_string = match_gene_type.group(3) + current_transcript_id = match_gene_type.group(4) + # Example key KS8000.rs1.re1000000.1 + gene_id_slice = input_file.name + "." + str(current_gene_id) + # Example key KS8000.rs1.re1000000.1.transcript.1 + transcript_id_slice = gene_id_slice + "." + str(current_transcript_id) + # If there is no existing entry, the gene key will be added + # and the gene counter is incremented. + # gene_id "gene1"; transcript_id "gene1.t1" + if gene_id_slice not in gene_id_collection: + new_gene_id = f"gene{gene_counter}" + gene_id_collection[gene_id_slice] = new_gene_id + transcript_id_count_gene_id[gene_id_slice] = 1 + gene_counter += 1 + else: + # If there is a key then the gene id will be replaced with the new gene id + new_gene_id = gene_id_collection[gene_id_slice] + attribs = re.sub( + full_gene_id_string, + 'gene_id "' + new_gene_id + '"', + attribs, + ) + # If there is no existing entry, the transcript key will be added + # and the transcript counter is incremented. 
+ if transcript_id_slice not in gene_transcript_id_collection: + new_transcript_id = ( + gene_id_collection[gene_id_slice] + + ".t" + + str(transcript_id_count_gene_id[gene_id_slice]) + ) + gene_transcript_id_collection[transcript_id_slice] = new_transcript_id + transcript_id_count_gene_id[gene_id_slice] += 1 + else: + # If a transcript of the same set is already present, + # the new id with the incremented counter is added + new_transcript_id = gene_transcript_id_collection[transcript_id_slice] + attribs = re.sub( + full_transcript_id_string, + 'transcript_id "' + new_transcript_id + '"', + attribs, + ) + values[8] = attribs + logger.info("FINAL GTF %s", values) + output_file.write("\t".join(values)) + # Unique id based on the feature type + else: + if new_id_prefix == "repeatmask": + match_feature_type = re.search( + r"(" + feature_id_label + r"\d+)", + line, + ) + else: + match_feature_type = ( + re.search( # repeat_id\d+ r"(" + feature_id_label + ' +"([^"]+)")', + r"(" + feature_id_label + ' +"([^"]+)")', # "\d+)", + line, + ) + ) + if match_feature_type: + full_feature_id = match_feature_type.group(1) + new_feature_id = new_id_prefix + str(feature_counter) + attribs = re.sub( + full_feature_id, + feature_id_label + ' "' + new_feature_id + '"', + attribs, + ) + feature_counter += 1 + values[8] = attribs + output_file.write("\t".join(values)) + else: + logger.info( + "Feature type not recognised, will skip. Feature type: %s", + values[2], + ) + + +def get_sequence( + seq_region: str, + start: int, + end: int, + strand: int, + fasta_file: Path, + output_dir: Path, +) -> str: + """ + Creates a tempfile and writes the bed info to it based on whatever information + has been passed in about the sequence. Then runs bedtools getfasta. The fasta file + should have a faidx. This can be created with the create_faidx static method prior + to fetching sequence + + Arg: + seq_region: str region name + start: int region start + end: int region end + strand: int strand of the sequence + fasta_file: genome FASTA file with indexing + output_dir: str working dir + Return: str sequence + """ + start -= 1 + logger.info( + "get_sequence %s", + f"{seq_region}\t{start}\t{end}\t{strand}\t{fasta_file}\t{output_dir}", + ) + with tempfile.NamedTemporaryFile(mode="w+t", delete=False, dir=str(output_dir)) as bed_temp_file: + bed_temp_file.writelines(f"{seq_region}\t{start}\t{end}") + bed_temp_file.close() + bedtools_command = [ + "bedtools", + "getfasta", + "-fi", + str(fasta_file), + "-bed", + str(bed_temp_file.name), + ] + bedtools_output = subprocess.Popen(#pylint:disable=consider-using-with + bedtools_command, stdout=subprocess.PIPE + ) # pylint:disable=consider-using-with + for idx, line in enumerate(io.TextIOWrapper(bedtools_output.stdout, encoding="utf-8")): # type: ignore + if idx == 1: + if strand == 1: + sequence = line.rstrip() + else: + sequence = reverse_complement(line.rstrip()) + + os.remove(bed_temp_file.name) + # logger.info(f"sequence : {sequence}") + return sequence + + +def reverse_complement(sequence: str) -> str: + """ + Get the reverse complement of a nucleotide sequence. 
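A quick sanity check of the maketrans/[::-1] approach used in the function body below:

    rev_matrix = str.maketrans("atgcATGC", "tacgTACG")
    assert "ATGC".translate(rev_matrix)[::-1] == "GCAT"
    assert "gattaca".translate(rev_matrix)[::-1] == "tgtaatc"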
+ Args: + sequence: The nucleotide sequence + Returns: + The reverse complement of the sequence + """ + rev_matrix = str.maketrans("atgcATGC", "tacgTACG") + return sequence.translate(rev_matrix)[::-1] + + +def check_file(file_path: Path) -> None: + """ + Raise an error when the file doesn't exist + Args: + file_path: File path + """ + if not file_path.is_file(): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file_path) + + +def fasta_to_dict(fasta_lines: List[str]) -> Dict[str, str]: + """ + Save fastq sequences in a dictionary + fasta_lines: List[str]: List of sequences + """ + index: Dict[str, str] = {} + header: Optional[str] = None + seq: List[str] = [] + + for line in fasta_lines: + line = line.strip() + header_match = re.match(r">(.+)\n$", line) + if header_match: + if header: + index[header] = "".join(seq) + header = None + header = header_match.group(1) + seq = [] + else: + seq.append(line) + # Add the last sequence + if header: + index[header] = "".join(seq) + return index + + +# def fasta_to_dict(fasta_lines:List[str]): +# index = {} +# for line in fasta_lines: +# #it = iter(fasta_lines) +# #for header in it: +# match = re.search(r">(.+)\n$", line) +# header = match.group(1) +# seq = next(line) +# index[header] = seq +# return index + + +def split_protein_file(protein_dataset: Path, output_dir: Path, batch_size: int = 20) -> List: + """ + The protein dataset file is splitted by a number of sequence equals to the batch_size + in batch files stored in 10 output directories. + protein_dataset : Path for the protein dataset. + output_dir : Output directory path. + batch_size : Size of the batch, it needs to be equals to the number of threads + to parallelise the sequence processing for each file. + """ + batched_protein_files = [] + + for i in range(0, 10): + create_dir(output_dir, (f"bin_{i}")) + with open(protein_dataset, "r", encoding="utf8") as file_in: + seq_count = 0 + batch_count = 0 + current_record = "" + initial_seq = True + for line in file_in: + match = re.search(r">(.+)$", line) + # match header and is not first sequence, if the number of stored sequences in each file equals + # the number of batch_size, a new file will be created and the current_record reset + if match and not initial_seq and seq_count % batch_size == 0: + bin_num = random.randint(0, 9) + batch_file = output_dir / f"bin_{bin_num}" / f"{batch_count}.fa" + with batch_file.open("w+") as file_out: + file_out.write(current_record) + batch_count += 1 + seq_count += 1 + current_record = line + batched_protein_files.append(batch_file) + # match header and is the first sequence + elif match: + current_record += line + initial_seq = False + seq_count += 1 + # other lines + else: + current_record += line + + if current_record: + bin_num = random.randint(0, 9) + batch_file = output_dir / f"bin_{bin_num}" / f"{batch_count}.fa" + with batch_file.open("w+") as file_out: + file_out.write(current_record) + batched_protein_files.append(batch_file) + return batched_protein_files + + +def run_command(cmd) -> None: + """Submit command in multiprocessing action + + Args: + cmd (List[str]): command to execute + """ + try: + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + logger.info("Command executed successfully: %s", cmd) + logger.info("Output: %s", result.stdout) + except subprocess.CalledProcessError as e: + logger.error("Command failed: %s", cmd) + logger.error("Error: %s", e.stderr) + + +def read_gtf_genes(gtf_file: Path) -> Dict[str, Dict[str, Dict[str, List[str]]]]: + """ 
+    Parses a GTF file to extract gene, transcript, and exon information.
+
+    Args:
+        gtf_file: Path to the GTF file.
+
+    Returns:
+        A dictionary of gene information, organized by gene_id, transcript_id, and feature type.
+    """
+    gtf_genes: Dict[str, Dict[str, Dict[str, List[str]]]] = {}
+
+    with open(gtf_file, "r") as gtf_in:
+        for line in gtf_in:
+            eles = line.strip().split("\t")
+            if len(eles) != 9:
+                continue
+
+            match = re.search(r'gene_id "([^"]+)".+transcript_id "([^"]+)"', eles[8])
+            if not match:
+                continue
+
+            gene_id = match.group(1)
+            transcript_id = match.group(2)
+            feature_type = eles[2]
+
+            if gene_id not in gtf_genes:
+                gtf_genes[gene_id] = {}
+
+            if feature_type == "transcript":
+                gtf_genes[gene_id][transcript_id] = {"transcript": [line.strip()], "exons": []}
+            elif feature_type == "exon":
+                if transcript_id in gtf_genes[gene_id]:
+                    gtf_genes[gene_id][transcript_id]["exons"].append(line.strip())
+
+    return gtf_genes
+
+
+def check_transcriptomic_output(main_output_dir: Path) -> None:
+    """Check across the transcriptomic outputs that a reasonable amount of data was produced
+
+    Args:
+        main_output_dir (Path): Output dir path
+    Raises:
+        IOError: If no lines, or fewer than the minimum expected lines, are found in the output files.
+    """
+
+    transcriptomic_dirs = [
+        "scallop_output",
+        "stringtie_output",
+        "minimap2_output",
+    ]
+    total_lines = 0
+    min_lines = 100000
+    for transcriptomic_dir in transcriptomic_dirs:
+        full_file_path = main_output_dir / transcriptomic_dir / "annotation.gtf"
+        if not full_file_path.exists():
+            logger.warning(
+                "No annotation.gtf found for %s. This might be fine, "
+                "e.g. if no long read data were provided",
+                transcriptomic_dir,
+            )
+            continue
+        num_lines = sum(1 for _ in full_file_path.open("r"))
+        total_lines += num_lines
+        logger.info("For %s found %d lines in the annotation.gtf file", transcriptomic_dir, num_lines)
+    if total_lines == 0:
+        raise IOError(
+            "Anno was run with transcriptomic mode enabled, "
+            "but the transcriptomic annotation files are empty"
+        )
+    if total_lines <= min_lines:
+        raise IOError(
+            "Anno was run with transcriptomic mode enabled, "
+            "but the total number of lines in the output files "
+            "was less than the minimum expected value\n"
+            f"Found: {total_lines}\n"
+            f"Min allowed: {min_lines}"
+        )
+
+
+# def run_find_orfs(genome_file, main_output_dir):
+#     min_orf_length = 600
+
+#     orf_output_dir = create_dir(main_output_dir, "orf_output")
+#     seq_region_lengths = get_seq_region_lengths(genome_file, 5000)
+#     for region_name in seq_region_lengths:
+#         region_length = seq_region_lengths[region_name]
+#         seq = get_sequence(region_name, 1, region_length, 1, genome_file, orf_output_dir)
+#         for phase in range(0, 6):
+#             find_orf_phased_region(region_name, seq, phase, min_orf_length, orf_output_dir)
+
+
+# def find_orf_phased_region(region_name, seq, phase, min_orf_length, orf_output_dir):
+#     current_index = phase
+#     orf_counter = 1
+#     if phase > 2:
+#         seq = reverse_complement(seq)
+#         current_index = current_index % 3
+
+#     orf_file_path = os.path.join(orf_output_dir, (region_name + ".phase" + str(phase) + ".orf.fa"))
+#     orf_out = open(orf_file_path, "w+")
+
+#     while current_index < len(seq):
+#         codon = seq[current_index : current_index + 3]
+#         if codon == "ATG":
+#             orf_seq = codon
+#             for j in range(current_index + 3, len(seq), 3):
+#                 next_codon = seq[j : j + 3]
+#                 if next_codon == "TAA" or next_codon == "TAG" or next_codon == "TGA":
+#                     orf_seq += next_codon
+#                     if len(orf_seq) >= min_orf_length:
+#                         orf_out.write(
+#                             ">" + 
region_name + "_phase" + str(phase) + "_orf" + str(orf_counter) + "\n" +# ) +# orf_out.write(orf_seq + "\n") +# orf_counter += 1 +# orf_seq = "" +# break + +# # If there's another met in phase, then put i to the start of +# # the codon after j so that only the longest ORF is found +# if next_codon == "ATG": +# current_index = j + 3 +# orf_seq += next_codon +# current_index += 3 +# orf_out.close() + + +# def bed_to_gff(input_dir, hints_file): +# gff_out = open(hints_file, "w+") +# exons_dict = {} +# for bed_file in glob.glob(input_dir + "/*.bed"): +# logger.info("Processing file for hints:") +# logger.info(bed_file) +# bed_in = open(bed_file) +# bed_lines = bed_in.readlines() +# for line in bed_lines: +# line = line.rstrip() +# elements = line.split("\t") +# seq_region_name = elements[0] +# offset = int(elements[1]) +# hit_name = elements[3] +# strand = elements[5] +# block_sizes = elements[10].split(",") +# block_sizes = list(filter(None, block_sizes)) +# block_starts = elements[11].split(",") +# block_starts = list(filter(None, block_starts)) +# exons = bed_block_to_exons(block_sizes, block_starts, offset) +# for i, element in enumerate(exons): +# exon_coords = exons[i] +# exon_key = seq_region_name + ":" + exon_coords[0] + ":" + exon_coords[1] + ":" + strand +# if exon_key in exons_dict: +# exons_dict[exon_key][5] += 1 +# else: +# gff_list = [ +# seq_region_name, +# "CDNA", +# "exon", +# exon_coords[0], +# exon_coords[1], +# 1.0, +# strand, +# ".", +# ] +# exons_dict[exon_key] = gff_list + +# for exon_key, gff_list in exons_dict.items(): +# gff_list[5] = str(gff_list[5]) +# gff_line = "\t".join(gff_list) + "\tsrc=W;mul=" + gff_list[5] + ";\n" +# gff_out.write(gff_line) + +# gff_out.close() + +# sorted_hints_out = open((hints_file + ".srt"), "w+") +# subprocess.run( +# ["sort", "-k1,1", "-k7,7", "-k4,4", "-k5,5", hints_file], +# stdout=sorted_hints_out, +# ) +# sorted_hints_out.close() + + +# def splice_junction_to_gff(input_dir, hints_file): +# sjf_out = open(hints_file, "w+") + +# for sj_tab_file in glob.glob(input_dir + "/*.sj.tab"): +# sjf_in = open(sj_tab_file) +# sjf_lines = sjf_in.readlines() +# for line in sjf_lines: +# elements = line.split("\t") +# strand = "+" +# # If the strand is undefined then skip, Augustus expects a strand +# if elements[3] == "0": +# continue +# elif elements[3] == "2": +# strand = "-" + +# junction_length = int(elements[2]) - int(elements[1]) + 1 +# if junction_length < 100: +# continue + +# if not elements[4] and elements[7] < 10: +# continue + +# # For the moment treat multimapping and single mapping things as a combined score +# score = float(elements[6]) + float(elements[7]) +# score = str(score) +# output_line = [ +# elements[0], +# "RNASEQ", +# "intron", +# elements[1], +# elements[2], +# score, +# strand, +# ".", +# ("src=W;mul=" + score + ";"), +# ] +# sjf_out.write("\t".join(output_line) + "\n") + +# sjf_out.close() + + +# def split_genome(genome_file, target_dir, min_seq_length): +# # This is the lazy initial way of just splitting into a dir +# # of files based on the toplevel sequence with a min sequence length filter +# # There are a couple of obvious improvements: +# # 1) Instead of making files for all seqs, just process N seqs +# # parallel, where N = num_threads. Then you could clean up the seq file +# # after each seq finishes, thus avoiding potentially +# # having thousands of file in a dir +# # 2) Split the seq into even slices and process these in parallel +# # (which the same cleanup as in 1). 
For sequences smaller than the +# # target slice size, bundle them up together into a single file. Vastly +# # more complex, partially implemented in the splice_genome method +# # Allows for more consistency with parallelisation (since there should +# # be no large outliers). But require a mapping strategy for the +# # coords and sequence names and all the hints will need to be adjusted +# current_header = "" +# current_seq = "" + +# file_in = open(genome_file) +# line = file_in.readline() +# while line: +# match = re.search(r">(.+)$", line) +# if match and current_header: +# if len(current_seq) > min_seq_length: +# file_out_name = os.path.join(target_dir, (current_header + ".split.fa")) +# if not os.path.exists(file_out_name): +# file_out = open(file_out_name, "w+") +# file_out.write(">" + current_header + "\n" + current_seq + "\n") +# file_out.close() + +# else: +# print("Found an existing split file, so will not overwrite. File found:") +# print(file_out_name) + +# current_seq = "" +# current_header = match.group(1) +# elif match: +# current_header = match.group(1) +# else: +# current_seq += line.rstrip() + +# line = file_in.readline() + +# if len(current_seq) > min_seq_length: +# file_out_name = os.path.join(target_dir, (current_header + ".split.fa")) +# if not os.path.exists(file_out_name): +# file_out = open(file_out_name, "w+") +# file_out.write(">" + current_header + "\n" + current_seq + "\n") +# file_out.close() + +# else: +# logger.info("Found an existing split file, so will not overwrite. File found:") +# logger.info(file_out_name) + +# file_in.close() + + +# # def merge_gtf_files(file_paths,id_label): + +# # gtf_file_path = os.path.join(output_dir,'annotation.gtf') +# # gtf_out = open(gtf_file_path,'w+') +# # for gtf_file_path in gtf_files: +# # gtf_file_name = os.path.basename(gtf_file_path) +# # match = re.search(r'\.rs(\d+)\.re(\d+)\.',gtf_file_name) +# # start_offset = int(match.group(1)) +# # gtf_in = open(gtf_file_path,'r') +# # line = gtf_in.readline() +# # while line: +# # values = line.split("\t") +# # if len(values) == 9 and (values[2] in feature_types): +# # values[3] = str(int(values[3]) + (start_offset - 1)) +# # values[4] = str(int(values[4]) + (start_offset - 1)) +# # gtf_out.write("\t".join(values)) +# # line = gtf_in.readline() +# # gtf_in.close() +# # gtf_out.close() + + +# def multiprocess_generic(cmd): +# print(" ".join(cmd)) +# subprocess.run(cmd) + + +# def seq_region_names(genome_file): +# region_list = [] + +# file_in = open(genome_file) +# line = file_in.readline() +# while line: +# match = re.search(r">([^\s]+)", line) +# if match: +# region_name = match.group(1) +# if region_name == "MT": +# logger.info("Skipping region named MT") +# line = file_in.readline() +# continue +# else: +# region_list.append(match.group(1)) +# line = file_in.readline() + +# return region_list + + +# def slice_genome(genome_file, target_dir, target_slice_size): +# # The below is sort of tested +# # Without the +# target_seq_length = 50000000 +# min_seq_length = 1000 +# current_header = "" +# current_seq = "" +# seq_dict = {} +# for line in seq: # pylint:disable=used-before-assignment +# match = re.search(r">(.+)$", line) +# if match and current_header: +# seq_dict[current_header] = current_seq +# current_seq = "" +# current_header = match.group(1) +# elif match: +# current_header = match.group(1) +# else: +# current_seq += line.rstrip() + +# seq_dict[current_header] = current_seq + +# seq_buffer = 0 +# file_number = 0 +# file_name = "genome_file_" + str(file_number) + +# for 
header in seq_dict: +# seq_iterator = 0 +# seq = seq_dict[header] + +# while len(seq) > target_seq_length: +# file_out = open(os.path.join(target_dir, file_name), "w+") +# subseq = seq[0:target_seq_length] +# file_out.write(">" + header + "_sli" + str(seq_iterator) + "\n" + subseq + "\n") +# file_out.close() +# seq = seq[target_seq_length:] +# seq_iterator += 1 +# file_number += 1 +# file_name = "genome_file_" + str(file_number) + +# if len(seq) >= min_seq_length: +# file_name = "genome_file_" + str(file_number) +# file_out = open(os.path.join(file_name), "w+") +# file_out.write(">" + header + "_sli" + str(seq_iterator) + "\n" + seq + "\n") +# file_out.close() +# file_number += 1 +# file_name = "genome_file_" + str(file_number) + + +# def coallate_results(main_output_dir): +# results_dir = create_dir(main_output_dir, "results") +# output_dirs = [ +# "augustus_output", +# "cpg_output", +# "dust_output", +# "eponine_output", +# "red_output", +# "rfam_output", +# "trf_output", +# "trnascan_output", +# ] +# for output_dir in output_dirs: +# match = re.search(r"(.+)_output", output_dir) +# result_type = match.group(1) +# results_path = os.path.join(main_output_dir, output_dir, "annotation.gtf") +# copy_path = os.path.join(results_dir, (result_type + ".gtf")) +# if os.path.exists(results_path): +# cpy_cmd = ["cp", results_path, copy_path] +# subprocess.run(cpy_cmd) diff --git a/src/python/ensembl/tools/anno/utils/load_into_core.py b/src/python/ensembl/tools/anno/utils/load_into_core.py new file mode 100644 index 0000000..10b930e --- /dev/null +++ b/src/python/ensembl/tools/anno/utils/load_into_core.py @@ -0,0 +1,435 @@ +# pylint: disable=missing-module-docstring +# See the NOTICE file distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Loads GTF records for multiple features into an Ensembl database. +""" +__all__ = ["load_results_to_ensembl_db"] + +import argparse +import logging +import logging.config +import multiprocessing +from pathlib import Path +import gc +import re +import subprocess +from typing import Generator, List, Dict +import tempfile +from ensembl.tools.anno.utils._utils import ( + create_dir, +) + + +def load_results_to_ensembl_db( + genome_file: Path, + main_output_dir: Path, + db_details: str, + features: Dict[str, tuple[int, str, str]], + num_threads: int = 1, + single_transcript_genes_loading: bool = False, +) -> None: + """ + Loads GTF records for multiple features into an Ensembl database. + + This function processes each feature defined in `features`, batches the associated GTF records, + and then loads them into the Ensembl database using the `generic_load_records_to_ensembl_db` function. + + :param genome_file: Path to the genome file used for the loading process. + :type genome_file: Path + :param main_output_dir: The directory where the feature directories and GTF files are stored. 
+ :type main_output_dir: Path + :param db_details: Comma-separated database connection details \ + (db_name, db_host, db_port, db_user, db_pass). + :type db_details: str + :param features: A dictionary with feature names as keys, and tuples \ + with batch size, load type (e.g., 'gene'), and analysis name (e.g., 'ensembl'). + :type features: (Dict[str, Tuple[int, str, str]]) + :param num_threads: The number of threads to use for loading. + :type num_threads: int, default 1. + :param single_transcript_genes_loading: Flag to indicate whether to load single transcript genes. + :type single_transcript_genes_loading: bool, default False. + + """ + db_loading_script = ( + Path(__file__).parents[5] + / "perl" + / "ensembl" + / "tools" + / "anno" + / "support_scripts_perl" + / "load_gtf_ensembl.pl" + ) + db_loading_dir = create_dir(main_output_dir, "db_loading") + # Process each selected feature + for feature in features: + # Retrieve settings for the current feature + batch_size, load_type, analysis_name = features[feature] + + # Construct the path to the GTF file for the feature + annotation_results_gtf_file = main_output_dir / feature / "annotation.gtf" + + # Check if the file exists and load the feature + if annotation_results_gtf_file.exists(): + logging.info("Loading %s geneset to db", feature) + gtf_records = batch_gtf_records(annotation_results_gtf_file, batch_size, load_type) + generic_load_records_to_ensembl_db( + single_transcript_genes_loading, + db_loading_script, + genome_file, + db_details, + db_loading_dir, + load_type, + analysis_name, + gtf_records, + num_threads, + ) + else: + logging.error( + "Did not find the %s annotation file, so not loading. Path checked: %s", + feature, + annotation_results_gtf_file, + ) + + +def generic_load_records_to_ensembl_db( + single_transcript_genes_loading: bool, + db_loading_script: Path, + genome_file: Path, + db_details: str, + db_loading_dir: Path, + load_type: str, + analysis_name: str, + gtf_records: List[List[str]], + num_threads: int, +) -> None: + """ + Loads GTF records into the Ensembl database using multiple threads. + + This function processes the GTF records in batches and loads them asynchronously + into the database using multiprocessing. The function delegates the actual loading + of records to `multiprocess_load_records_to_ensembl_db` for each batch. + + Args: + single_transcript_genes_loading (str): Option for loading transcript separately without + defining the canonical. + db_loading_script (Path): Path to the Perl script used for loading records into Ensembl. + genome_file (Path): Path to the genome file. + db_details (str): Database details. + db_loading_dir (Path): Directory for storing temporary files related to DB loading. + load_type (str): Type of the records being loaded (e.g., "gene", "transcript"). + analysis_name (str): Name of the analysis being performed. + gtf_records (List[List[str]]): A list of GTF record batches to be loaded. + num_threads (int): Number of threads to use for loading records. + + Returns: + None: This function performs an asynchronous operation and doesn't return any value. 
+    """
+    pool = multiprocessing.Pool(int(num_threads))  # pylint:disable=consider-using-with
+    for record_batch in gtf_records:
+        pool.apply_async(
+            multiprocess_load_records_to_ensembl_db,
+            args=(
+                single_transcript_genes_loading,
+                db_loading_script,
+                genome_file,
+                db_details,
+                db_loading_dir,
+                load_type,
+                analysis_name,
+                record_batch,
+            ),
+        )
+    try:
+        pool.close()
+        pool.join()
+    except Exception as e:  # pylint:disable=broad-exception-caught
+        logging.error("Error during pool shutdown: %s", e)
+        pool.terminate()
+
+
+def multiprocess_load_records_to_ensembl_db(
+    single_transcript_genes_loading: bool,
+    db_loading_script: str,
+    genome_file: str,
+    db_details: str,
+    output_dir: str,
+    load_type: str,
+    analysis_name: str,
+    record_batch: List[str],
+) -> None:
+    """
+    Loads a batch of GTF records into an Ensembl database using a temporary GTF file.
+
+    This function writes the provided `record_batch` to a temporary GTF file, constructs
+    a command to load the records into the database using a Perl script, and executes it
+    via a subprocess. After execution, the temporary GTF file is removed.
+
+    Args:
+        single_transcript_genes_loading (bool): Flag to indicate if single transcript genes
+            should be loaded.
+        db_loading_script (str): Path to the Perl script used to load records into the database.
+        genome_file (str): Path to the genome file to be used by the loading script.
+        db_details (str): Comma-separated string containing database connection details
+            (db_name, db_host, db_port, db_user, db_pass).
+        output_dir (str): Directory where the temporary GTF file will be stored.
+        load_type (str): Type of the record being loaded (e.g., "gene").
+        analysis_name (str): Name of the analysis being performed (e.g., "ensembl").
+        record_batch (List[str]): List of GTF records (as strings) to be written to the temporary file.
+
+    Returns:
+        None: This function performs a subprocess call and does not return any value.
+    """
+    with tempfile.NamedTemporaryFile(mode="w+t", delete=False, dir=output_dir) as gtf_temp_out:
+        for line in record_batch:
+            gtf_temp_out.write(line)
+        gtf_temp_file_path = gtf_temp_out.name
+
+    (db_name, db_host, db_port, db_user, db_pass) = db_details.split(",")
+
+    loading_cmd = [
+        "perl",
+        str(db_loading_script),
+        "-genome_file",
+        str(genome_file),
+        "-dbname",
+        db_name,
+        "-host",
+        db_host,
+        "-port",
+        str(db_port),
+        "-user",
+        db_user,
+        "-pass",
+        db_pass,
+        "-gtf_file",
+        gtf_temp_file_path,
+        "-analysis_name",
+        analysis_name,
+        "-load_type",
+        load_type,
+    ]
+
+    if load_type == "gene" and analysis_name == "ensembl":
+        loading_cmd.extend(
+            [
+                "-protein_coding_biotype",
+                "anno_protein_coding",
+                "-non_coding_biotype",
+                "anno_lncRNA",
+            ]
+        )
+
+    if single_transcript_genes_loading:
+        loading_cmd.append("-make_single_transcript_genes")
+
+    logging.info(" ".join(loading_cmd))
+    try:
+        # Run the loading command as a subprocess
+        subprocess.run(loading_cmd, check=True)
+    except subprocess.CalledProcessError as e:
+        logging.error("Error executing loading command: %s", e)
+    finally:
+        # Clean up the temporary GTF file
+        try:
+            Path(gtf_temp_file_path).unlink()
+            logging.info("Temporary file %s removed.", gtf_temp_file_path)
+        except OSError as e:
+            logging.error("Error removing temporary file %s: %s", gtf_temp_file_path, e)
+        # Run garbage collection to free up memory
+        gc.collect()
+    logging.info("Finished processing batch from %s", gtf_temp_file_path)
+
+
+# The following function assumes the file has unique ids for the parent features.
+# It then batches them into +# sets of records based on the batch size passed in +def batch_gtf_records(input_gtf_file: Path, batch_size: int, record_type: str) -> List[List[str]]: + """ + Processes a GTF file in batches and returns a list of record batches. + + Args: + input_gtf_file (Path): The path to the input GTF file. + batch_size (int): The number of records per batch. + record_type (str): The type of records to process ('gene' or 'single_line_feature'). + + Returns: + List[List[str]]: A list of batches, each containing GTF lines as lists of strings. + """ + + def read_gtf_lines(gtf_file: Path) -> Generator[str, None, None]: + """ + Generator that reads and yields valid GTF lines from the file. + + Args: + gtf_file (Path): The path to the input GTF file. + + Yields: + str: A valid GTF line. + """ + with open(gtf_file) as gtf_in: + for line in gtf_in: + if re.match(r"^#", line): # Skip comment lines + continue + elems = line.split("\t") + if len(elems) == 9: # Ensure valid GTF line + yield line + + records: List[List[str]] = [] + current_record_batch: List[str] = [] + record_counter: int = 0 + current_gene_id: str = "" + + # Iterate over the GTF lines + for line in read_gtf_lines(input_gtf_file): + if record_type == "gene": + match = re.search(r'gene_id "([^"]+)"', line) + if match: + gene_id = match.group(1) + + if not current_gene_id: + record_counter += 1 + current_gene_id = gene_id + + if gene_id != current_gene_id: + record_counter += 1 + if record_counter % batch_size == 0: + records.append(current_record_batch) + current_record_batch = [] + current_gene_id = gene_id + + current_record_batch.append(line) + + elif record_type == "single_line_feature": + record_counter += 1 + if record_counter % batch_size == 0: + records.append(current_record_batch) + current_record_batch = [] + + current_record_batch.append(line) + + # Append the last batch of records + if current_record_batch: + records.append(current_record_batch) + + return records + + +def build_feature_settings( + args: argparse.Namespace, repeatmasker_analysis: str = "repeatmask_repbase_human" +) -> Dict[str, tuple[int, str, str]]: + """ + Builds the feature settings dictionary based on the enabled flags from the argument object. + + This function uses the provided argument flags (`args`) to determine which features + to include in the returned settings dictionary. The dictionary contains the configuration + for each feature, including batch size, record type, and analysis name. + + Args: + args (object): An object containing flags (e.g., `args.annotation`, `args.rfam`) + that determine which features should be included. + repeatmasker_analysis (str, optional): The analysis name for RepeatMasker output. + Defaults to "repeatmask_repbase_human". + + Returns: + Dict[str, tuple[int, str, str]]: A dictionary where keys are feature names (e.g., "annotation_output") + and values are tuples containing batch size (int), + record type (str), and analysis name (str). 
+    """
+    # Base configurations for all features
+    all_features = {
+        "annotation_output": (200, "gene", "ensembl"),
+        "rfam_output": (500, "gene", "ncrna"),
+        "trnascan_output": (500, "gene", "ncrna"),
+        "dust_output": (500, "single_line_feature", "dust"),
+        "red_output": (500, "single_line_feature", "repeatdetector"),
+        "trf_output": (500, "single_line_feature", "trf"),
+        "repeatmasker_output": (500, "single_line_feature", repeatmasker_analysis),
+        "cpg_output": (500, "single_line_feature", "cpg"),
+        "eponine_output": (500, "single_line_feature", "eponine"),
+    }
+
+    # Start with an empty dictionary
+    feature_settings = {}
+
+    # Add features based on enabled flags
+    if args.annotation:
+        feature_settings["annotation_output"] = all_features["annotation_output"]
+    if args.rfam:
+        feature_settings["rfam_output"] = all_features["rfam_output"]
+    if args.trnascan:
+        feature_settings["trnascan_output"] = all_features["trnascan_output"]
+    if args.dust:
+        feature_settings["dust_output"] = all_features["dust_output"]
+    if args.red:
+        feature_settings["red_output"] = all_features["red_output"]
+    if args.trf:
+        feature_settings["trf_output"] = all_features["trf_output"]
+    if args.repeatmasker:
+        feature_settings["repeatmasker_output"] = all_features["repeatmasker_output"]
+    if args.cpg:
+        feature_settings["cpg_output"] = all_features["cpg_output"]
+    if args.eponine:
+        feature_settings["eponine_output"] = all_features["eponine_output"]
+
+    return feature_settings
+
+
+def parse_args():
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(description="Load multiple features into the Ensembl database.")
+    parser.add_argument("--output_dir", required=True, help="Output directory path")
+    parser.add_argument("--genome_file", required=True, help="Path for the genome fasta file")
+    parser.add_argument("--annotation", action="store_true", help="Enable annotation output feature")
+    parser.add_argument("--rfam", action="store_true", help="Enable RFAM feature")
+    parser.add_argument("--trnascan", action="store_true", help="Enable tRNA scan feature")
+    parser.add_argument("--dust", action="store_true", help="Enable dust feature")
+    parser.add_argument("--red", action="store_true", help="Enable RED feature")
+    parser.add_argument("--trf", action="store_true", help="Enable TRF feature")
+    parser.add_argument("--repeatmasker", action="store_true", help="Enable RepeatMasker feature")
+    parser.add_argument("--cpg", action="store_true", help="Enable CpG feature")
+    parser.add_argument("--eponine", action="store_true", help="Enable Eponine feature")
+    parser.add_argument(
+        "--db_details",
+        required=True,
+        type=str,
+        help="Comma-separated database connection details (db_name,db_host,db_port,db_user,db_pass)",
+    )
+    parser.add_argument("--num_threads", type=int, default=1, help="Number of threads")
+
+    parser.add_argument(
+        "--single_transcript_genes_loading",
+        action="store_true",
+        help="Flag to indicate whether to load single transcript genes (default is False).",
+    )
+    return parser.parse_args()
+
+
+def main():
+    """Entry point for loading annotation results into a core database."""
+    args = parse_args()
+
+    # Build the feature settings dictionary dynamically
+    feature_settings = build_feature_settings(args, repeatmasker_analysis="repeatmask_repbase_human")
+
+    # Wrap the CLI strings in Path objects, since the loader builds paths with the / operator
+    load_results_to_ensembl_db(
+        Path(args.genome_file),
+        Path(args.output_dir),
+        args.db_details,
+        feature_settings,
+        args.num_threads,
+        args.single_transcript_genes_loading,
+    )
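
For reference, a minimal sketch of how the FASTA helpers patched above can be combined, assuming they are importable from ensembl.tools.anno.utils._utils; "transcripts.fa" is a placeholder input file.

from ensembl.tools.anno.utils._utils import fasta_to_dict, reverse_complement

# Read a FASTA file into {header: sequence} and print each reverse complement
with open("transcripts.fa", encoding="utf8") as fasta_in:  # placeholder input file
    sequences = fasta_to_dict(fasta_in.readlines())

for header, sequence in sequences.items():
    print(header, reverse_complement(sequence))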
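
batch_gtf_records keeps whole genes together: a batch is flushed only when a new gene_id arrives and the running gene counter is a multiple of batch_size. A self-contained sketch with a toy four-gene GTF; the file content and the import path (ensembl.tools.anno.utils.load_into_core, matching the new file location) are illustrative assumptions.

import tempfile
from pathlib import Path

from ensembl.tools.anno.utils.load_into_core import batch_gtf_records

# Four single-line gene records, each with gene_id and transcript_id attributes
toy_gtf = "".join(
    f'chr1\tensembl\tgene\t{i * 1000}\t{i * 1000 + 500}\t.\t+\t.\tgene_id "g{i}"; transcript_id "t{i}";\n'
    for i in range(1, 5)
)

with tempfile.NamedTemporaryFile("w", suffix=".gtf", delete=False) as tmp:
    tmp.write(toy_gtf)

batches = batch_gtf_records(Path(tmp.name), batch_size=2, record_type="gene")
for number, batch in enumerate(batches):
    print(f"batch {number}: {len(batch)} line(s)")

Path(tmp.name).unlink()  # tidy up the toy file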
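
build_feature_settings only reads attribute-style boolean flags, so it can also be exercised outside the CLI with a plain argparse.Namespace; the flag names below are the ones defined in parse_args, everything else is illustrative.

import argparse

from ensembl.tools.anno.utils.load_into_core import build_feature_settings

# Only the flags consulted by build_feature_settings are needed
flags = argparse.Namespace(
    annotation=True,
    rfam=False,
    trnascan=False,
    dust=True,
    red=False,
    trf=False,
    repeatmasker=True,
    cpg=False,
    eponine=False,
)
settings = build_feature_settings(flags, repeatmasker_analysis="repeatmask_repbase_human")
print(settings)  # e.g. annotation_output, dust_output and repeatmasker_output entries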
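
Finally, an end-to-end usage sketch of load_results_to_ensembl_db. It assumes the module import path matches the new file location, that each enabled feature directory holds an annotation.gtf under the output directory, and that the Perl loading script and core database are reachable; the paths and credentials below are placeholders.

from pathlib import Path

from ensembl.tools.anno.utils.load_into_core import load_results_to_ensembl_db

# Each value is (batch size, load type, analysis name), mirroring build_feature_settings
features = {
    "annotation_output": (200, "gene", "ensembl"),
    "dust_output": (500, "single_line_feature", "dust"),
}

load_results_to_ensembl_db(
    genome_file=Path("/data/genomes/species.fa"),  # placeholder genome path
    main_output_dir=Path("/data/anno_run"),  # placeholder Anno output directory
    db_details="species_core_db,mysql-host,3306,writer,secret",  # placeholder credentials
    features=features,
    num_threads=4,
)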