diff --git a/workflow_taxVamb/config.json b/workflow_taxVamb/config.json index 6a5a3703..15533983 100644 --- a/workflow_taxVamb/config.json +++ b/workflow_taxVamb/config.json @@ -9,10 +9,13 @@ "minimap_ppn": "15", "vaevae_mem": "50GB", "vaevae_ppn": "30", + "reclust_mem": "50GB", + "reclust_ppn": "30", "checkm2_mem": "15GB", "checkm2_ppn": "15", "mmseq_mem": "260GB", "mmseq_ppn": "30", + "mmseq_db": "/path/to/mmseq2/gtdb", "vaevae_params": " -l 64 -e 500 -q 25 75 150 -pe 100 -pq 25 75 --model vaevae -o C ", "vaevae_preload": "", "outdir": "taxVamb_outdir", diff --git a/workflow_taxVamb/envs/taxVamb.yaml b/workflow_taxVamb/envs/taxVamb.yaml index a8eec2ac..5a594e81 100644 --- a/workflow_taxVamb/envs/taxVamb.yaml +++ b/workflow_taxVamb/envs/taxVamb.yaml @@ -12,5 +12,7 @@ dependencies: - scikit-learn=1.2.2 - pandas=2.0.0 - mmseqs2 +- prodigal +- hmmer - pip: - ordered-set==4.1.0 diff --git a/workflow_taxVamb/taxVamb.snake.conda.smk b/workflow_taxVamb/taxVamb.snake.conda.smk index 56cdfb42..fa19b320 100644 --- a/workflow_taxVamb/taxVamb.snake.conda.smk +++ b/workflow_taxVamb/taxVamb.snake.conda.smk @@ -27,12 +27,15 @@ MIN_BIN_SIZE = int(get_config("min_bin_size", "200000", r"[1-9]\d*$")) MIN_IDENTITY = float(get_config("min_identity", "0.95", r".*")) MM_MEM = get_config("minimap_mem", "35gb", r"[1-9]\d*GB$") MM_PPN = get_config("minimap_ppn", "10", r"[1-9]\d*$") -MMSEQ_PPN = get_config("mmseq_ppn_r", "260gb", r"[1-9]\d*GB$") -MMSEQ_MEM = get_config("mmseq_mem_r", "20", r"[1-9]\d*$") +MMSEQ_MEM = get_config("mmseq_mem", "260gb", r"[1-9]\d*GB$") +MMSEQ_PPN = get_config("mmseq_ppn", "30", r"[1-9]\d*$") +MMSEQ_DB = get_config("mmseq_db", "", r".*") VAEVAE_PARAMS = get_config("vaevae_params"," -o C --minfasta 200000 ", r".*") VAEVAE_PRELOAD = get_config("vaevae_preload", "", r".*") VAEVAE_MEM = get_config("vaevae_mem", "20gb", r"[1-9]\d*GB$") VAEVAE_PPN = get_config("vaevae_ppn", "10", r"[1-9]\d*(:gpus=[1-9]\d*)?$") +RECLUST_MEM = get_config("reclust_mem", "20gb", r"[1-9]\d*GB$") +RECLUST_PPN = get_config("reclust_ppn", "10", r"[1-9]\d*(:gpus=[1-9]\d*)?$") CHECKM_MEM = get_config("checkm2_mem", "10gb", r"[1-9]\d*GB$") CHECKM_PPN = get_config("checkm2_ppn", "10", r"[1-9]\d*$") MIN_COMP = get_config("min_comp", "0.9", r".*") @@ -118,16 +121,16 @@ rule mmseq2: threads: int(MMSEQ_PPN) log: - o=os.path.join(OUTDIR,'log','mmseq_{sample}.out'), - e=os.path.join(OUTDIR,'log','mmseq_{sample}.err') + o=os.path.join(OUTDIR,'log','mmseq.out'), + e=os.path.join(OUTDIR,'log','mmseq.err') conda: "taxVamb" shell: """ mmseqs createdb {input} {OUTDIR}/tmp/mmseq/qdb - mmseqs taxonomy {OUTDIR}/tmp/mmseq/qdb {OUTDIR}/tmp/mmseq/sdb {OUTDIR}/tmp/mmseq/taxonomy {OUTDIR}/tmp/mmseq/tmp --threads {threads} --tax-lineage 1 - mmseqs createtsv {OUTDIR}/tmp/mmseq/qdb {OUTDIR}/tmp/mmseq/taxonomy {OUTDIR}/tmp/mmseq/taxonomy.tsv + mmseqs taxonomy {OUTDIR}/tmp/mmseq/qdb {MMSEQ_DB} {OUTDIR}/tmp/mmseq/taxonomy {OUTDIR}/tmp/mmseq/tmp --threads {threads} --tax-lineage 1 + mmseqs createtsv {OUTDIR}/tmp/mmseq/qdb {OUTDIR}/tmp/mmseq/taxonomy {output.taxonomytsv} touch {output.out_log_file} """ @@ -320,8 +323,8 @@ rule bam_abundance: "taxVamb" log: bam = os.path.join(OUTDIR,"log/abundance/bam_abundance_{sample}.log"), - o = os.path.join(OUTDIR,"log/abundance/{sample}.bam_abundance.o"), - e = os.path.join(OUTDIR,"log/abundance/{sample}.bam_abundance.e") + o = os.path.join(OUTDIR,"log/abundance/bam_abundance.{sample}.o"), + e = os.path.join(OUTDIR,"log/abundance/bam_abundance.{sample}.e") shell: """ @@ -363,12 +366,10 @@ rule run_vaevae: abundance = os.path.join(OUTDIR,"abundance.npz"), taxonomytsv = os.path.join(OUTDIR,"tmp/mmseq/taxonomy.tsv"), mmsq_log = os.path.join(OUTDIR,"tmp/mmseq_finished.log") - output: - outdir = directory(os.path.join(OUTDIR,"vaevae")), clusters = os.path.join(OUTDIR,"vaevae/vae_clusters.tsv"), - contignames=os.path.join(OUTDIR,"vaevae/contignames"), - contiglenghts=os.path.join(OUTDIR,"vaevae/lengths.npz") + latents = os.path.join(OUTDIR,"vaevae/latent.npz"), + vaevae_log = os.path.join(OUTDIR,"tmp/vaevae_finished.log") params: walltime="86400", nodes="1", @@ -381,18 +382,19 @@ rule run_vaevae: conda: "taxVamb" log: - vaevae_log = os.path.join(OUTDIR,"tmp/vaevae_finished.log"), o = os.path.join(OUTDIR,'log','run_vaevae.out'), - e = os.path.join(OUTDIR,'log','run_vaevae.err') + e = os.path.join(OUTDIR,'log','run_vaevae.err'), + shell: """ #rm -rf {OUTDIR}/abundances #rm -rf {OUTDIR}/contigs.flt.dict #rm -f {OUTDIR}/contigs.flt.mmi - rm -rf {output.outdir} + rm -rf {OUTDIR}/vaevae {VAEVAE_PRELOAD} + mkdir -p {OUTDIR}/Final_bins vamb \ - --outdir {output.outdir} \ + --outdir {OUTDIR}/vaevae \ --fasta {input.contigs} \ --rpkm {input.abundance} \ --taxonomy {input.taxonomytsv} \ @@ -400,53 +402,55 @@ rule run_vaevae: -p {threads} \ -m {MIN_CONTIG_SIZE} \ {params.cuda} \ - {VAEVAE_PARAMS} \ - mkdir -p {OUTDIR}/Final_bins - touch {log.vaevae_log} + {VAEVAE_PARAMS} + + touch {output.vaevae_log} + """ -rule reclustering +rule reclustering: input: - latents = os.path.join(OUTDIR,"vaevae/latents.npz"), - clusters = os.path.join(OUTDIR,"vaevae/clusters.tsv"), + latents = os.path.join(OUTDIR,"vaevae/latent.npz"), + clusters = os.path.join(OUTDIR,"vaevae/vae_clusters.tsv"), contigs = os.path.join(OUTDIR,"contigs.flt.fna.gz"), abundance = os.path.join(OUTDIR,"abundance.npz"), vaevae_log = os.path.join(OUTDIR,"tmp/vaevae_finished.log") output: - reclustered_bins = directory(os.path.join(OUTDIR,"vaevae/bins_reclustered")), - hmm_markers = os.path.join(OUTDIR,"tmp/markers.hmmout") + #reclustered_bins = directory(os.path.join(OUTDIR,"vaevae/bins_reclustered")), + hmm_markers = os.path.join(OUTDIR,"tmp/markers.hmmout"), + recluster_log = os.path.join(OUTDIR,"tmp/reclustering_finished.log") resources: mem = RECLUST_MEM threads: - int(RECUST_PPN) + int(RECLUST_PPN) conda: "taxVamb" log: - recluster_log = os.path.join(OUTDIR,"tmp/reclustering_finished.log"), o = os.path.join(OUTDIR,'log','run_reclustering.out'), e = os.path.join(OUTDIR,'log','run_reclustering.err') shell: """ + rm -rf {OUTDIR}/vaevae/bins_reclustered vamb \ --model reclustering \ --latent_path {input.latents} \ --clusters_path {input.clusters} \ --fasta {input.contigs} \ --rpkm {input.abundance} \ - --outdir {output.reclustered_bins} \ + --outdir {OUTDIR}/vaevae/bins_reclustered \ --hmmout_path {output.hmm_markers} \ --minfasta {MIN_BIN_SIZE} - touch {log.recluster_log} + touch {output.recluster_log} """ # Evaluate in which samples bins were reconstructed checkpoint samples_with_bins: input: - os.path.join(OUTDIR,"tmp/vaevae_finished.log") + os.path.join(OUTDIR,"tmp/reclustering_finished.log") output: os.path.join(OUTDIR,"tmp/samples_with_bins.txt") params: @@ -462,7 +466,7 @@ checkpoint samples_with_bins: threads: 1 shell: - "find {OUTDIR}/vaevae/bins/*/ -type d ! -empty |sed 's=.*bins/==g' |sed 's=/==g' > {output}" + "find {OUTDIR}/vaevae/bins_reclustered/bins/*/ -type d ! -empty | sed 's=.*bins/==g' |sed 's=/==g' > {output}" def samples_with_bins_f(wildcards): @@ -491,7 +495,7 @@ rule run_checkm2_per_sample_all_bins: conda: "checkm2" shell: - "checkm2 predict --threads {threads} --input {OUTDIR}/vaevae/bins/{wildcards.sample}/*.fna --output-directory {OUTDIR}/tmp/checkm2_all/{wildcards.sample} > {output.out_log_file}" + "checkm2 predict --threads {threads} --input {OUTDIR}/vaevae/bins_reclustered/bins/{wildcards.sample}/*.fna --output-directory {OUTDIR}/tmp/checkm2_all/{wildcards.sample} > {output.out_log_file}" # this rule will be executed when all CheckM2 runs per sample finish, so it can move to the next step rule cat_checkm2_all: @@ -539,7 +543,7 @@ rule create_cluster_scores_bin_path_dictionaries: e=os.path.join(OUTDIR,'log','cs_bp_dicts.err') shell: - "python {params.path} --s {OUTDIR}/tmp/checkm2_all --b {OUTDIR}/vaevae/bins --cs_d {output.cluster_score_dict_path} --bp_d {output.bin_path_dict_path} " + "python {params.path} --s {OUTDIR}/tmp/checkm2_all --b {OUTDIR}/vaevae/bins_reclustered/bins --cs_d {output.cluster_score_dict_path} --bp_d {output.bin_path_dict_path} " @@ -548,12 +552,10 @@ rule sym_NC_bins: input: cluster_score_dict_path = os.path.join(OUTDIR,"tmp/cs_d.json"), bin_path_dict_path = os.path.join(OUTDIR,"tmp/bp_d.json") - output: - nc_bins_dir = os.path.join(OUTDIR,"Final_bins") - + nc_bins_dir = directory(os.path.join(OUTDIR,"Final_bins")) params: - path = UPDATE os.path.join(SNAKEDIR, "src", "symlink_nc_bins.py"), + path = os.path.join(SNAKEDIR, "src", "symlink_nc_bins.py"), walltime = "86400", nodes = "1", ppn = "4" @@ -570,38 +572,38 @@ rule sym_NC_bins: shell: """ - python {params.path} --cs_d {input.cluster_score_dict_path} --bp_d {input.bin_path_dict_path} --min_comp {MIN_COMP} --max_cont {MAX_CONT} + python {params.path} --cs_d {input.cluster_score_dict_path} --bp_d {input.bin_path_dict_path} --min_comp {MIN_COMP} --max_cont {MAX_CONT} -o {output.nc_bins_dir} touch {log.ncs_syml} """ -# Write final clusters from the Final_bins folder -rule write_clusters_from_nc_folders: - input: - ncs_syml = os.path.join(OUTDIR,'tmp','sym_final_bins_finished.log') +# # Write final clusters from the Final_bins folder # NC_bins contains symlinks and bins from all samples in the same dir +# rule write_clusters_from_nc_folders: +# input: +# ncs_syml = os.path.join(OUTDIR,'tmp','sym_final_bins_finished.log') - output: - os.path.join(OUTDIR,"Final_clusters.tsv") - log: - log_fin = os.path.join(OUTDIR,"tmp/final_taxvamb_clusters_written.log"), - o=os.path.join(OUTDIR,'log','create_final_clusters.out'), - e=os.path.join(OUTDIR,'log','create_final_clusters.err') +# output: +# os.path.join(OUTDIR,"Final_clusters.tsv") +# log: +# log_fin = os.path.join(OUTDIR,"tmp/final_taxvamb_clusters_written.log"), +# o=os.path.join(OUTDIR,'log','create_final_clusters.out'), +# e=os.path.join(OUTDIR,'log','create_final_clusters.err') - params: - path = os.path.join(SNAKEDIR, "src", "write_clusters_from_final_bins.sh"), - walltime = "86400", - nodes = "1", - ppn = "1" - resources: - mem = "1GB" - threads: - 1 - conda: - "taxVamb" +# params: +# path = os.path.join(SNAKEDIR, "src", "write_clusters_from_final_bins.sh"), +# walltime = "86400", +# nodes = "1", +# ppn = "1" +# resources: +# mem = "1GB" +# threads: +# 1 +# conda: +# "taxVamb" - shell: - "sh {params.path} -d {OUTDIR}/Final_bins -o {output} ;" - "touch {log.log_fin} " +# shell: +# "sh {params.path} -d {OUTDIR}/Final_bins -o {output} ;" +# "touch {log.log_fin} " # Rename and move some files and folders @@ -619,8 +621,8 @@ rule workflow_finished: threads: 1 log: - o=os.path.join(OUTDIR,'log','workflow_finished.out'), - e=os.path.join(OUTDIR,'log','workflow_finished.err') + o=os.path.join(OUTDIR,'log','workflow_finished_taxvamb.out'), + e=os.path.join(OUTDIR,'log','workflow_finished_taxvamb.err') shell: """ #rm -r {OUTDIR}/tmp/checkm2_all/*/protein_files