Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 106 additions & 70 deletions qp_klp/Assays.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from sequence_processing_pipeline.NuQCJob import NuQCJob
from sequence_processing_pipeline.FastQCJob import FastQCJob
from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob
from sequence_processing_pipeline.MultiQCJob import MultiQCJob
import pandas as pd
from json import dumps
from collections import defaultdict
import re


ASSAY_NAME_NONE = "Assay"
Expand Down Expand Up @@ -163,37 +165,6 @@ def post_process_raw_fastq_output(self):
projects = [x['project_name'] for x in projects]

for project_name in projects:
# copy the files from ConvertJob output to faked NuQCJob output
# folder: $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon
output_folder = join(self.pipeline.output_path,
'NuQCJob',
project_name,
# for legacy purposes, output folders are
# either 'trimmed_sequences', 'amplicon', or
# 'filtered_sequences'. Hence, this folder
# is not defined using AMPLICON_TYPE as that
# value may or may not equal the needed value.
'amplicon')

makedirs(output_folder)

# get list of all raw output files to be copied.
job_output = [join(self.raw_fastq_files_path, x) for x in
listdir(self.raw_fastq_files_path)]

job_output = [x for x in job_output if isfile(x)]
job_output = [x for x in job_output if x.endswith('fastq.gz')]

# NB: In this case, ensure the ONLY files that get copied are
# Undetermined files, and this is what we expect for 16S runs.
job_output = [x for x in job_output if
basename(x).startswith('Undetermined')]

# copy the file
for fastq_file in job_output:
new_path = join(output_folder, basename(fastq_file))
copyfile(fastq_file, new_path)

# FastQC expects the ConvertJob output to also be organized by
# project. Since this would entail running the same ConvertJob
# multiple times on the same input with just a name-change in
Expand All @@ -212,28 +183,66 @@ def post_process_raw_fastq_output(self):
new_path = join(output_folder, basename(raw_fastq_file))
copyfile(raw_fastq_file, new_path)

# copy the files from ConvertJob output to faked NuQCJob output
# folder: $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon
output_folder = join(self.pipeline.output_path,
'NuQCJob',
project_name,
# for legacy purposes, output folders are
# either 'trimmed_sequences', 'amplicon', or
# 'filtered_sequences'. Hence, this folder
# is not defined using AMPLICON_TYPE as that
# value may or may not equal the needed value.
'amplicon')
makedirs(output_folder)

# copy the file
for fastq_file in job_output:
new_path = join(output_folder, basename(fastq_file))
copyfile(fastq_file, new_path)

def generate_reports(self):
    """Run FastQC and MultiQC over the (faked) NuQCJob output.

    Builds one FastQCJob and one MultiQCJob from the 'fastqc'
    software configuration and runs each unless its step name is in
    self.skip_steps (set on pipeline restart).  MultiQC consumes the
    FastQCJob output directory, so it is guarded by its own step
    name and run after FastQC.
    """
    config = self.pipeline.get_software_configuration('fastqc')
    # NOTE: the pasted diff retained both the pre-change
    # `job = FastQCJob(...)` (old signature including
    # multiqc_config_file_path) and the post-change construction,
    # plus a stale `job.run(...)`; only the merged result is kept.
    fcjob = FastQCJob(self.pipeline.run_dir,
                      self.pipeline.output_path,
                      self.raw_fastq_files_path,
                      join(self.pipeline.output_path, 'NuQCJob'),
                      config['nprocs'],
                      config['nthreads'],
                      config['fastqc_executable_path'],
                      config['modules_to_load'],
                      self.master_qiita_job_id,
                      config['queue'],
                      config['nodes'],
                      config['wallclock_time_in_minutes'],
                      config['job_total_memory_limit'],
                      config['job_pool_size'],
                      config['job_max_array_length'],
                      True)
    mqcjob = MultiQCJob(self.pipeline.run_dir,
                        self.pipeline.output_path,
                        self.raw_fastq_files_path,
                        join(self.pipeline.output_path, 'NuQCJob'),
                        config['nprocs'],
                        config['nthreads'],
                        config['multiqc_executable_path'],
                        config['modules_to_load'],
                        self.master_qiita_job_id,
                        config['queue'],
                        config['nodes'],
                        config['wallclock_time_in_minutes'],
                        config['job_total_memory_limit'],
                        config['job_pool_size'],
                        join(self.pipeline.output_path, 'FastQCJob'),
                        config['job_max_array_length'],
                        config['multiqc_config_file_path'],
                        True)

    if 'FastQCJob' not in self.skip_steps:
        fcjob.run(callback=self.job_callback)

    if 'MultiQCJob' not in self.skip_steps:
        mqcjob.run(callback=self.job_callback)

def generate_prep_file(self):
config = self.pipeline.get_software_configuration('seqpro')
Expand Down Expand Up @@ -386,30 +395,49 @@ def quality_control(self):

def generate_reports(self):
    """Run FastQC and MultiQC over the NuQCJob output and audit results.

    Builds one FastQCJob and one MultiQCJob from the 'fastqc'
    software configuration, runs each unless its step name is in
    self.skip_steps (set on pipeline restart), then audits the
    FastQC results against the pipeline's sample ids.

    Returns the list of samples that failed FastQC; the failures are
    also written to self.fsr when a failed-sample report is present.
    """
    config = self.pipeline.get_software_configuration('fastqc')
    # NOTE: the pasted diff retained both the pre-change
    # `job = FastQCJob(...)` (old signature including
    # multiqc_config_file_path) and the post-change construction,
    # plus stale `job.run(...)` / `job.audit(...)` references; only
    # the merged result is kept.
    fqjob = FastQCJob(self.pipeline.run_dir,
                      self.pipeline.output_path,
                      self.raw_fastq_files_path,
                      join(self.pipeline.output_path, 'NuQCJob'),
                      config['nprocs'],
                      config['nthreads'],
                      config['fastqc_executable_path'],
                      config['modules_to_load'],
                      self.master_qiita_job_id,
                      config['queue'],
                      config['nodes'],
                      config['wallclock_time_in_minutes'],
                      config['job_total_memory_limit'],
                      config['job_pool_size'],
                      config['job_max_array_length'],
                      False)
    mqcjob = MultiQCJob(self.pipeline.run_dir,
                        self.pipeline.output_path,
                        self.raw_fastq_files_path,
                        join(self.pipeline.output_path, 'NuQCJob'),
                        config['nprocs'],
                        config['nthreads'],
                        config['multiqc_executable_path'],
                        config['modules_to_load'],
                        self.master_qiita_job_id,
                        config['queue'],
                        config['nodes'],
                        config['wallclock_time_in_minutes'],
                        config['job_total_memory_limit'],
                        config['job_pool_size'],
                        join(self.pipeline.output_path, 'FastQCJob'),
                        config['job_max_array_length'],
                        config['multiqc_config_file_path'],
                        False)

    if 'FastQCJob' not in self.skip_steps:
        fqjob.run(callback=self.job_callback)
    if 'MultiQCJob' not in self.skip_steps:
        mqcjob.run(callback=self.job_callback)

    failed_samples = fqjob.audit(self.pipeline.get_sample_ids())
    if hasattr(self, 'fsr'):
        self.fsr.write(failed_samples, fqjob.__class__.__name__)
    return failed_samples

def generate_prep_file(self):
Expand Down Expand Up @@ -534,12 +562,20 @@ def execute_pipeline(self):
prep_paths = []
self.prep_file_paths = {}

rematch = re.compile(
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
r"(?P<qid>[0-9]{5,6})\..\.tsv")

for root, dirs, files in walk(tmp):
for _file in files:
# breakup the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
qid = _file.split('.')[1].split('_')[-1]
rer = rematch.match(_file)
if rer is None:
continue

_, _, qid = rer.groups()

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []
Expand Down
6 changes: 6 additions & 0 deletions qp_klp/Protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ def get_config(command):

return failed_samples

def integrate_results(self):
    """Integrate raw sequencing results; no-op default for this protocol.

    NOTE(review): presumably overridden by protocols (e.g. TellSeq)
    that require an integration step — confirm against subclasses.
    """
    pass

def generate_sequence_counts(self):
    """Generate per-sample sequence counts; no-op default for this protocol.

    NOTE(review): presumably overridden by protocols that run a
    SeqCountsJob (e.g. TellSeq) — confirm against subclasses.
    """
    pass


class TellSeq(Protocol):
protocol_type = PROTOCOL_NAME_TELLSEQ
Expand Down
32 changes: 10 additions & 22 deletions qp_klp/StandardAmpliconWorkflow.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from .Protocol import Illumina
from os.path import join, abspath, exists
from os.path import join, abspath
from os import walk
from shutil import rmtree
from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Amplicon
from .Assays import ASSAY_NAME_AMPLICON
from .Workflows import Workflow
import re


class StandardAmpliconWorkflow(Workflow, Amplicon, Illumina):
Expand Down Expand Up @@ -53,25 +53,6 @@ def __init__(self, **kwargs):

self.update = kwargs['update_qiita']

def determine_steps_to_skip(self):
    """Mark already-completed pipeline steps so a restart can skip them.

    Although amplicon runs don't perform host-filtering, the output
    from ConvertJob is still copied and organized into a form
    suitable for FastQCJob to process.  Hence the presence or
    absence of a 'NuQCJob' directory is still a thing (for now).

    For each job directory under the pipeline's output path: if a
    'job_completed' marker exists, the step finished and is appended
    to self.skip_steps; otherwise the partial output is deleted so
    the job reruns from scratch.
    """
    out_dir = self.pipeline.output_path

    directories_to_check = ['ConvertJob', 'NuQCJob',
                            'FastQCJob', 'GenPrepFileJob']

    for directory in directories_to_check:
        if exists(join(out_dir, directory)):
            if exists(join(out_dir, directory, 'job_completed')):
                # this step completed successfully.
                self.skip_steps.append(directory)
            else:
                # work stopped before this job could be completed;
                # remove the partial output so the step reruns.
                rmtree(join(out_dir, directory))

def execute_pipeline(self):
'''
Executes steps of pipeline in proper sequence.
Expand Down Expand Up @@ -124,13 +105,20 @@ def execute_pipeline(self):

prep_paths = []
self.prep_file_paths = {}
rematch = re.compile(
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
r"(?P<qid>[0-9]{5,6})\..\.tsv")

for root, dirs, files in walk(tmp):
for _file in files:
# breakup the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
qid = _file.split('.')[1].split('_')[-1]
rer = rematch.match(_file)
if rer is None:
continue

_, _, qid = rer.groups()

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []
Expand Down
17 changes: 0 additions & 17 deletions qp_klp/StandardMetagenomicWorkflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from .Protocol import Illumina
from os.path import join, exists
from shutil import rmtree
from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Metagenomic
from .Assays import ASSAY_NAME_METAGENOMIC
Expand Down Expand Up @@ -51,18 +49,3 @@ def __init__(self, **kwargs):
"type bool")

self.update = kwargs['update_qiita']

def determine_steps_to_skip(self):
    """Mark already-completed pipeline steps so a restart can skip them.

    For each job directory under the pipeline's output path: if a
    'job_completed' marker exists, the step finished and is appended
    to self.skip_steps; otherwise the partial output is deleted so
    the job reruns from scratch.
    """
    out_dir = self.pipeline.output_path

    directories_to_check = ['ConvertJob', 'NuQCJob',
                            'FastQCJob', 'GenPrepFileJob']

    for directory in directories_to_check:
        if exists(join(out_dir, directory)):
            if exists(join(out_dir, directory, 'job_completed')):
                # this step completed successfully.
                self.skip_steps.append(directory)
            else:
                # work stopped before this job could be completed;
                # remove the partial output so the step reruns.
                rmtree(join(out_dir, directory))
17 changes: 0 additions & 17 deletions qp_klp/StandardMetatranscriptomicWorkflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from .Protocol import Illumina
from os.path import join, exists
from shutil import rmtree
from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Metatranscriptomic
from .Assays import ASSAY_NAME_METATRANSCRIPTOMIC
Expand Down Expand Up @@ -52,18 +50,3 @@ def __init__(self, **kwargs):
"type bool")

self.update = kwargs['update_qiita']

def determine_steps_to_skip(self):
    """Mark already-completed pipeline steps so a restart can skip them.

    For each job directory under the pipeline's output path: if a
    'job_completed' marker exists, the step finished and is appended
    to self.skip_steps; otherwise the partial output is deleted so
    the job reruns from scratch.
    """
    out_dir = self.pipeline.output_path

    directories_to_check = ['ConvertJob', 'NuQCJob',
                            'FastQCJob', 'GenPrepFileJob']

    for directory in directories_to_check:
        if exists(join(out_dir, directory)):
            if exists(join(out_dir, directory, 'job_completed')):
                # this step completed successfully.
                self.skip_steps.append(directory)
            else:
                # work stopped before this job could be completed;
                # remove the partial output so the step reruns.
                rmtree(join(out_dir, directory))
25 changes: 4 additions & 21 deletions qp_klp/TellseqMetagenomicWorkflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .Protocol import TellSeq
from os.path import join, exists
from sequence_processing_pipeline.Pipeline import Pipeline, InstrumentUtils
from .Assays import Metagenomic
from .Assays import ASSAY_NAME_METAGENOMIC
Expand Down Expand Up @@ -44,6 +43,10 @@ def __init__(self, **kwargs):
self.lane_number = self.kwargs['lane_number']
self.is_restart = bool(self.kwargs['is_restart'])

self.directories_to_check = [
'TellReadJob', 'TRIntegrateJob', 'NuQCJob', 'FastQCJob',
'SeqCountsJob', 'GenPrepFileJob']

if self.is_restart is True:
self.determine_steps_to_skip()

Expand All @@ -55,23 +58,3 @@ def __init__(self, **kwargs):
"type bool")

self.update = kwargs['update_qiita']

def determine_steps_to_skip(self):
    """Mark already-completed pipeline steps so a restart can skip them.

    For each expected job directory under the pipeline's output
    path: a 'job_completed' marker means the step finished and its
    name is appended to self.skip_steps; a completed job that also
    has a 'post_processing_completed' marker additionally skips
    'TRIJ_Post_Processing'.  Unlike the standard workflows, a job
    directory without its completion marker is an error here rather
    than partial output to clean up and rerun.

    Raises ValueError when a job directory exists but lacks its
    'job_completed' marker.

    NOTE(review): indentation was lost in the pasted diff; the
    `else` is reconstructed as pairing with the 'job_completed'
    check, matching its comment — confirm against the repository.
    """
    out_dir = self.pipeline.output_path

    directories_to_check = ['TellReadJob', 'TRIntegrateJob', 'NuQCJob',
                            'FastQCJob', 'SeqCountsJob', 'GenPrepFileJob']

    for directory in directories_to_check:
        if exists(join(out_dir, directory)):
            if exists(join(out_dir, directory, 'job_completed')):
                # this step completed successfully.
                self.skip_steps.append(directory)
                if exists(join(out_dir, directory,
                               'post_processing_completed')):
                    self.skip_steps.append('TRIJ_Post_Processing')
            else:
                # work stopped before this job could be completed.
                msg = "%s doesn't have job completed" % join(out_dir,
                                                             directory)
                raise ValueError(msg)
Loading