# PacBio #143
**New file: the `PacBioMetagenomicWorkflow` class** (diff `@@ -0,0 +1,66 @@`):

```python
from .Protocol import PacBio
from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Metagenomic
from .Assays import ASSAY_NAME_METAGENOMIC
from .FailedSamplesRecord import FailedSamplesRecord
from .Workflows import Workflow
import pandas as pd


class PacBioMetagenomicWorkflow(Workflow, Metagenomic, PacBio):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.mandatory_attributes = ['qclient', 'uif_path',
                                     'lane_number', 'config_fp',
                                     'run_identifier', 'output_dir', 'job_id',
                                     'lane_number', 'is_restart']

        self.confirm_mandatory_attributes()

        # second-stage initializer that could conceivably be pushed down into
        # specific children requiring specific parameters.
        self.qclient = self.kwargs['qclient']

        self.overwrite_prep_with_original = False
        if 'overwrite_prep_with_original' in self.kwargs:
            self.overwrite_prep_with_original = \
                self.kwargs['overwrite_prep_with_original']
        self.pipeline = Pipeline(self.kwargs['config_fp'],
                                 self.kwargs['run_identifier'],
                                 self.kwargs['uif_path'],
                                 self.kwargs['output_dir'],
                                 self.kwargs['job_id'],
                                 ASSAY_NAME_METAGENOMIC,
                                 lane_number=self.kwargs['lane_number'])

        self.fsr = FailedSamplesRecord(self.kwargs['output_dir'],
                                       self.pipeline.sample_sheet.samples)

        # write out the per-sample list later consumed by the
        # bam2fastq command generator.
        samples = [
            {'barcode': sample['barcode_id'],
             'sample_name': sample['Sample_ID'],
             'project_name': sample['Sample_Project'],
             'lane': sample['Lane']}
            for sample in self.pipeline.sample_sheet.samples]
        df = pd.DataFrame(samples)
        sample_list_fp = f"{self.kwargs['output_dir']}/sample_list.tsv"
        df.to_csv(sample_list_fp, sep='\t', index=False)

        self.master_qiita_job_id = self.kwargs['job_id']

        self.lane_number = self.kwargs['lane_number']
        self.is_restart = bool(self.kwargs['is_restart'])

        if self.is_restart is True:
            self.determine_steps_to_skip()

        # this is a convenience member to allow testing w/out updating Qiita.
        self.update = True

        if 'update_qiita' in kwargs:
            if not isinstance(kwargs['update_qiita'], bool):
                raise ValueError("value for 'update_qiita' must be of "
                                 "type bool")

            self.update = kwargs['update_qiita']
```
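For reference, a minimal sketch of the `sample_list.tsv` this constructor writes out. The two rows below are hypothetical; real values come from `self.pipeline.sample_sheet.samples`:

```python
import pandas as pd

# hypothetical sample-sheet entries (invented for illustration)
samples = [
    {'barcode': 'bc2001', 'sample_name': 'sampleA',
     'project_name': 'ProjectX_12345', 'lane': 1},
    {'barcode': 'bc2002', 'sample_name': 'sampleB',
     'project_name': 'ProjectX_12345', 'lane': 1},
]
pd.DataFrame(samples).to_csv('sample_list.tsv', sep='\t', index=False)

# sample_list.tsv (tab-separated):
# barcode   sample_name   project_name     lane
# bc2001    sampleA       ProjectX_12345   1
# bc2002    sampleB       ProjectX_12345   1
```

This is the `sample_list` argument expected by the `generate_bam2fastq_commands` script further down.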
**Changes to the `Protocol` module** (new PacBio imports and protocol constant, required-file validation for `Illumina`, `read_length` attributes, and a new `PacBio` protocol class):

```diff
@@ -1,14 +1,17 @@
-from sequence_processing_pipeline.ConvertJob import ConvertJob
+from sequence_processing_pipeline.ConvertJob import (
+    ConvertJob, ConvertPacBioBam2FastqJob)
 from sequence_processing_pipeline.TellReadJob import TellReadJob
 from sequence_processing_pipeline.SeqCountsJob import SeqCountsJob
 from sequence_processing_pipeline.TRIntegrateJob import TRIntegrateJob
 from sequence_processing_pipeline.PipelineError import PipelineError
 from sequence_processing_pipeline.util import determine_orientation
-from os.path import join, split, basename, dirname
 from re import match
 from os import makedirs, rename, walk
+from os.path import join, split, basename, dirname, exists
 from metapool import load_sample_sheet
-from metapool.sample_sheet import PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ
+from metapool.sample_sheet import (
+    PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ,
+    PROTOCOL_NAME_PACBIO_SMRT)
 import pandas as pd
 from glob import glob
 from qiita_client.util import system_call
@@ -79,6 +82,26 @@ def subsample_reads(self):
 class Illumina(Protocol):
     protocol_type = PROTOCOL_NAME_ILLUMINA
+    # required files for successful Illumina operation (made the default
+    # here); both RTAComplete.txt and RunInfo.xml should reside in the
+    # root of the run directory.
+    required_files = ['RTAComplete.txt', 'RunInfo.xml']
+    read_length = 'short'
+
+    def __init__(self) -> None:
+        super().__init__()
+
+        for some_file in self.required_files:
+            if not exists(join(self.run_dir, some_file)):
+                raise PipelineError(f"required file '{some_file}' is not "
+                                    f"present in {self.run_dir}.")
+
+        # verify that the RunInfo.xml file is readable.
+        try:
+            fp = open(join(self.run_dir, 'RunInfo.xml'))
+            fp.close()
+        except PermissionError:
+            raise PipelineError('RunInfo.xml is present, but not readable')
+
     def convert_raw_to_fastq(self):
         def get_config(command):
@@ -156,6 +179,7 @@ def generate_sequence_counts(self):
 class TellSeq(Protocol):
     protocol_type = PROTOCOL_NAME_TELLSEQ
+    read_length = 'short'
 
     def convert_raw_to_fastq(self):
         config = self.pipeline.get_software_configuration('tell-seq')
@@ -369,3 +393,74 @@ def _post_process_file(self, fastq_file, mapping, lane):
         rename(fastq_file, final_path)
 
         return final_path
+
+
+class PacBio(Protocol):
+    protocol_type = PROTOCOL_NAME_PACBIO_SMRT
+    read_length = 'long'
+
+    def convert_raw_to_fastq(self):
+        config = self.pipeline.get_software_configuration('pacbio_convert')
+
+        job = ConvertPacBioBam2FastqJob(
+            self.pipeline.run_dir,
+            self.pipeline.output_path,
+            self.pipeline.input_file_path,
+            config['queue'],
+            config['nodes'],
+            config['nprocs'],
+            config['wallclock_time_in_minutes'],
+            config['per_process_memory_limit'],
+            config['executable_path'],
+            config['modules_to_load'],
+            self.master_qiita_job_id)
+
+        self.raw_fastq_files_path = join(self.pipeline.output_path,
+                                         'ConvertJob')
+
+        # if ConvertJob already completed, then skip over the time-
+        # consuming portion but populate the needed member variables.
+        if 'ConvertJob' not in self.skip_steps:
+            job.run(callback=self.job_callback)
+
+        # audit the results to determine which samples failed to convert
+        # properly. Append these to the failed-samples report and also
+        # return the list directly to the caller.
+        failed_samples = job.audit(self.pipeline.get_sample_ids())
+        if hasattr(self, 'fsr'):
+            # NB: 16S does not require a failed-samples report and
+            # it is not performed by SPP.
+            self.fsr.write(failed_samples, job.__class__.__name__)
+
+        return failed_samples
+
+    def generate_sequence_counts(self):
+        # for other instances of generate_sequence_counts in other objects
+        # the sequence counting needs to be done; however, for PacBio we
+        # have already done it and just need to merge the results.
+        gz_files = glob(f'{self.raw_fastq_files_path}/*/*.fastq.gz')
+        data, missing_files = [], []
+
+        for gzf in gz_files:
+            cf = gzf.replace('.fastq.gz', '.counts.txt')
+            sn = basename(cf).replace('_R1.counts.txt', '')
+            if not exists(cf):
+                missing_files.append(sn)
+                continue
+            with open(cf, 'r') as fh:
+                counts = fh.read().strip()
+            data.append({'Sample_ID': sn,
+                         'raw_reads_r1r2': counts,
+                         'Lane': self.lane_number})
+
+        if missing_files:
+            raise ValueError(f'Missing count files: {missing_files}')
+
+        df = pd.DataFrame(data)
+        self.reports_path = join(self.pipeline.output_path,
+                                 'ConvertJob',
+                                 'SeqCounts.csv')
+        df.to_csv(self.reports_path, index=False)
+
+    def integrate_results(self):
+        pass
```
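To make the counts merge concrete, here is a self-contained sketch using invented names: it fabricates the per-sample `.counts.txt` files that the `fqtools count` half of each conversion command would normally produce, then applies the same glob-and-merge logic as `generate_sequence_counts` (file names and read counts are hypothetical):

```python
import os
import tempfile
import pandas as pd
from glob import glob
from os.path import basename, join

tmp = tempfile.mkdtemp()

# fabricate a ConvertJob-style output tree: one fastq.gz plus one
# counts file per sample, grouped by project (names are invented)
proj = join(tmp, 'ProjectX_12345')
os.makedirs(proj)
for sn, n_reads in [('sampleA', 51234), ('sampleB', 48761)]:
    open(join(proj, f'{sn}_R1.fastq.gz'), 'w').close()
    with open(join(proj, f'{sn}_R1.counts.txt'), 'w') as fh:
        fh.write(f'{n_reads}\n')

# same merge logic as PacBio.generate_sequence_counts()
data = []
for gzf in sorted(glob(f'{tmp}/*/*.fastq.gz')):
    cf = gzf.replace('.fastq.gz', '.counts.txt')
    sn = basename(cf).replace('_R1.counts.txt', '')
    with open(cf) as fh:
        data.append({'Sample_ID': sn,
                     'raw_reads_r1r2': fh.read().strip(),
                     'Lane': 1})

print(pd.DataFrame(data).to_csv(index=False))
# Sample_ID,raw_reads_r1r2,Lane
# sampleA,51234,1
# sampleB,48761,1
```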
**New script: `generate_bam2fastq_commands`** (diff `@@ -0,0 +1,53 @@`):

```python
#!/usr/bin/env python

# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
import click
import pandas as pd
from glob import glob
from os import makedirs


@click.command()
@click.argument('sample_list', required=True)
@click.argument('run_folder', required=True)
@click.argument('outdir', required=True)
@click.argument('threads', required=True, default=1)
def generate_bam2fastq_commands(sample_list, run_folder, outdir, threads):
    """Generates the bam2fastq commands"""
    df = pd.read_csv(sample_list, sep='\t')
    # map barcode -> BAM path; the barcode is taken as the second-to-last
    # dot-delimited field of each hifi_reads BAM file name.
    files = {f.split('.')[-2]: f
             for f in glob(f'{run_folder}/*/hifi_reads/*.bam')}

    makedirs(outdir, exist_ok=True)

    commands, missing_files = [], []
    for _, row in df.iterrows():
        bc = row['barcode']
        sn = row['sample_name']
        pn = row['project_name']
        lane = row['lane']
        if bc not in files:
            missing_files.append(bc)
            continue
        od = f'{outdir}/{pn}'

        makedirs(od, exist_ok=True)
        fn = f'{od}/{sn}_S000_L00{lane}_R1_001'
        cmd = (f'bam2fastq -j {threads} -o {fn} -c 9 '
               f'{files[bc]}; '
               f'fqtools count {fn}.fastq.gz > '
               f'{fn}.counts.txt')
        commands.append(cmd)

    if missing_files:
        raise ValueError(
            f'{run_folder} is missing barcodes: {missing_files}')

    for cmd in commands:
        print(cmd)
```

Review discussion on the barcode lookup (`files = {f.split('.')[-2]: f ...}`):

> **Contributor:** What is this split/slice grabbing from the file name?
>
> **Author (Member):** Adding some comments about this.
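For illustration, assuming the conventional PacBio HiFi BAM naming of `<movie>.hifi_reads.<barcode>.bam` (the path below is invented), `split('.')[-2]` picks out the barcode segment, which is exactly the key matched against each row's `barcode` column:

```python
# hypothetical path matching the glob pattern
# <run_folder>/*/hifi_reads/*.bam; all names invented for illustration
f = '1_A01/hifi_reads/m84066_240101_000000_s1.hifi_reads.bc2001.bam'
print(f.split('.')[-2])  # -> 'bc2001'
```

Invoked as, say, `generate_bam2fastq_commands sample_list.tsv <run_folder> <outdir> 4`, the script would print one shell command per sample of the form `bam2fastq -j 4 -o <outdir>/<project>/<sample>_S000_L001_R1_001 -c 9 <bam>; fqtools count ... > ....counts.txt` (placeholders illustrative).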
Review discussion on the `mandatory_attributes` list in `PacBioMetagenomicWorkflow`, where `'lane_number'` appears twice:

> **Reviewer:** Why does `lane_number` need to be present twice?
>
> **Author:** It was a mistake; maybe we found the culprit of the duplicated lane?!