-
Notifications
You must be signed in to change notification settings - Fork 4
test full runs in WorkflowFactory.py #131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 31 commits
c66c80e
3443a82
1ea19c6
c505773
db3258d
23d428d
026da91
e0bb95c
cff7adb
7b71a72
a886761
fb190ab
e2d236b
980d35e
1ce4e3d
ea9b962
e77048c
2faf214
2af7794
43e0395
6c32c70
15a84b0
88e3b70
dc5b1d0
3842dc6
68ecebc
5d89746
9d34625
b368754
62f3ab8
c0d2820
3ee3e49
7f2b47a
110aeed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,6 @@ | |
| import pandas as pd | ||
| from json import dumps | ||
| from collections import defaultdict | ||
| import re | ||
|
|
||
|
|
||
| ASSAY_NAME_NONE = "Assay" | ||
|
|
@@ -144,6 +143,141 @@ def _generate_artifact_name(self, prep_file_path): | |
| # this is a normal pre-prep or sample-sheet. | ||
| return (a_name, False) | ||
|
|
||
| def post_process_raw_fastq_output(self): | ||
| """This is an Amplicon specific method, see that object for | ||
| more info | ||
| """ | ||
| pass | ||
|
|
||
| def quality_control(self): | ||
| """This is an MetaOmic specific method, see that object for | ||
| more info | ||
| """ | ||
| pass | ||
|
|
||
| def execute_pipeline(self): | ||
| ''' | ||
| Executes steps of pipeline in proper sequence. | ||
| :return: None | ||
| ''' | ||
| if not self.is_restart: | ||
| self.pre_check() | ||
|
|
||
| # this is performed even in the event of a restart. | ||
| self.generate_special_map() | ||
|
|
||
| # even if a job is being skipped, it's being skipped because it was | ||
| # determined that it already completed successfully. Hence, | ||
| # increment the status because we are still iterating through them. | ||
|
|
||
| self.update_status("Converting data", 1, 9) | ||
| if "ConvertJob" not in self.skip_steps: | ||
| # converting raw data to fastq depends heavily on the instrument | ||
| # used to generate the run_directory. Hence this method is | ||
| # supplied by the instrument mixin. | ||
| self.convert_raw_to_fastq() | ||
| self.integrate_results() | ||
| self.generate_sequence_counts() | ||
|
|
||
| self.update_status("QC-ing reads", 2, 9) | ||
| if "NuQCJob" not in self.skip_steps: | ||
| # there is no failed samples reporting for amplicon runs. | ||
| self.post_process_raw_fastq_output() | ||
| self.quality_control() | ||
|
|
||
| self.update_status("Generating reports", 3, 9) | ||
| if "FastQCJob" not in self.skip_steps: | ||
| # reports are currently implemented by the assay mixin. This is | ||
| # only because metagenomic runs currently require a failed-samples | ||
| # report to be generated. This is not done for amplicon runs since | ||
| # demultiplexing occurs downstream of SPP. | ||
| self.generate_reports() | ||
|
|
||
| self.update_status("Generating preps", 4, 9) | ||
| if "GenPrepFileJob" not in self.skip_steps: | ||
| # preps are currently associated with array mixin, but only | ||
| # because there are currently some slight differences in how | ||
| # FastQCJob gets instantiated(). This could get moved into a | ||
|
||
| # shared method, but probably still in Assay. | ||
| self.generate_prep_file() | ||
|
|
||
| # moved final component of genprepfilejob outside of object. | ||
| # obtain the paths to the prep-files generated by GenPrepFileJob | ||
| # w/out having to recover full state. | ||
| tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles') | ||
|
|
||
| self.has_replicates = False | ||
|
|
||
| prep_paths = [] | ||
| self.prep_file_paths = {} | ||
|
|
||
| for root, dirs, files in walk(tmp): | ||
| for _file in files: | ||
| # we are looking for .tsv files and we are only interested | ||
| # in the string after the last _, which is the study_id | ||
| if not _file.endswith('.tsv'): | ||
| continue | ||
| # continue if no underscore | ||
| chunks = _file.rsplit('_', 1) | ||
| if len(chunks) <= 1: | ||
| continue | ||
| # continue if no int after . | ||
| qid = chunks[-1].split('.')[0] | ||
| if not qid.isnumeric(): | ||
| continue | ||
| if qid not in self.prep_file_paths: | ||
| self.prep_file_paths[qid] = [] | ||
|
|
||
| _path = abspath(join(root, _file)) | ||
| prep_paths.append(_path) | ||
| self.prep_file_paths[qid].append(_path) | ||
|
|
||
| for _dir in dirs: | ||
| if _dir == '1': | ||
| # if PrepFiles contains the '1' directory, then it's a | ||
| # given that this sample-sheet contains replicates. | ||
| self.has_replicates = True | ||
|
|
||
| # currently imported from Assay although it is a base method. it | ||
| # could be imported into Workflows potentially, since it is a post- | ||
| # processing step. All pairings of assay and instrument type need to | ||
| # generate prep-info files in the same format. | ||
| self.overwrite_prep_files(prep_paths) | ||
|
|
||
| # for now, simply re-run any line below as if it was a new job, even | ||
| # for a restart. functionality is idempotent, except for the | ||
| # registration of new preps in Qiita. These will simply be removed | ||
| # manually. | ||
|
|
||
| # post-processing steps are by default associated with the Workflow | ||
| # class, since they deal with fastq files and Qiita, and don't depend | ||
| # on assay or instrument type. | ||
| self.update_status("Generating sample information", 5, 9) | ||
| self.sifs = self.generate_sifs() | ||
|
|
||
| # post-processing step. | ||
| self.update_status("Registering blanks in Qiita", 6, 9) | ||
| if self.update: | ||
| self.update_blanks_in_qiita() | ||
|
|
||
| self.update_status("Loading preps into Qiita", 7, 9) | ||
| if self.update: | ||
| self.update_prep_templates() | ||
|
|
||
| # before we load preps into Qiita we need to copy the fastq | ||
| # files n times for n preps and correct the file-paths each | ||
| # prep is pointing to. | ||
| self.load_preps_into_qiita() | ||
|
|
||
| self.fsr.generate_report() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we toss this line a comment, too? I see that it was in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. fsr is an instance of FailedSamplesRecord, which is its own object, and as far as I can tell is used with Job.audit to keep track of the samples lost on each of the steps of the pipeline. Here we call FailedSamplesRecord.generate_report to output the report so it's moved to the final output and the user can get access. Adding this info in the code. |
||
|
|
||
| self.update_status("Generating packaging commands", 8, 9) | ||
| self.generate_commands() | ||
|
|
||
| self.update_status("Packaging results", 9, 9) | ||
| if self.update: | ||
| self.execute_commands() | ||
|
|
||
|
|
||
| class Amplicon(Assay): | ||
| AMPLICON_TYPE = 'Amplicon' | ||
|
|
@@ -521,111 +655,6 @@ def load_preps_into_qiita(self): | |
|
|
||
| return df | ||
|
|
||
| def execute_pipeline(self): | ||
| ''' | ||
| Executes steps of pipeline in proper sequence. | ||
| :return: None | ||
| ''' | ||
| self.pre_check() | ||
|
|
||
| self.generate_special_map() | ||
|
|
||
| self.update_status("Converting data", 1, 9) | ||
|
|
||
| self.convert_raw_to_fastq() | ||
|
|
||
| self.integrate_results() | ||
|
|
||
| self.generate_sequence_counts() | ||
|
|
||
| self.update_status("Performing quality control", 2, 9) | ||
| self.quality_control() | ||
|
|
||
| self.update_status("Generating reports", 3, 9) | ||
| self.generate_reports() | ||
|
|
||
| self.update_status("Generating preps", 4, 9) | ||
| self.generate_prep_file() | ||
|
|
||
| # moved final component of genprepfilejob outside of object. | ||
| # obtain the paths to the prep-files generated by GenPrepFileJob | ||
| # w/out having to recover full state. | ||
| tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles') | ||
|
|
||
| self.has_replicates = False | ||
|
|
||
| prep_paths = [] | ||
| self.prep_file_paths = {} | ||
|
|
||
| rematch = re.compile( | ||
| r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)" | ||
| r"(?P<qid>[0-9]{5,6})\..\.tsv") | ||
|
|
||
| for root, dirs, files in walk(tmp): | ||
| for _file in files: | ||
| # breakup the prep-info-file into segments | ||
| # (run-id, project_qid, other) and cleave | ||
| # the qiita-id from the project_name. | ||
| rer = rematch.match(_file) | ||
| if rer is None: | ||
| continue | ||
|
|
||
| _, _, qid = rer.groups() | ||
|
|
||
| if qid not in self.prep_file_paths: | ||
| self.prep_file_paths[qid] = [] | ||
|
|
||
| _path = abspath(join(root, _file)) | ||
| if _path.endswith('.tsv'): | ||
| prep_paths.append(_path) | ||
| self.prep_file_paths[qid].append(_path) | ||
|
|
||
| for _dir in dirs: | ||
| if _dir == '1': | ||
| # if PrepFiles contains the '1' directory, then it's a | ||
| # given that this sample-sheet contains replicates. | ||
| self.has_replicates = True | ||
|
|
||
| # currently imported from Assay although it is a base method. it | ||
| # could be imported into Workflows potentially, since it is a post- | ||
| # processing step. All pairings of assay and instrument type need to | ||
| # generate prep-info files in the same format. | ||
| self.overwrite_prep_files(prep_paths) | ||
|
|
||
| # for now, simply re-run any line below as if it was a new job, even | ||
| # for a restart. functionality is idempotent, except for the | ||
| # registration of new preps in Qiita. These will simply be removed | ||
| # manually. | ||
|
|
||
| # post-processing steps are by default associated with the Workflow | ||
| # class, since they deal with fastq files and Qiita, and don't depend | ||
| # on assay or instrument type. | ||
| self.update_status("Generating sample information", 5, 9) | ||
| self.sifs = self.generate_sifs() | ||
|
|
||
| # post-processing step. | ||
| self.update_status("Registering blanks in Qiita", 6, 9) | ||
| if self.update: | ||
| self.update_blanks_in_qiita() | ||
|
|
||
| self.update_status("Loading preps into Qiita", 7, 9) | ||
| if self.update: | ||
| self.update_prep_templates() | ||
|
|
||
| # before we load preps into Qiita we need to copy the fastq | ||
| # files n times for n preps and correct the file-paths each | ||
| # prep is pointing to. | ||
| self.load_preps_into_qiita() | ||
|
|
||
| self.fsr.generate_report() | ||
|
|
||
| self.update_status("Generating packaging commands", 8, 9) | ||
| self.generate_commands() | ||
|
|
||
| self.update_status("Packaging results", 9, 9) | ||
| if self.update: | ||
| self.execute_commands() | ||
|
|
||
|
|
||
| class Metagenomic(MetaOmic): | ||
| METAGENOMIC_TYPE = 'Metagenomic' | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So,
`post_process_raw_fastq_output` is only for amplicons and `quality_control` is only for metaomics; they are mutually exclusive, and whichever one the assay in question has gets called in the "QC-ing reads" step of `execute_pipeline`. Given this, why have two separate methods? It seems like we could have a
`def qc_reads()` method on `Assay` with no default contents, and we could override it in `Amplicon` with whatever is currently in `Amplicon.post_process_raw_fastq_output` and in `MetaOmics` with whatever is currently in `MetaOmics.quality_control`. Then `execute_pipeline` could just say: There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch and suggestion, thank you!