A new, improved Acanthophis (#1)
- [x] Full support for DPW2 pipeline
- [x] Hologenomics pipeline
  - [x] Kraken2
  - [x] Kaiju
  - [x] Centrifuge
  - [x] Megahit assembly of unmapped reads
- [x] Unify log and data paths
- [x] ~Cloud profiles via DPW2 work~ To be added later
- [x] New config
- [x] New rl2s.tsv
- [x] Fix bug in khmer, and get kwip working again
- [x] fastqc Pre & post
- [x] multiqc
- [x] qualimap
- [x] QCtype
- [x] theta_prior
- [x] Define regions based on coverage not size
kdm9 authored Sep 28, 2022
2 parents 62ec141 + 63c10f4 commit b003183
Showing 59 changed files with 1,715 additions and 1,575 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-publish.yml
@@ -29,7 +29,7 @@ jobs:
           python -m build --sdist --wheel --outdir dist/
       - name: Publish distribution to Test PyPI
-        uses: pypa/gh-action-pypi-publish@master
+        uses: pypa/gh-action-pypi-publish@release/v1
         if: "!startsWith(github.ref, 'refs/tags')"
         with:
           password: ${{ secrets.TEST_PYPI_API_TOKEN }}
29 changes: 23 additions & 6 deletions .github/workflows/run-snakemake.yml
@@ -2,30 +2,47 @@ name: Workflow CI

 on:
   push:
-    branches: "**"
+    branches: "main"
   pull_request:
     branches: [ main ]
   workflow_dispatch:

 jobs:
-  runsnkmk:
+  run_workflow:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3

       - name: setup-mamba
         uses: conda-incubator/setup-miniconda@v2
         with:
           mamba-version: "*"
           channels: bioconda,kdm801,conda-forge,defaults
           channel-priority: true

+      - name: Cache generated inputs
+        id: cache-inputs
+        uses: actions/cache@v3
+        with:
+          path: tests/rawdata
+          key: rawdata-${{ hashFiles('tests/Snakefile.generate-rawdata') }}
+
-      - name: setup and run Snakemake
+      - name: setup data
         shell: bash -el {0}
         run: |
           mamba info
-          pushd example
+          pushd tests
           source setup.sh
-          snakemake --snakefile Snakefile -j 4 --use-conda --conda-frontend mamba
           popd
+      - name: run workflow
+        shell: bash -el {0}
+        run: |
+          pushd tests
+          source test.sh
+          popd
+
+      - name: cat all log files on failure
+        if: ${{ failure() }}
+        run: |
+          find tests/ -name '*.log' -exec tail -n 1000 {} \;
13 changes: 8 additions & 5 deletions README.md
@@ -2,9 +2,6 @@

 A reusable, comprehensive, opinionated plant variant calling pipeline in Snakemake

-Until I write the documentation, please see [the example workflow](example/).
-It should contain a fully working example workflow.
-
 ![Acanthophis, the most beautiful and badass of snakes](.github/logo.jpg)

 ## Installation & Use
@@ -17,10 +14,13 @@ conda activate someproject
 # install acanthophis itself
 pip install acanthophis

-# generate boilerplate
+# generate a workspace. This copies all files the workflow will need to your workspace directory.
 acanthophis-init /path/to/someproject/

-# edit config.yml to suit your project
+# edit config.yml to suit your project. Hopefully this config file documents
+# all options available in an understandable fashion. If not, please raise an
+# issue on github.
+
 vim config.yml

 # run snakemake
@@ -29,6 +29,9 @@ snakemake -j 16 -p --use-conda --conda-frontend mamba --ri
 snakemake --profile ./ebio-cluster/
 ```

+Until I write the documentation, please see [the example workflow](example/).
+It should contain a fully working example workflow.
+

 ## About & Authors
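The README above asks users to edit `config.yml`. Based on the configuration keys that `populate_metadata()` and `rule_resources()` in `acanthophis/__init__.py` actually read (`metadata.*`, `refs.*.fasta`, `varcall.chunksize`, `cluster_resources.*`), a minimal skeleton might look like the sketch below. File names and values are illustrative placeholders, not project defaults; consult the generated `config.yml` for the authoritative options.

```yaml
metadata:
  runlib2samp_file: "rl2s.tsv"          # run/library -> sample map
  sample_meta_file: "samples.tsv"
  setfile_glob: "metadata/samplesets/*.txt"

refs:
  myref:
    fasta: "refs/myref.fa"              # a .fai index must sit alongside

varcall:
  chunksize:
    myvarcaller: 1000000                # window size passed to make_regions()

cluster_resources:
  DEBUG: false
  defaults:
    mem_mb: 1000                        # doubled on each restart attempt
  max_values:
    mem_mb: 100000                      # hard cap for any attempt
  rules: {}                             # per-rule overrides
```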
167 changes: 6 additions & 161 deletions acanthophis/__init__.py
@@ -1,3 +1,9 @@
+# Copyright 2016-2022 Kevin Murray/Gekkonid Consulting
+#
+# This Source Code Form is subject to the terms of the Mozilla Public License,
+# v. 2.0. If a copy of the MPL was not distributed with this file, You can
+# obtain one at http://mozilla.org/MPL/2.0/.
+
 import csv
 from collections import defaultdict
 from glob import glob
@@ -16,14 +22,6 @@

 HERE = os.path.abspath(os.path.dirname(__file__))

-class __Rules(object):
-    def __init__(self):
-        for rulefile in glob(f"{HERE}/rules/*.rules"):
-            rule = splitext(basename(rulefile))[0]
-            setattr(self, rule, rulefile)
-
-rules = __Rules()
-
 profiles = {}
 for profiledir in glob(f"{HERE}/profiles/*"):
     profile = basename(profiledir)
@@ -32,156 +30,3 @@ def __init__(self):

 def get_resource(file):
     return f"{HERE}/{file}"
-
-def rule_resources(config, rule, **defaults):
-    def resource(wildcards, attempt, value, maxvalue):
-        return int(min(value * 2^(attempt-1), maxvalue))
-    C = config.get("cluster_resources", {})
-    maxes = C.get("max_values", {})
-    global_defaults = C.get("defaults", {})
-    rules = C.get("rules", {})
-
-    values = {}
-    values.update(global_defaults)
-    values.update(defaults)
-    values.update(rules.get(rule, {}))
-    ret = {}
-    for res, val in values.items():
-        if isinstance(val, str):
-            # the logic below allows restarting with increased resources. If
-            # the resource's value is string, you can't double it with each
-            # attempt, so just return it as a constant.
-            # this is used for things like cluster queues etc.
-            ret[res] = val
-            if C.get("DEBUG", False):
-                print(rule, res, val)
-            continue
-        maxval = maxes.get(res, inf)
-        if C.get("DEBUG", False):
-            print(rule, res, val, maxval)
-        ret[res] = partial(resource, value=val, maxvalue=maxval)
-    return ret
-
-
-def populate_metadata(config, runlib2samp=None, sample_meta=None, setfile_glob=None):
-    try:
-        if runlib2samp is None:
-            runlib2samp = config["metadata"]["runlib2samp_file"]
-        if sample_meta is None:
-            sample_meta = config["metadata"]["sample_meta_file"]
-        if setfile_glob is None:
-            setfile_glob = config["metadata"]["setfile_glob"]
-    except KeyError as exc:
-        raise ValueError("ERROR: metadata files must be configured in config, or passed to populate_metadata()")
-    RL2S, S2RL = make_runlib2samp(runlib2samp)
-    config["RUNLIB2SAMP"] = RL2S
-    config["SAMP2RUNLIB"] = S2RL
-    config["SAMPLESETS"] = make_samplesets(runlib2samp, setfile_glob)
-    if "refs" not in config:
-        raise RuntimeError("ERROR: reference(s) must be configured in config file")
-    config["CHROMS"] = make_chroms(config["refs"])
-    if "varcall" in config:
-        config["VARCALL_REGIONS"] = {
-            vc: make_regions(config["refs"], window=config["varcall"]["chunksize"][vc])
-            for vc in config["varcall"]["chunksize"]
-        }
-
-
-def parsefai(fai):
-    with open(fai) as fh:
-        for l in fh:
-            cname, clen, _, _, _ = l.split()
-            clen = int(clen)
-            yield cname, clen
-
-
-def make_regions(rdict, window=1e6, base=1):
-    window = int(window)
-    ret = {}
-    for refname, refbits in rdict.items():
-        fai = refbits['fasta']+".fai"
-        windows = []
-        curwin = []
-        curwinlen = 0
-        for cname, clen in parsefai(fai):
-            for start in range(0, clen, window):
-                wlen = min(clen - start, window)
-                windows.append("{}:{:09d}-{:09d}".format(cname, start + base, start+wlen))
-        ret[refname] = windows
-    return ret
-
-
-def make_chroms(rdict):
-    ret = {}
-    for refname, refbits in rdict.items():
-        fai = refbits['fasta']+".fai"
-        ref = dict()
-        for cname, clen in parsefai(fai):
-            ref[cname] = clen
-        ret[refname] = ref
-    return ret
-
-
-def _iter_metadata(s2rl_file):
-    with open(s2rl_file) as fh:
-        dialect = "excel"
-        if s2rl_file.endswith(".tsv"):
-            dialect = "excel-tab"
-        for samp in csv.DictReader(fh, dialect=dialect):
-            yield {k.lower(): v for k, v in samp.items()}
-
-
-def make_runlib2samp(s2rl_file):
-    rl2s = {}
-    s2rl = defaultdict(list)
-    for run in _iter_metadata(s2rl_file):
-        if not run["library"] or run["library"].lower().startswith("blank"):
-            # Skip blanks
-            continue
-        if run.get("include", "Y") != "Y":
-            # Remove non-sequenced ones
-            continue
-        rl = (run["run"], run["library"])
-        samp = run["sample"]
-        rl2s[rl] = samp
-        s2rl[samp].append(rl)
-    return dict(rl2s), dict(s2rl)
-
-
-def stripext(path, exts=".txt"):
-    if isinstance(exts, str):
-        exts = [exts,]
-    for ext in exts:
-        if path.endswith(ext):
-            path = path[:-len(ext)]
-    return path
-
-
-def make_samplesets(s2rl_file, setfile_glob):
-    ssets = defaultdict(list)
-    everything = set()
-    for setfile in glob(setfile_glob):
-        setname = stripext(basename(setfile), ".txt")
-        with open(setfile) as fh:
-            samples = [x.strip() for x in fh]
-        ssets[setname] = samples
-        everything.update(samples)
-    ssets["all_samples"] = everything
-
-    if not os.path.exists("data/samplelists"):
-        os.makedirs("data/samplelists", exist_ok=True)
-    with open("data/samplelists/GENERATED_FILES_DO_NOT_EDIT", "w") as fh:
-        print("you're probably looking for", setfile_glob, file=fh)
-    for setname, setsamps in ssets.items():
-        fname = "data/samplelists/{}.txt".format(setname)
-        try:
-            with open(fname) as fh:
-                currsamps = set([l.strip() for l in fh])
-        except IOError:
-            currsamps = set()
-        if set(setsamps) != currsamps:
-            with open(fname, "w") as fh:
-                print("WARNING: updating sample sets, this will trigger reruns", setname, file=stderr)
-                for s in sorted(setsamps):
-                    print(s, file=fh)
-    return {n: list(sorted(set(s))) for n, s in ssets.items()}
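The removed `rule_resources` helper builds per-rule resource callables that double their value on each Snakemake restart, capped at a configured maximum. Note that in the deleted code, `value * 2^(attempt-1)` uses `^`, which in Python is bitwise XOR, not exponentiation, so it does not double as intended. A minimal self-contained sketch of the intended behaviour (the names `scaled_resource` and `mem_mb` are illustrative, not Acanthophis's actual API) is:

```python
from functools import partial

def scaled_resource(wildcards, attempt, value, maxvalue):
    # Double the requested resource on each restart attempt, capped at maxvalue.
    # Exponentiation is **; the original used ^, which is bitwise XOR.
    return int(min(value * 2 ** (attempt - 1), maxvalue))

# A callable Snakemake can use as a resource, e.g. mem_mb for one rule;
# Snakemake passes (wildcards, attempt) when evaluating it.
mem_mb = partial(scaled_resource, value=4000, maxvalue=32000)

# Simulate successive restarts (wildcards unused here).
print([mem_mb(None, attempt) for attempt in (1, 2, 3, 4)])
# → [4000, 8000, 16000, 32000]
```

With `restart-times: 3` in the profile (as in the ebio-cluster config below), a failing job is retried with 2x, then 4x, then 8x the base request, never exceeding the cap.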
15 changes: 14 additions & 1 deletion acanthophis/cmd.py
@@ -1,3 +1,9 @@
+# Copyright 2016-2022 Kevin Murray/Gekkonid Consulting
+#
+# This Source Code Form is subject to the terms of the Mozilla Public License,
+# v. 2.0. If a copy of the MPL was not distributed with this file, You can
+# obtain one at http://mozilla.org/MPL/2.0/.
+
 import acanthophis
 import argparse
 import shutil
@@ -32,6 +38,7 @@ def prompt_yn(message, default=False):
             pass
     return res

+
 def init():
     """acanthophis-init command entry point"""
     ap = argparse.ArgumentParser(description="Initialise an Acanthophis analysis directory", epilog=CLIDOC)
@@ -55,10 +62,16 @@ def init():
         exit(0)

     template_dir = acanthophis.get_resource("template/")
+    rules_dir = acanthophis.get_resource("rules/")
+    envs_dir = acanthophis.get_resource("rules/envs/")
     if args.dryrun:
         print(f"cp -r {template_dir} {args.destdir}")
-    elif args.force or args.yes or prompt_yn(f"cp -r {template_dir} -> {args.destdir}?"):
+        print(f"cp -r {rules_dir} {args.destdir}/")
+        print(f"cp -r {envs_dir} {args.destdir}/rules")
+    elif args.force or args.yes or prompt_yn(f"cp -r {template_dir} -> {args.destdir}? (WARNING: make sure you have git add'd all files as they will be overwritten) "):
         shutil.copytree(template_dir, args.destdir, dirs_exist_ok=True)
+        shutil.copytree(rules_dir, args.destdir + "/rules", dirs_exist_ok=True)
+        shutil.copytree(envs_dir, args.destdir + "/rules/envs", dirs_exist_ok=True)

     for profile in args.cluster_profile:
         if profile not in acanthophis.profiles:
2 changes: 1 addition & 1 deletion acanthophis/profiles/ebio-cluster/cluster.yml
@@ -1,3 +1,3 @@
 __default__:
-  output: "data/log/cluster/"
+  output: "output/log/cluster/"
 DEBUG: True
22 changes: 10 additions & 12 deletions acanthophis/profiles/ebio-cluster/config.yaml
@@ -1,20 +1,18 @@
-cluster: "ebio-cluster/jobsubmit"
-cluster-config: "ebio-cluster/cluster.yml"
-cluster-status: "ebio-cluster/jobstatus"
-jobscript: "ebio-cluster/jobscript.sh"
-jobs: 100
-#immediate-submit: true
+cluster: "profiles/ebio-cluster/jobsubmit"
+cluster-config: "profiles/ebio-cluster/cluster.yml"
+cluster-status: "profiles/ebio-cluster/jobstatus"
+cluster-cancel: "qdel"
+jobscript: "profiles/ebio-cluster/jobscript.sh"
+jobs: 384
 verbose: false
 #notemp: true
 use-conda: true
 conda-frontend: mamba
 rerun-incomplete: true
 keep-going: true
 nolock: true
 max-jobs-per-second: 5
-max-status-checks-per-second: 1
-latency-wait: 60
-local-cores: 1
+max-status-checks-per-second: 5
+latency-wait: 300
+local-cores: 64
 restart-times: 3
 scheduler: greedy
 printshellcmds: true
+configfiles: "profiles/ebio-cluster/resources.yml"
2 changes: 1 addition & 1 deletion acanthophis/profiles/ebio-cluster/jobscript.sh
@@ -1,5 +1,5 @@
 #!/bin/bash -l
-# properties = {properties}
+
 test -f ~/.bash_env && source ~/.bash_env
 set -ueo pipefail
 {exec_job}
11 changes: 10 additions & 1 deletion acanthophis/profiles/ebio-cluster/jobstatus
@@ -7,10 +7,18 @@ qstat_status="$(qstat -u '*' 2>/dev/null | awk '$1 == "'"$1"'"{print $5}')"
 if [ -z "$qstat_status" ]
 then
     # job has finished
-    qacct_errcode=$(qacct -j "$1" | awk '$1 == "exit_status"{print $2}')
+    qacct_errcode=$(qacct -j "$1" 2>/dev/null | awk '$1 == "exit_status"{print $2}')
     #echo "qacct status $qacct_errcode" >&2
+    # it can take a while for jobs to be visible to qacct. We wait for valid output from qacct before proceeding.
+    while [ -z "$qacct_errcode" ]
+    do
+        #echo "No post-job data for $1 yet" >&2
+        sleep 10
+        qacct_errcode=$(qacct -j "$1" 2>/dev/null | awk '$1 == "exit_status"{print $2}')
+    done
     if [ "$qacct_errcode" -ne 0 ]
     then
+        echo "job $1 failed: exit status $qacct_errcode" >&2
         echo failed
     else
         echo success
@@ -19,6 +27,7 @@ else
     # running or errored
     if [[ "$qstat_status" =~ E ]]
     then
+        echo "job $1 failed: qstat status $qstat_status" >&2
         echo failed
     else
         echo running
