Commit

Merge pull request #4 from TalusBio/generalize

wfondrie committed Jan 11, 2022
2 parents f4f9971 + b37444c commit 8517d52

Showing 17 changed files with 201 additions and 85 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,31 @@
name: tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.10
        uses: actions/setup-python@v2
        with:
          python-version: "3.10"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install pytest
          curl -s https://get.nextflow.io | bash
          chmod u+x nextflow
          mkdir bin && mv nextflow bin
          echo "$(pwd)/bin" >> $GITHUB_PATH
      - name: Run tests
        run: |
          pytest
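The workflow above installs both pytest and the Nextflow launcher, and the stub: blocks added throughout this PR emit empty placeholder outputs, so the test suite can exercise the pipeline wiring via nextflow run -stub-run without msconvert, Java, or EncyclopeDIA present. A minimal sketch of such a test, assuming pytest drives Nextflow through subprocess (the test body, file names, and parameter values are illustrative, not code from this repository):

import json
import subprocess
from pathlib import Path


def test_stub_run(tmp_path: Path) -> None:
    """Run the pipeline with -stub-run so every process executes its
    stub: block (touching placeholder files) instead of the real tools."""
    # Hypothetical single-column CSVs listing one raw file per row.
    narrow_csv = tmp_path / "narrow.csv"
    wide_csv = tmp_path / "wide.csv"
    narrow_csv.write_text("tests/data/experiment/a.raw\n")
    wide_csv.write_text("tests/data/experiment/b.raw\n")

    # Dummy fasta/dlib files so the checkIfExists channels can be created.
    fasta = tmp_path / "test.fasta"
    dlib = tmp_path / "test.dlib"
    fasta.touch()
    dlib.touch()

    params = {
        "publish_dir": str(tmp_path / "results"),
        "mzml_dir": str(tmp_path / "mzml"),
        "narrow_files": str(narrow_csv),
        "wide_files": str(wide_csv),
        "encyclopedia": {"fasta": str(fasta), "dlib": str(dlib)},
    }
    params_file = tmp_path / "params.json"
    params_file.write_text(json.dumps(params))

    result = subprocess.run(
        ["nextflow", "run", "main.nf", "-stub-run", "-params-file", str(params_file)],
        capture_output=True,
        text=True,
    )
    assert result.returncode == 0, result.stdout + result.stderr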
134 changes: 80 additions & 54 deletions main.nf
@@ -1,5 +1,6 @@
#!/usr/bin/env nextflow
include { msconvert } from "./modules/msconvert.nf"
include { msconvert as msconvert_narrow } from "./modules/msconvert.nf"
include { msconvert as msconvert_wide } from "./modules/msconvert.nf"
include { unique_peptides_proteins } from "./modules/post_processing.nf"

nextflow.enable.dsl = 2
@@ -8,8 +9,7 @@ FILTER = "NO_FILE"

process run_encyclopedia_local {
    echo true
    publishDir "${params.experimentBucket}/${params.experimentName}/encyclopedia", mode: "copy"
    storeDir "${params.cacheBucket}/${params.experimentName}"
    publishDir params.publish_dir, mode: "copy"

    input:
    path mzml_gz_file
@@ -18,30 +18,40 @@ process run_encyclopedia_local {

    output:
    tuple(
        path("${mzml_gz_file.name.replaceAll(/\.mzML\.gz/, "")}*.elib"),
        path("${mzml_gz_file.name.replaceAll(/\.mzML\.gz/, "")}*.dia"),
        path("${mzml_gz_file.name.replaceAll(/\.mzML\.gz/, "")}*{features,encyclopedia,decoy}.txt"),
        path("${mzml_gz_file.name.replaceAll(/\.mzML\.gz/, "")}*.log"),
        path("${mzml_gz_file.baseName}.elib"),
        path("${file(mzml_gz_file.baseName).baseName}.dia"),
        path("${mzml_gz_file.baseName}.{features,encyclopedia,encyclopedia.decoy}.txt"),
        path("logs/${mzml_gz_file.baseName}.local.log"),
    )

    script:
    def mzml_file = mzml_gz_file.name.replaceAll(/\.gz/, "")
    """
    mkdir logs
    gzip -df ${mzml_gz_file}
    java -Djava.awt.headless=true ${params.encyclopedia.memory} \\
        -jar /code/encyclopedia-\$VERSION-executable.jar \\
        -i ${mzml_file} \\
        -i ${mzml_gz_file.baseName} \\
        -f ${fasta_file} \\
        -l ${library_file} \\
        ${params.encyclopedia.local_options} \\
        &> ${mzml_file}.local.log
        | tee logs/${mzml_gz_file.baseName}.local.log
    """

    stub:
    """
    mkdir logs
    touch ${mzml_gz_file.baseName}.elib
    touch ${file(mzml_gz_file.baseName).baseName}.dia
    touch ${mzml_gz_file.baseName}.features.txt
    touch ${mzml_gz_file.baseName}.encyclopedia.txt
    touch ${mzml_gz_file.baseName}.encyclopedia.decoy.txt
    touch logs/${mzml_gz_file.baseName}.local.log
    """
}

process run_encyclopedia_global {
    echo true
    publishDir "${params.experimentBucket}/${params.experimentName}/encyclopedia", mode: "copy"
    storeDir "${params.cacheBucket}/${params.experimentName}"
    publishDir params.publish_dir, mode: "copy"

    input:
    path local_files
@@ -53,12 +63,13 @@ process run_encyclopedia_global {
    output:
    tuple(
        path("result-${output_postfix}*.elib"),
        path("result-${output_postfix}*{peptides,proteins}.txt"),
        path("result-${output_postfix}*.log")
        path("result-${output_postfix}*.{peptides,proteins}.txt"),
        path("logs/result-${output_postfix}*.global.log")
    )

    script:
    """
    mkdir logs
    find . -type f -name '*.gz' -exec gzip -df {} \\;
    java -Djava.awt.headless=true ${params.encyclopedia.memory} \\
        -jar /code/encyclopedia-\$VERSION-executable.jar \\
@@ -68,7 +79,17 @@ process run_encyclopedia_global {
        -f ${fasta_file} \\
        -l ${library_file} \\
        ${params.encyclopedia.global_options} \\
        &> result-${output_postfix}.global.log
        | tee logs/result-${output_postfix}.global.log
    """

    stub:
    def stem = "result-${output_postfix}"
    """
    mkdir logs
    touch ${stem}.elib
    touch ${stem}.peptides.txt
    touch ${stem}.proteins.txt
    touch logs/${stem}.global.log
    """
}

@@ -82,7 +103,14 @@ workflow encyclopedia_narrow {
    | flatten
    | collect
    | set { narrow_local_files }
    run_encyclopedia_global(narrow_local_files, mzml_gz_files | collect, dlib, fasta, params.encyclopedia.narrow_lib_postfix)

    run_encyclopedia_global(
        narrow_local_files,
        mzml_gz_files | collect,
        dlib,
        fasta,
        params.encyclopedia.narrow_lib_postfix,
    )
    | flatten
    | filter { it.name =~ /.*elib$/ }
    | set { narrow_elib }
@@ -103,8 +131,15 @@ workflow encyclopedia_wide {
    | filter { it.name =~ /.*mzML.elib$/ }
    | collect
    | unique_peptides_proteins

    // Use the local .elib's as an input to the global run
    run_encyclopedia_global(wide_local_files | collect, mzml_gz_files | collect, elib, fasta, params.encyclopedia.wide_lib_postfix)
    run_encyclopedia_global(
        wide_local_files | collect,
        mzml_gz_files | collect,
        elib,
        fasta,
        params.encyclopedia.wide_lib_postfix
    )
    | flatten
    | filter { it.name =~ /.*elib$/ }
    | set { wide_elib }
@@ -114,47 +149,38 @@

workflow {
    // Get .fasta and .dlib from metadata-bucket
    fasta = Channel.fromPath("${params.metadataBucket}/${params.encyclopedia.fasta}", checkIfExists: true)
    dlib = Channel.fromPath("${params.metadataBucket}/${params.encyclopedia.dlib}", checkIfExists: true)

    // Use msconvert on raw files, pass through if mzml .gz files are given
    if (params.raw_files) {
        raw_files = Channel.fromList(params.raw_files) | map { file("${params.rawBucket}/${it}") }
        raw_files
        | msconvert
        | set { mzml_gz_files }
    } else if (params.mzml_gz_files) {
        mzml_gz_files = Channel.fromList(params.mzml_gz_files) | map { file("${params.mzmlBucket}/${it}") }
    } else {
        error "No .raw or .mzML files given. Nothing to do."
    fasta = Channel.fromPath(params.encyclopedia.fasta, checkIfExists: true)
    dlib = Channel.fromPath(params.encyclopedia.dlib, checkIfExists: true)

    // Get the narrow and wide files:
    narrow_files = Channel
        .fromPath(params.narrow_files, checkIfExists: true)
        .splitCsv()
        .map { row -> file(row[0]) }

    wide_files = Channel
        .fromPath(params.wide_files, checkIfExists: true)
        .splitCsv()
        .map { row -> file(row[0]) }

    if ( !narrow_files && !wide_files ) {
        error "No raw files were given. Nothing to do."
    }

    // Join the file keys to get the file type and split the set of files into narrow and wide.
    // Get the mapping from file_key to file_type
    file_key_types = Channel.of(params.file_key_types) | splitCsv
    mzml_gz_files
    | map { mzml_gz_file ->
        // Create a file_key based off of the file path
        // E.g. mzml-bucket/210308/210308_talus_01.mzML.gz --> 210308_talus_01
        def file_key = mzml_gz_file.getBaseName().tokenize(".")[0]
        return tuple(file_key, mzml_gz_file)
    }
    | join(file_key_types)
    | branch { file_key, mzml_gz_file, file_type ->
        narrow: file_type == "Narrow DIA"
            return mzml_gz_file
        wide: file_type == "Wide DIA"
            return mzml_gz_file
    }
    | set { run_files }

    // Run encyclopedia
    encyclopedia_narrow(run_files.narrow, dlib, fasta)
    // If no narrow files are given, the output chr-elib will be empty and we use the dlib instead.
    // Convert raw files to gzipped mzML.
    narrow_files | msconvert_narrow | set { narrow_mzml_files }
    wide_files | msconvert_wide | set { wide_mzml_files }

    // Build a chromatogram library with EncyclopeDIA
    encyclopedia_narrow(narrow_mzml_files, dlib, fasta)

    // If no narrow files are given, use the dlib instead.
    encyclopedia_narrow.out
        .ifEmpty(file("${params.metadataBucket}/${params.encyclopedia.dlib}"))
        .ifEmpty(file(params.encyclopedia.dlib))
        .set { chr_elib }
    encyclopedia_wide(run_files.wide, chr_elib, fasta)

    // Perform quant runs on wide window files.
    encyclopedia_wide(wide_mzml_files, chr_elib, fasta)
}

workflow.onComplete {
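Two behavioral details in the reworked entry workflow are worth spelling out. First, params.narrow_files and params.wide_files now each point at a single-column CSV: splitCsv() emits one row per line, and file(row[0]) turns the first column into a path. Second, ifEmpty(file(params.encyclopedia.dlib)) substitutes the DLIB whenever no narrow runs produce a chromatogram library, so wide-window quantification always has a library to search against. The CSV convention, mirrored in Python purely for illustration (the example paths are made up):

import csv
from pathlib import Path


def read_run_list(csv_path: str) -> list[Path]:
    """Mirror Channel.fromPath(...).splitCsv().map { row -> file(row[0]) }:
    one run per row, only the first column is used."""
    with open(csv_path, newline="") as handle:
        return [Path(row[0]) for row in csv.reader(handle) if row]


# A narrow_files CSV is then just lines like:
#   tests/data/experiment/a.raw
#   tests/data/experiment/b.raw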
32 changes: 25 additions & 7 deletions modules/msconvert.nf
@@ -4,13 +4,13 @@ nextflow.enable.dsl = 2

process run_msconvert {
    echo true
    publishDir "${params.mzmlBucket}/${outputDir}", mode: "copy"
    storeDir "${params.cacheBucket}/${outputDir}"
    publishDir "${params.mzml_dir}/${outputDir}", mode: "copy"

    input:
    tuple path(raw_input), val(outputDir)
    output:
    path("${raw_input.name.replaceAll(/\.raw/, '.mzML.gz')}")
    path("${raw_input.baseName}.mzML.gz")

    script:
    """
    wine msconvert \\
@@ -20,15 +20,33 @@ process run_msconvert {
        ${params.msconvert.filters} \\
        ${raw_input}
    """

    stub:
    """
    touch ${raw_input.baseName}.mzML.gz
    """
}

workflow msconvert {
    take:
        raw_files
    main:
        raw_files
        | map { it -> [it, it.getParent().getBaseName()] }
        | run_msconvert
        | map { raw -> [raw, raw.getParent().getBaseName()] }
        | branch {
            mzml_present: file("${params.mzml_dir}/${it[1]}/${it[0].baseName}.mzML.gz").exists()
                return "${params.mzml_dir}/${it[1]}/${it[0].baseName}.mzML.gz"
            mzml_absent: !file("${params.mzml_dir}/${it[1]}/${it[0].baseName}.mzML.gz").exists()
                return it
        }
        | set { staging }

        run_msconvert(staging.mzml_absent)
        | concat(staging.mzml_present)
        | set { results }

        results.view()

    emit:
        run_msconvert.out
}
        results
}
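The msconvert workflow previously leaned on storeDir against a cache bucket; this PR replaces that with an explicit branch: inputs whose converted .mzML.gz already exists under params.mzml_dir bypass run_msconvert and are re-emitted as-is, and only the rest are converted. A rough Python rendering of that partitioning step, under the same directory-layout assumptions (the function name is invented for illustration):

from pathlib import Path


def partition_raw_files(raw_files, mzml_dir):
    """Split raw files into already-converted (reuse the cached mzML.gz)
    and still-to-convert, mirroring the branch in workflow msconvert."""
    present, absent = [], []
    for raw in map(Path, raw_files):
        out_dir = raw.parent.name  # mirrors getParent().getBaseName()
        converted = Path(mzml_dir) / out_dir / f"{raw.stem}.mzML.gz"
        if converted.exists():
            present.append(converted)  # skip conversion, reuse the file
        else:
            absent.append(raw)         # still needs run_msconvert
    return present, absent

Note that the mzml_present branch returns the path as a plain string rather than a file object, which is worth keeping in mind if downstream staging misbehaves.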
18 changes: 14 additions & 4 deletions modules/post_processing.nf
@@ -4,19 +4,29 @@ nextflow.enable.dsl = 2

process unique_peptides_proteins {
    echo true
    publishDir "${params.experimentBucket}/${params.experimentName}/encyclopedia", mode: "copy"
    publishDir params.publish_dir, mode: "copy"

    input:
    path elib_files
    output:
    path("unique_peptides_proteins.csv")

    script:
    """
    python3 /app/src/unique_peptides_proteins.py -g "./*.mzML.elib" -t encyclopedia
    """

    stub:
    """
    touch unique_peptides_proteins.csv
    """
}

workflow {
    files = Channel.fromPath("experiment-bucket/blue-sparrow/encyclopedia/*")
    files | flatten | filter { it.name =~ /.*mzML.elib$/ } | collect | unique_peptides_proteins | view
}
    files = Channel.fromPath("${params.publish_dir}/*")
        | flatten
        | filter { it.name =~ /.*mzML.elib$/ }
        | collect
        | unique_peptides_proteins
        | view
}
28 changes: 8 additions & 20 deletions nextflow.config
@@ -1,11 +1,6 @@
// Profiles
profiles {
    standard {
        params.experimentBucket = 'experiment-bucket'
        params.metadataBucket = 'metadata-bucket'
        params.rawBucket = 'raw-bucket'
        params.mzmlBucket = 'mzml-bucket'
        params.cacheBucket = 'cache-bucket'
        process {
            executor = 'local'
            withName: 'run_encyclopedia_.*' {
@@ -20,11 +15,6 @@ profiles {
        }
    }
    cloud {
        params.experimentBucket = 's3://data-pipeline-experiment-bucket'
        params.metadataBucket = 's3://data-pipeline-metadata-bucket'
        params.rawBucket = 's3://data-pipeline-raw-bucket'
        params.mzmlBucket = 's3://data-pipeline-mzml-bucket'
        params.cacheBucket = 's3://terraform-nextflow-cache-bucket'
        process {
            executor = 'awsbatch'
            errorStrategy = 'retry'
@@ -66,23 +56,21 @@ docker {
params {
    experimentName = 'purple-pig'
    email = '[email protected]'
    publish_dir = ''
    mzml_dir = ''
    narrow_files = ''
    wide_files = ''

    file_key_types = ''
    raw_files = []
    mzml_gz_files = []
    //file_key_types = '170823_wide_1,Wide DIA\n170823_wide_2,Wide DIA'
    //raw_files = ['170823/170823_wide_1.raw', '170823/170823_wide_2.raw']
    //mzml_gz_files = ['mzml-bucket/210308/210308_talus_01.mzML', 'mzml-bucket/210308/210308_talus_02.mzML']

    encyclopedia {
        fasta = 'uniprot_human_25apr2019.fasta'
        dlib = 'uniprot_human_25apr2019.fasta.z2_nce33.dlib'
        fasta = ''
        dlib = ''
        memory = '-Xmx24G'
        narrow_lib_postfix = 'chr'
        wide_lib_postfix = 'quant'
        local_options = ''
        global_options = ''
    }

    msconvert {
        verbose = '-v'
        gzip = '--gzip'
@@ -106,4 +94,4 @@ aws {
        cliPath = '/home/ec2-user/bin/aws'
        jobRole = 'arn:aws:iam::622568582929:role/terraform-nextflow-batch-job-role'
    }
}
}
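With the hard-coded bucket params removed and fasta/dlib now defaulting to empty strings, each run must supply its own inputs. One convenient route is a -params-file; the sketch below writes one in Python, reusing the FASTA/DLIB names from the defaults this PR deleted (the surrounding paths and layout are assumptions):

import json
import subprocess
from pathlib import Path

# Values echo the old defaults removed above; adjust to your own layout.
params = {
    "publish_dir": "results",
    "mzml_dir": "mzml",
    "narrow_files": "narrow.csv",
    "wide_files": "wide.csv",
    "encyclopedia": {
        "fasta": "uniprot_human_25apr2019.fasta",
        "dlib": "uniprot_human_25apr2019.fasta.z2_nce33.dlib",
    },
}
Path("params.json").write_text(json.dumps(params, indent=2))

# -params-file accepts JSON or YAML and fills in the empty defaults.
subprocess.run(["nextflow", "run", "main.nf", "-params-file", "params.json"], check=True)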
Empty file added tests/data/experiment/a.raw
Empty file added tests/data/experiment/b.raw
Empty file added tests/data/experiment/c.raw
Empty file added tests/data/experiment/d.raw
Empty file added tests/data/experiment/e.raw
Empty file added tests/data/experiment/f.raw