Skip to content

Commit

Permalink
Fix smk wk
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Jul 13, 2023
1 parent 57202d0 commit 2a08c95
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 63 deletions.
3 changes: 3 additions & 0 deletions workflow_taxVamb/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
"minimap_ppn": "15",
"vaevae_mem": "50GB",
"vaevae_ppn": "30",
"reclust_mem": "50GB",
"reclust_ppn": "30",
"checkm2_mem": "15GB",
"checkm2_ppn": "15",
"mmseq_mem": "260GB",
"mmseq_ppn": "30",
"mmseq_db": "/path/to/mmseq2/gtdb",
"vaevae_params": " -l 64 -e 500 -q 25 75 150 -pe 100 -pq 25 75 --model vaevae -o C ",
"vaevae_preload": "",
"outdir": "taxVamb_outdir",
Expand Down
2 changes: 2 additions & 0 deletions workflow_taxVamb/envs/taxVamb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ dependencies:
- scikit-learn=1.2.2
- pandas=2.0.0
- mmseqs2
- prodigal
- hmmer
- pip:
- ordered-set==4.1.0
128 changes: 65 additions & 63 deletions workflow_taxVamb/taxVamb.snake.conda.smk
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ MIN_BIN_SIZE = int(get_config("min_bin_size", "200000", r"[1-9]\d*$"))
MIN_IDENTITY = float(get_config("min_identity", "0.95", r".*"))
MM_MEM = get_config("minimap_mem", "35gb", r"[1-9]\d*GB$")
MM_PPN = get_config("minimap_ppn", "10", r"[1-9]\d*$")
MMSEQ_PPN = get_config("mmseq_ppn_r", "260gb", r"[1-9]\d*GB$")
MMSEQ_MEM = get_config("mmseq_mem_r", "20", r"[1-9]\d*$")
MMSEQ_MEM = get_config("mmseq_mem", "260gb", r"[1-9]\d*GB$")
MMSEQ_PPN = get_config("mmseq_ppn", "30", r"[1-9]\d*$")
MMSEQ_DB = get_config("mmseq_db", "", r".*")
VAEVAE_PARAMS = get_config("vaevae_params"," -o C --minfasta 200000 ", r".*")
VAEVAE_PRELOAD = get_config("vaevae_preload", "", r".*")
VAEVAE_MEM = get_config("vaevae_mem", "20gb", r"[1-9]\d*GB$")
VAEVAE_PPN = get_config("vaevae_ppn", "10", r"[1-9]\d*(:gpus=[1-9]\d*)?$")
RECLUST_MEM = get_config("reclust_mem", "20gb", r"[1-9]\d*GB$")
RECLUST_PPN = get_config("reclust_ppn", "10", r"[1-9]\d*(:gpus=[1-9]\d*)?$")
CHECKM_MEM = get_config("checkm2_mem", "10gb", r"[1-9]\d*GB$")
CHECKM_PPN = get_config("checkm2_ppn", "10", r"[1-9]\d*$")
MIN_COMP = get_config("min_comp", "0.9", r".*")
Expand Down Expand Up @@ -118,16 +121,16 @@ rule mmseq2:
threads:
int(MMSEQ_PPN)
log:
o=os.path.join(OUTDIR,'log','mmseq_{sample}.out'),
e=os.path.join(OUTDIR,'log','mmseq_{sample}.err')
o=os.path.join(OUTDIR,'log','mmseq.out'),
e=os.path.join(OUTDIR,'log','mmseq.err')

conda:
"taxVamb"
shell:
"""
mmseqs createdb {input} {OUTDIR}/tmp/mmseq/qdb
mmseqs taxonomy {OUTDIR}/tmp/mmseq/qdb {OUTDIR}/tmp/mmseq/sdb {OUTDIR}/tmp/mmseq/taxonomy {OUTDIR}/tmp/mmseq/tmp --threads {threads} --tax-lineage 1
mmseqs createtsv {OUTDIR}/tmp/mmseq/qdb {OUTDIR}/tmp/mmseq/taxonomy {OUTDIR}/tmp/mmseq/taxonomy.tsv
mmseqs taxonomy {OUTDIR}/tmp/mmseq/qdb {MMSEQ_DB} {OUTDIR}/tmp/mmseq/taxonomy {OUTDIR}/tmp/mmseq/tmp --threads {threads} --tax-lineage 1
mmseqs createtsv {OUTDIR}/tmp/mmseq/qdb {OUTDIR}/tmp/mmseq/taxonomy {output.taxonomytsv}
touch {output.out_log_file}
"""

Expand Down Expand Up @@ -320,8 +323,8 @@ rule bam_abundance:
"taxVamb"
log:
bam = os.path.join(OUTDIR,"log/abundance/bam_abundance_{sample}.log"),
o = os.path.join(OUTDIR,"log/abundance/{sample}.bam_abundance.o"),
e = os.path.join(OUTDIR,"log/abundance/{sample}.bam_abundance.e")
o = os.path.join(OUTDIR,"log/abundance/bam_abundance.{sample}.o"),
e = os.path.join(OUTDIR,"log/abundance/bam_abundance.{sample}.e")

shell:
"""
Expand Down Expand Up @@ -363,12 +366,10 @@ rule run_vaevae:
abundance = os.path.join(OUTDIR,"abundance.npz"),
taxonomytsv = os.path.join(OUTDIR,"tmp/mmseq/taxonomy.tsv"),
mmsq_log = os.path.join(OUTDIR,"tmp/mmseq_finished.log")

output:
outdir = directory(os.path.join(OUTDIR,"vaevae")),
clusters = os.path.join(OUTDIR,"vaevae/vae_clusters.tsv"),
contignames=os.path.join(OUTDIR,"vaevae/contignames"),
contiglenghts=os.path.join(OUTDIR,"vaevae/lengths.npz")
latents = os.path.join(OUTDIR,"vaevae/latent.npz"),
vaevae_log = os.path.join(OUTDIR,"tmp/vaevae_finished.log")
params:
walltime="86400",
nodes="1",
Expand All @@ -381,72 +382,75 @@ rule run_vaevae:
conda:
"taxVamb"
log:
vaevae_log = os.path.join(OUTDIR,"tmp/vaevae_finished.log"),
o = os.path.join(OUTDIR,'log','run_vaevae.out'),
e = os.path.join(OUTDIR,'log','run_vaevae.err')
e = os.path.join(OUTDIR,'log','run_vaevae.err'),

shell:
"""
#rm -rf {OUTDIR}/abundances
#rm -rf {OUTDIR}/contigs.flt.dict
#rm -f {OUTDIR}/contigs.flt.mmi
rm -rf {output.outdir}
rm -rf {OUTDIR}/vaevae
{VAEVAE_PRELOAD}
mkdir -p {OUTDIR}/Final_bins
vamb \
--outdir {output.outdir} \
--outdir {OUTDIR}/vaevae \
--fasta {input.contigs} \
--rpkm {input.abundance} \
--taxonomy {input.taxonomytsv} \
--minfasta {MIN_BIN_SIZE} \
-p {threads} \
-m {MIN_CONTIG_SIZE} \
{params.cuda} \
{VAEVAE_PARAMS} \
mkdir -p {OUTDIR}/Final_bins
touch {log.vaevae_log}
{VAEVAE_PARAMS}
touch {output.vaevae_log}
"""

rule reclustering
rule reclustering:
input:
latents = os.path.join(OUTDIR,"vaevae/latents.npz"),
clusters = os.path.join(OUTDIR,"vaevae/clusters.tsv"),
latents = os.path.join(OUTDIR,"vaevae/latent.npz"),
clusters = os.path.join(OUTDIR,"vaevae/vae_clusters.tsv"),
contigs = os.path.join(OUTDIR,"contigs.flt.fna.gz"),
abundance = os.path.join(OUTDIR,"abundance.npz"),
vaevae_log = os.path.join(OUTDIR,"tmp/vaevae_finished.log")

output:
reclustered_bins = directory(os.path.join(OUTDIR,"vaevae/bins_reclustered")),
hmm_markers = os.path.join(OUTDIR,"tmp/markers.hmmout")
#reclustered_bins = directory(os.path.join(OUTDIR,"vaevae/bins_reclustered")),
hmm_markers = os.path.join(OUTDIR,"tmp/markers.hmmout"),
recluster_log = os.path.join(OUTDIR,"tmp/reclustering_finished.log")

resources:
mem = RECLUST_MEM

threads:
int(RECUST_PPN)
int(RECLUST_PPN)

conda:
"taxVamb"
log:
recluster_log = os.path.join(OUTDIR,"tmp/reclustering_finished.log"),
o = os.path.join(OUTDIR,'log','run_reclustering.out'),
e = os.path.join(OUTDIR,'log','run_reclustering.err')
shell:
"""
rm -rf {OUTDIR}/vaevae/bins_reclustered
vamb \
--model reclustering \
--latent_path {input.latents} \
--clusters_path {input.clusters} \
--fasta {input.contigs} \
--rpkm {input.abundance} \
--outdir {output.reclustered_bins} \
--outdir {OUTDIR}/vaevae/bins_reclustered \
--hmmout_path {output.hmm_markers} \
--minfasta {MIN_BIN_SIZE}
touch {log.recluster_log}
touch {output.recluster_log}
"""

# Evaluate in which samples bins were reconstructed
checkpoint samples_with_bins:
input:
os.path.join(OUTDIR,"tmp/vaevae_finished.log")
os.path.join(OUTDIR,"tmp/reclustering_finished.log")
output:
os.path.join(OUTDIR,"tmp/samples_with_bins.txt")
params:
Expand All @@ -462,7 +466,7 @@ checkpoint samples_with_bins:
threads:
1
shell:
"find {OUTDIR}/vaevae/bins/*/ -type d ! -empty |sed 's=.*bins/==g' |sed 's=/==g' > {output}"
"find {OUTDIR}/vaevae/bins_reclustered/bins/*/ -type d ! -empty | sed 's=.*bins/==g' |sed 's=/==g' > {output}"


def samples_with_bins_f(wildcards):
Expand Down Expand Up @@ -491,7 +495,7 @@ rule run_checkm2_per_sample_all_bins:
conda:
"checkm2"
shell:
"checkm2 predict --threads {threads} --input {OUTDIR}/vaevae/bins/{wildcards.sample}/*.fna --output-directory {OUTDIR}/tmp/checkm2_all/{wildcards.sample} > {output.out_log_file}"
"checkm2 predict --threads {threads} --input {OUTDIR}/vaevae/bins_reclustered/bins/{wildcards.sample}/*.fna --output-directory {OUTDIR}/tmp/checkm2_all/{wildcards.sample} > {output.out_log_file}"

# this rule will be executed when all CheckM2 runs per sample finish, so it can move to the next step
rule cat_checkm2_all:
Expand Down Expand Up @@ -539,7 +543,7 @@ rule create_cluster_scores_bin_path_dictionaries:
e=os.path.join(OUTDIR,'log','cs_bp_dicts.err')

shell:
"python {params.path} --s {OUTDIR}/tmp/checkm2_all --b {OUTDIR}/vaevae/bins --cs_d {output.cluster_score_dict_path} --bp_d {output.bin_path_dict_path} "
"python {params.path} --s {OUTDIR}/tmp/checkm2_all --b {OUTDIR}/vaevae/bins_reclustered/bins --cs_d {output.cluster_score_dict_path} --bp_d {output.bin_path_dict_path} "



Expand All @@ -548,12 +552,10 @@ rule sym_NC_bins:
input:
cluster_score_dict_path = os.path.join(OUTDIR,"tmp/cs_d.json"),
bin_path_dict_path = os.path.join(OUTDIR,"tmp/bp_d.json")

output:
nc_bins_dir = os.path.join(OUTDIR,"Final_bins")

nc_bins_dir = directory(os.path.join(OUTDIR,"Final_bins"))
params:
path = UPDATE os.path.join(SNAKEDIR, "src", "symlink_nc_bins.py"),
path = os.path.join(SNAKEDIR, "src", "symlink_nc_bins.py"),
walltime = "86400",
nodes = "1",
ppn = "4"
Expand All @@ -570,38 +572,38 @@ rule sym_NC_bins:

shell:
"""
python {params.path} --cs_d {input.cluster_score_dict_path} --bp_d {input.bin_path_dict_path} --min_comp {MIN_COMP} --max_cont {MAX_CONT}
python {params.path} --cs_d {input.cluster_score_dict_path} --bp_d {input.bin_path_dict_path} --min_comp {MIN_COMP} --max_cont {MAX_CONT} -o {output.nc_bins_dir}
touch {log.ncs_syml}
"""


# Write final clusters from the Final_bins folder
rule write_clusters_from_nc_folders:
input:
ncs_syml = os.path.join(OUTDIR,'tmp','sym_final_bins_finished.log')
# # Write final clusters from the Final_bins folder # NC_bins contains symlinks and bins from all samples in the same dir
# rule write_clusters_from_nc_folders:
# input:
# ncs_syml = os.path.join(OUTDIR,'tmp','sym_final_bins_finished.log')

output:
os.path.join(OUTDIR,"Final_clusters.tsv")
log:
log_fin = os.path.join(OUTDIR,"tmp/final_taxvamb_clusters_written.log"),
o=os.path.join(OUTDIR,'log','create_final_clusters.out'),
e=os.path.join(OUTDIR,'log','create_final_clusters.err')
# output:
# os.path.join(OUTDIR,"Final_clusters.tsv")
# log:
# log_fin = os.path.join(OUTDIR,"tmp/final_taxvamb_clusters_written.log"),
# o=os.path.join(OUTDIR,'log','create_final_clusters.out'),
# e=os.path.join(OUTDIR,'log','create_final_clusters.err')

params:
path = os.path.join(SNAKEDIR, "src", "write_clusters_from_final_bins.sh"),
walltime = "86400",
nodes = "1",
ppn = "1"
resources:
mem = "1GB"
threads:
1
conda:
"taxVamb"
# params:
# path = os.path.join(SNAKEDIR, "src", "write_clusters_from_final_bins.sh"),
# walltime = "86400",
# nodes = "1",
# ppn = "1"
# resources:
# mem = "1GB"
# threads:
# 1
# conda:
# "taxVamb"

shell:
"sh {params.path} -d {OUTDIR}/Final_bins -o {output} ;"
"touch {log.log_fin} "
# shell:
# "sh {params.path} -d {OUTDIR}/Final_bins -o {output} ;"
# "touch {log.log_fin} "


# Rename and move some files and folders
Expand All @@ -619,8 +621,8 @@ rule workflow_finished:
threads:
1
log:
o=os.path.join(OUTDIR,'log','workflow_finished.out'),
e=os.path.join(OUTDIR,'log','workflow_finished.err')
o=os.path.join(OUTDIR,'log','workflow_finished_taxvamb.out'),
e=os.path.join(OUTDIR,'log','workflow_finished_taxvamb.err')
shell:
"""
#rm -r {OUTDIR}/tmp/checkm2_all/*/protein_files
Expand Down

0 comments on commit 2a08c95

Please sign in to comment.