
Commit 7f33624

PacBio (#143)

* initial changes
* init push
* fix InstrumentUtils
* rm test_required_file_checks
* m11111_20250101_111111_s4
* add conda
* pacbio_generate_bam2fastq_commands
* self.node_count -> self.nprocs
* generate_sequence_counts
* init changes to def _inject_data(self, wf):
* read_length
* pmls_extra_parameters
* rm index in _inject_data
* dstats
* nuqc_job_single.sh
* rm extra ,
* print Profile selected
* counts.txt in _inject_data
* demux_just_fwd
* demux_single -> demux_just_fwd
* add cli for demux_just_fwd
* fix demux_just_fwd params
* rm demux_just_fwd_processing splitter_binary
* self.files_regex = long_read
* sample_id_column
* mv self.read_length = read_length up
* _filter_empty_fastq_files
* zip_longest
* raw_reads_r1r2
* fastq.gz -> trimmed.fastq.gz
* barcode
* barcode -> barcode_id
* S000
* del raw_reverse_seqs
* test filenames
* S000_L001_R1_001.counts.txt
* {rec}{sid}
* rm touch for gz files
* add_default_workflow
* fixing counts
* fix TestPipeline.test_generate_sample_information_files
* restart changes

1 parent 7cef793 · commit 7f33624

File tree: 22 files changed (+1058, -207 lines)


.github/workflows/qiita-plugin-ci.yml

Lines changed: 1 addition & 1 deletion

@@ -83,7 +83,7 @@ jobs:
       # seqtk is a conda-installed requirement for metapool and isn't
       # installed automatically by its setup.py.
       conda config --add channels bioconda
-      conda create -q --yes -n klp python=3.9 'seqtk>=1.4'
+      conda create -q --yes -n klp python=3.9 'seqtk>=1.4' pbtk

       conda activate klp

pyproject.toml

Lines changed: 2 additions & 0 deletions

@@ -59,3 +59,5 @@ dependencies = [
 configure_klp = "qp_klp.scripts.configure_klp:config"
 start_klp = "qp_klp.scripts.start_klp:execute"
 demux = "sequence_processing_pipeline.scripts.cli:demux"
+demux_just_fwd = "sequence_processing_pipeline.Commands:demux_just_fwd"
+pacbio_generate_bam2fastq_commands = "qp_klp.scripts.pacbio_commands:generate_bam2fastq_commands"
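For reference, a minimal sketch (not part of the change) of how these console-script entry points resolve at runtime; it assumes the package is installed in the active environment and uses the Python 3.10+ importlib.metadata selection API:

# A hedged sketch: enumerate the console scripts declared above.
from importlib.metadata import entry_points

for ep in entry_points(group='console_scripts'):
    if ep.name in {'demux_just_fwd', 'pacbio_generate_bam2fastq_commands'}:
        print(ep.name, '->', ep.value)
# expected output, per the declarations above:
# demux_just_fwd -> sequence_processing_pipeline.Commands:demux_just_fwd
# pacbio_generate_bam2fastq_commands -> qp_klp.scripts.pacbio_commands:generate_bam2fastq_commands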

src/qp_klp/Assays.py

Lines changed: 3 additions & 1 deletion

@@ -494,6 +494,7 @@ def qc_reads(self):
         # is not defined, just fallback to the SPP expected default regex
         if not hasattr(self, 'files_regex'):
             self.files_regex = 'SPP'
+
         # base quality control used by multiple Assay types.
         job = NuQCJob(self.raw_fastq_files_path,
                       self.pipeline.output_path,

@@ -517,7 +518,8 @@ def qc_reads(self):
                       bucket_size=config['bucket_size'],
                       length_limit=config['length_limit'],
                       cores_per_task=config['cores_per_task'],
-                      files_regex=self.files_regex)
+                      files_regex=self.files_regex,
+                      read_length=self.read_length)

         if 'NuQCJob' not in self.skip_steps:
             job.run(callback=self.job_callback)
src/qp_klp/PacBioMetagenomicWorkflow.py

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
+from .Protocol import PacBio
+from sequence_processing_pipeline.Pipeline import Pipeline
+from .Assays import Metagenomic
+from .Assays import ASSAY_NAME_METAGENOMIC
+from .FailedSamplesRecord import FailedSamplesRecord
+from .Workflows import Workflow
+import pandas as pd
+from os.path import join
+
+
+class PacBioMetagenomicWorkflow(Workflow, Metagenomic, PacBio):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+        self.mandatory_attributes = ['qclient', 'uif_path',
+                                     'lane_number', 'config_fp',
+                                     'run_identifier', 'output_dir', 'job_id',
+                                     'is_restart']
+
+        self.confirm_mandatory_attributes()
+
+        # second stage initializer that could conceivably be pushed down into
+        # specific children requiring specific parameters.
+        self.qclient = self.kwargs['qclient']
+
+        self.overwrite_prep_with_original = False
+        if 'overwrite_prep_with_original' in self.kwargs:
+            self.overwrite_prep_with_original = \
+                self.kwargs['overwrite_prep_with_original']
+
+        self.pipeline = Pipeline(self.kwargs['config_fp'],
+                                 self.kwargs['run_identifier'],
+                                 self.kwargs['uif_path'],
+                                 self.kwargs['output_dir'],
+                                 self.kwargs['job_id'],
+                                 ASSAY_NAME_METAGENOMIC,
+                                 lane_number=self.kwargs['lane_number'])
+
+        self.fsr = FailedSamplesRecord(self.kwargs['output_dir'],
+                                       self.pipeline.sample_sheet.samples)
+
+        samples = [
+            {'barcode': sample['barcode_id'],
+             'sample_name': sample['Sample_ID'],
+             'project_name': sample['Sample_Project'],
+             'lane': sample['Lane']}
+            for sample in self.pipeline.sample_sheet.samples]
+        df = pd.DataFrame(samples)
+        sample_list_fp = f"{self.kwargs['output_dir']}/sample_list.tsv"
+        df.to_csv(sample_list_fp, sep='\t', index=False)
+
+        self.master_qiita_job_id = self.kwargs['job_id']
+
+        self.lane_number = self.kwargs['lane_number']
+        self.is_restart = bool(self.kwargs['is_restart'])
+
+        if self.is_restart is True:
+            self.raw_fastq_files_path = join(self.pipeline.output_path,
+                                             'ConvertJob')
+            self.reports_path = join(self.raw_fastq_files_path,
+                                     'SeqCounts.csv')
+            self.determine_steps_to_skip()
+
+        # this is a convenience member to allow testing w/out updating Qiita.
+        self.update = True
+
+        if 'update_qiita' in kwargs:
+            if not isinstance(kwargs['update_qiita'], bool):
+                raise ValueError("value for 'update_qiita' must be of "
+                                 "type bool")
+
+            self.update = kwargs['update_qiita']
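The sample_list.tsv written above is consumed by the pacbio_generate_bam2fastq_commands script added later in this commit. A minimal sketch of its shape, with hypothetical sample values:

# Illustrative only; the four column names match the dict keys built in
# __init__ above, but the values here are hypothetical.
import pandas as pd

samples = [{'barcode': 'bc2001', 'sample_name': 'SAMPLE_1',
            'project_name': 'ProjectA_12345', 'lane': '1'}]
pd.DataFrame(samples).to_csv('sample_list.tsv', sep='\t', index=False)
# sample_list.tsv:
# barcode	sample_name	project_name	lane
# bc2001	SAMPLE_1	ProjectA_12345	1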

src/qp_klp/Protocol.py

Lines changed: 99 additions & 3 deletions

@@ -1,14 +1,17 @@
-from sequence_processing_pipeline.ConvertJob import ConvertJob
+from sequence_processing_pipeline.ConvertJob import (
+    ConvertJob, ConvertPacBioBam2FastqJob)
 from sequence_processing_pipeline.TellReadJob import TellReadJob
 from sequence_processing_pipeline.SeqCountsJob import SeqCountsJob
 from sequence_processing_pipeline.TRIntegrateJob import TRIntegrateJob
 from sequence_processing_pipeline.PipelineError import PipelineError
 from sequence_processing_pipeline.util import determine_orientation
-from os.path import join, split, basename, dirname
 from re import match
 from os import makedirs, rename, walk
+from os.path import join, split, basename, dirname, exists
 from metapool import load_sample_sheet
-from metapool.sample_sheet import PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ
+from metapool.sample_sheet import (
+    PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ,
+    PROTOCOL_NAME_PACBIO_SMRT)
 import pandas as pd
 from glob import glob
 from qiita_client.util import system_call

@@ -79,6 +82,26 @@ def subsample_reads(self):

 class Illumina(Protocol):
     protocol_type = PROTOCOL_NAME_ILLUMINA
+    # required files for successful operation of Illumina (made the default
+    # here); both RTAComplete.txt and RunInfo.xml should reside in the root
+    # of the run directory.
+    required_files = ['RTAComplete.txt', 'RunInfo.xml']
+    read_length = 'short'
+
+    def __init__(self) -> None:
+        super().__init__()
+
+        for some_file in self.required_files:
+            if not exists(join(self.run_dir, some_file)):
+                raise PipelineError(f"required file '{some_file}' is not "
+                                    f"present in {self.run_dir}.")
+
+        # verify that the RunInfo.xml file is readable.
+        try:
+            fp = open(join(self.run_dir, 'RunInfo.xml'))
+            fp.close()
+        except PermissionError:
+            raise PipelineError('RunInfo.xml is present, but not readable')

     def convert_raw_to_fastq(self):
         def get_config(command):

@@ -156,6 +179,7 @@ def generate_sequence_counts(self):

 class TellSeq(Protocol):
     protocol_type = PROTOCOL_NAME_TELLSEQ
+    read_length = 'short'

     def convert_raw_to_fastq(self):
         config = self.pipeline.get_software_configuration('tell-seq')

@@ -369,3 +393,75 @@ def _post_process_file(self, fastq_file, mapping, lane):
         rename(fastq_file, final_path)

         return final_path
+
+
+class PacBio(Protocol):
+    protocol_type = PROTOCOL_NAME_PACBIO_SMRT
+    read_length = 'long'
+
+    def convert_raw_to_fastq(self):
+        config = self.pipeline.get_software_configuration('pacbio_convert')
+
+        job = ConvertPacBioBam2FastqJob(
+            self.pipeline.run_dir,
+            self.pipeline.output_path,
+            self.pipeline.input_file_path,
+            config['queue'],
+            config['nodes'],
+            config['nprocs'],
+            config['wallclock_time_in_minutes'],
+            config['per_process_memory_limit'],
+            config['executable_path'],
+            config['modules_to_load'],
+            self.master_qiita_job_id)
+
+        self.raw_fastq_files_path = join(self.pipeline.output_path,
+                                         'ConvertJob')
+
+        # if ConvertJob already completed, then skip over the time-
+        # consuming portion but populate the needed member variables.
+        if 'ConvertJob' not in self.skip_steps:
+            job.run(callback=self.job_callback)
+
+        # audit the results to determine which samples failed to convert
+        # properly. Append these to the failed-samples report and also
+        # return the list directly to the caller.
+        failed_samples = job.audit(self.pipeline.get_sample_ids())
+        if hasattr(self, 'fsr'):
+            # NB: 16S does not require a failed-samples report and
+            # it is not performed by SPP.
+            self.fsr.write(failed_samples, job.__class__.__name__)
+
+        return failed_samples
+
+    def generate_sequence_counts(self):
+        # in other objects' implementations of generate_sequence_counts
+        # the counting still needs to be done; for PacBio it has already
+        # been done, and the results only need to be merged.
+        gz_files = glob(f'{self.raw_fastq_files_path}/*/*.fastq.gz')
+        data, missing_files = [], []
+
+        for gzf in gz_files:
+            cf = gzf.replace('.fastq.gz', '.counts.txt')
+            sn = basename(cf).replace(
+                f'_S000_L00{self.lane_number}_R1_001.counts.txt', '')
+            if not exists(cf):
+                missing_files.append(sn)
+                continue
+            with open(cf, 'r') as fh:
+                counts = fh.read().strip()
+            data.append({'Sample_ID': sn,
+                         'raw_reads_r1r2': counts,
+                         'Lane': self.lane_number})
+
+        if missing_files:
+            raise ValueError(f'Missing count files: {missing_files}')
+
+        df = pd.DataFrame(data)
+        self.reports_path = join(self.pipeline.output_path,
+                                 'ConvertJob',
+                                 'SeqCounts.csv')
+        df.to_csv(self.reports_path, index=False)
+
+    def integrate_results(self):
+        pass
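A small worked example of the Sample_ID recovery performed in generate_sequence_counts above; the path is hypothetical but follows the <sample>_S000_L00<lane>_R1_001 naming convention used throughout this commit:

# Stripping the fixed suffix from the counts filename recovers the
# Sample_ID (the path below is hypothetical).
from os.path import basename

lane_number = 1
cf = '/out/ConvertJob/ProjectA_12345/SAMPLE_1_S000_L001_R1_001.counts.txt'
sn = basename(cf).replace(f'_S000_L00{lane_number}_R1_001.counts.txt', '')
assert sn == 'SAMPLE_1'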

src/qp_klp/WorkflowFactory.py

Lines changed: 6 additions & 1 deletion

@@ -3,6 +3,7 @@
 from .StandardMetatranscriptomicWorkflow import \
     StandardMetatranscriptomicWorkflow
 from .TellseqMetagenomicWorkflow import TellSeqMetagenomicWorkflow
+from .PacBioMetagenomicWorkflow import PacBioMetagenomicWorkflow
 from sequence_processing_pipeline.Pipeline import Pipeline
 from metapool import load_sample_sheet
 from metapool.sample_sheet import SAMPLE_SHEETS_BY_PROTOCOL as SSBP

@@ -14,7 +15,8 @@ class WorkflowFactory():
     WORKFLOWS = [StandardMetagenomicWorkflow,
                  StandardMetatranscriptomicWorkflow,
                  StandardAmpliconWorkflow,
-                 TellSeqMetagenomicWorkflow]
+                 TellSeqMetagenomicWorkflow,
+                 PacBioMetagenomicWorkflow]

     @classmethod
     def _get_instrument_type(cls, sheet):

@@ -76,6 +78,9 @@ def generate_workflow(cls, **kwargs):
                                  "sample-sheet or a mapping-file.")

         for workflow in WorkflowFactory.WORKFLOWS:
+            if workflow.read_length not in {'short', 'long'}:
+                raise ValueError('Invalid read_length: '
+                                 f'{workflow.read_length} for {workflow}')
             if workflow.assay_type == assay_type:
                 if workflow.protocol_type == instrument_type:
                     # return instantiated workflow object

src/qp_klp/Workflows.py

Lines changed: 11 additions & 1 deletion

@@ -8,6 +8,7 @@
 import logging
 from shutil import rmtree
 from .Assays import ASSAY_NAME_AMPLICON
+from metapool.sample_sheet import PROTOCOL_NAME_PACBIO_SMRT
 from sequence_processing_pipeline.util import determine_orientation

@@ -600,6 +601,10 @@ def _get_postqc_fastq_files(self, out_dir, project):
         if self.pipeline.pipeline_type != ASSAY_NAME_AMPLICON:
             del (files['raw_barcodes'])

+        # PacBio doesn't have reverse reads
+        if self.protocol_type == PROTOCOL_NAME_PACBIO_SMRT:
+            del (files['raw_reverse_seqs'])
+
         # confirm expected lists of reads are not empty.
         for f_type in files:
             if not files[f_type]:

@@ -621,8 +626,13 @@ def _load_prep_into_qiita(self, qclient, prep_id, artifact_name,
             'prep_id': prep_id,
             'artifact_type': atype,
             'command_artifact_name': artifact_name,
-            'add_default_workflow': True,
             'files': dumps(fastq_files)}
+        # this blocks adding the default workflow to the
+        # long-read / PacBio processing, which is desirable
+        # until we have a processing pipeline - note that
+        # the key is only added if read_length is _not_ 'long'.
+        if self.read_length != 'long':
+            pdata['add_default_workflow'] = True

         job_id = qclient.post('/qiita_db/artifact/', data=pdata)['job_id']
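A minimal sketch of the effect on the POST payload (values hypothetical): for long reads the key is simply absent, so Qiita does not queue the default workflow for the new artifact.

# Hypothetical values; mirrors the conditional above.
pdata = {'prep_id': 123,
         'artifact_type': 'per_sample_FASTQ',
         'command_artifact_name': 'raw reads',
         'files': '{}'}
read_length = 'long'  # what the PacBio protocol class declares
if read_length != 'long':
    pdata['add_default_workflow'] = True
assert 'add_default_workflow' not in pdata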

src/qp_klp/scripts/pacbio_commands.py

Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+# -----------------------------------------------------------------------------
+# Copyright (c) 2014--, The Qiita Development Team.
+#
+# Distributed under the terms of the BSD 3-clause License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# -----------------------------------------------------------------------------
+import click
+import pandas as pd
+from glob import glob
+from os import makedirs
+
+
+@click.command()
+@click.argument('sample_list', required=True)
+@click.argument('run_folder', required=True)
+@click.argument('outdir', required=True)
+@click.argument('threads', required=True, default=1)
+def generate_bam2fastq_commands(sample_list, run_folder, outdir, threads):
+    """Generates the bam2fastq commands"""
+    df = pd.read_csv(sample_list, sep='\t')
+
+    # pacbio raw files are in a hifi_reads folder, within multiple folders
+    # (1_A01, 2_A02, etc.) within the run-id folder; and are named
+    # m[run-id]XXX.hifi_reads.[barcode].bam; thus to find the [barcode] we
+    # can split on '.' and take the second-to-last element [-2].
+    files = {f.split('.')[-2]: f
+             for f in glob(f'{run_folder}/*/hifi_reads/*.bam')}
+
+    makedirs(outdir, exist_ok=True)
+
+    commands, missing_files = [], []
+    for _, row in df.iterrows():
+        bc = row['barcode']
+        sn = row['sample_name']
+        pn = row['project_name']
+        lane = row['lane']
+        if bc not in files:
+            missing_files.append(bc)
+            continue
+        od = f'{outdir}/{pn}'
+
+        makedirs(od, exist_ok=True)
+        fn = f'{od}/{sn}_S000_L00{lane}_R1_001'
+        cmd = (f'bam2fastq -j {threads} -o {fn} -c 9 '
+               f'{files[bc]}; '
+               f'fqtools count {fn}.fastq.gz > '
+               f'{fn}.counts.txt')
+        commands.append(cmd)
+
+    if missing_files:
+        raise ValueError(
+            f'{run_folder} is missing barcodes: {missing_files}')
+
+    for cmd in commands:
+        print(cmd)
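A hedged usage sketch: the click command can be exercised in-process with click's CliRunner; the paths, barcode, and run-folder layout below are hypothetical.

# Assumes sample_list.tsv and the run folder exist with the layout
# described in the comments above.
from click.testing import CliRunner
from qp_klp.scripts.pacbio_commands import generate_bam2fastq_commands

runner = CliRunner()
result = runner.invoke(generate_bam2fastq_commands,
                       ['sample_list.tsv', '/runs/r11111_20250101',
                        'out', '4'])
print(result.output)
# each emitted line pairs a conversion with a read count, e.g.:
# bam2fastq -j 4 -o out/ProjectA_12345/SAMPLE_1_S000_L001_R1_001 -c 9
#   /runs/r11111_20250101/1_A01/hifi_reads/m11111.hifi_reads.bc2001.bam;
#   fqtools count out/ProjectA_12345/SAMPLE_1_S000_L001_R1_001.fastq.gz >
#   out/ProjectA_12345/SAMPLE_1_S000_L001_R1_001.counts.txt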
