Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/qp_klp/Assays.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Assay():
w/'_'.
"""
assay_type = ASSAY_NAME_NONE
assay_warnings = []

@classmethod
def _replace_tube_ids_w_sample_names(cls, prep_file_path, tube_id_map):
Expand Down Expand Up @@ -166,6 +167,7 @@ def execute_pipeline(self):
self.convert_raw_to_fastq()
self.integrate_results()
self.generate_sequence_counts()
self.subsample_reads()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that we will now ALWAYS subsample?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but really we will always check if subsample is needed and only run it when necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we will always subsample every fastq that has more than the max number of reads, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.


self.update_status("QC-ing reads", 2, 9)
if "NuQCJob" not in self.skip_steps:
Expand Down Expand Up @@ -260,6 +262,13 @@ def execute_pipeline(self):
self.update_status("Generating packaging commands", 8, 9)
self.generate_commands()

# store the warnings, if they exist so they are packed with the
# final results
if self.assay_warnings:
wfp = f'{self.pipeline.output_path}/final_results/WARNINGS.txt'
with open(wfp, 'w') as f:
f.write('\n'.join(self.assay_warnings))

self.update_status("Packaging results", 9, 9)
if self.update:
self.execute_commands()
Expand Down
47 changes: 46 additions & 1 deletion src/qp_klp/Protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
from sequence_processing_pipeline.TRIntegrateJob import TRIntegrateJob
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.util import determine_orientation
from os.path import join, split
from os.path import join, split, basename, dirname
from re import match
from os import makedirs, rename, walk
from metapool import load_sample_sheet
from metapool.sample_sheet import PROTOCOL_NAME_ILLUMINA, PROTOCOL_NAME_TELLSEQ
import pandas as pd
from glob import glob
from qiita_client.util import system_call


PROTOCOL_NAME_NONE = "None"
Expand All @@ -22,6 +25,48 @@ class Protocol():
initialization.
"""
protocol_type = PROTOCOL_NAME_NONE
# this value was selected by looking at all the successful NuQC/SPP jobs,
# the max sequences were: 712,497,596
MAX_READS = 720000000

def subsample_reads(self):
    """Subsample raw FASTQ files whose read count exceeds ``MAX_READS``.

    Reads the per-sample sequence-count report (``self.reports_path``) —
    either a TellSeq ``SeqCounts.csv`` (``raw_reads_r1r2`` column) or an
    Illumina ``Demultiplex_Stats.csv`` (``# Reads`` column) — and, for every
    sample over the cap, moves each original FASTQ aside as a backup and
    replaces it with a ``seqtk``-subsampled copy under the original name.
    A warning describing each subsampled file is appended to
    ``self.assay_warnings`` so it can be packaged with the final results.

    Raises
    ------
    ValueError
        If the report format is unrecognized, or the ``mv``/``seqtk``
        shell command fails.
    """
    # Amplicon runs are never subsampled.
    if self.assay_type == 'Amplicon':
        return

    df = pd.read_csv(self.reports_path)
    if 'raw_reads_r1r2' in df.columns:
        # this is a TellSeq run: SeqCounts.csv
        read_col = 'raw_reads_r1r2'
        index_col = 'Sample_ID'
    elif '# Reads' in df.columns:
        # this is an Illumina run: Demultiplex_Stats.csv
        read_col = '# Reads'
        index_col = 'SampleID'
    else:
        raise ValueError(
            'Not sure how to check for seq counts to subsample, '
            'please let an admin know.')

    # Only samples above the cap need any work; an empty frame simply
    # iterates zero times, so no extra guard is needed.
    df = df[df[read_col] > self.MAX_READS]
    for _, row in df.iterrows():
        sn = row[index_col]
        files = glob(f'{self.raw_fastq_files_path}/*/{sn}*.fastq.gz')
        for f in files:
            dn = dirname(f)
            bn = basename(f)
            # Keep the original file as a backup for debugging. The
            # changed suffix (no longer ending in 'fastq.gz') makes
            # downstream steps that match on 'fastq.gz' ignore it.
            nbn = join(dn, bn.replace('fastq.gz', 'subsampled.gz'))
            cmd = f'mv {f} {nbn}'
            _, se, rv = system_call(cmd)
            if rv != 0 or se:
                raise ValueError(f'Error during mv: {cmd}. {se}')
            # Fixed seed (-s 42) so paired R1/R2 files are subsampled
            # to the same read set and stay in sync.
            cmd = (f'seqtk sample -s 42 {nbn} {self.MAX_READS} '
                   f'| gzip > {f}')
            _, se, rv = system_call(cmd)
            if rv != 0 or se:
                raise ValueError(f'Error during seqtk: {cmd}. {se}')
            self.assay_warnings.append(
                f'{sn} ({bn}) had {row[read_col]} sequences, '
                f'subsampling to {self.MAX_READS}')


class Illumina(Protocol):
Expand Down
Loading