diff --git a/bin/resplice_primers.py b/bin/resplice_primers.py
index ca44e04..c7fbe47 100755
--- a/bin/resplice_primers.py
+++ b/bin/resplice_primers.py
@@ -5,19 +5,25 @@
 Example usage:
 ```
-usage: resplice_primers.py [-h] --input_bed INPUT_BED [--output_prefix OUTPUT_PREFIX]\
-[--config CONFIG]
+usage: resplice_primers.py [-h] --input_bed INPUT_BED [--output_prefix OUTPUT_PREFIX] [--fwd_suffix FWD_SUFFIX]
+                           [--rev_suffix REV_SUFFIX] [--idx_delim IDX_DELIM] [--idx_position IDX_POSITION]
 
 options:
-  -h, --help            show this help message and exit
-  --input_bed INPUT_BED, -i INPUT_BED
-                        BED file with one-off spike-in primers to be respliced into\
-                        possible amplicons.
-  --output_prefix OUTPUT_PREFIX, -o OUTPUT_PREFIX
+  -h, --help            show this help message and exit
+  --input_bed INPUT_BED, -i INPUT_BED
+                        BED file with one-off spike-in primers to be respliced into possible amplicons.
+  --output_prefix OUTPUT_PREFIX, -o OUTPUT_PREFIX
                         Output prefix for final respliced amplicon BED file.
-  --config CONFIG, -c CONFIG
-                        YAML file used to configure module such that it avoids\
-                        hardcoding
+  --fwd_suffix FWD_SUFFIX, -f FWD_SUFFIX
+                        The suffix to be expected in the names for forward primers
+  --rev_suffix REV_SUFFIX, -r REV_SUFFIX
+                        The suffix to be expected in the names for reverse primers
+  --idx_delim IDX_DELIM, -d IDX_DELIM
+                        The symbol used to delimit the index of a spike-in primer, which differentiates it from the
+                        original primer around the same position. Defaults to a hyphen/dash: '-'.
+  --idx_position IDX_POSITION, -p IDX_POSITION
+                        The position where the primer spike-in index should be expected, defaulting to the final
+                        position after splitting by the specified index delimiter symbol.
 ```
 """
@@ -33,6 +39,20 @@
 
 
 def parse_command_line_args() -> argparse.Namespace:
+    """
+    Creates an argument parser object to handle command line arguments.
+
+    Returns:
+        argparse.Namespace: The parsed command line arguments.
+
+    The available arguments are:
+        --input_bed/-i: Path to the BED file containing spike-in primers.
+        --output_prefix/-o: Output prefix for the respliced amplicon BED file.
+        --fwd_suffix/-f: The suffix expected in forward primer names.
+        --rev_suffix/-r: The suffix expected in reverse primer names.
+        --idx_delim/-d: The symbol delimiting spike-in primer indices.
+        --idx_position/-p: Position to expect the primer spike-in index after splitting by the delimiter.
+    """
     parser = argparse.ArgumentParser()
     parser.add_argument(
         "--input_bed",
@@ -87,14 +107,29 @@ def parse_command_line_args() -> argparse.Namespace:
         delimiter symbol.",
     )
 
-    # TODO @nick: The spike-in primer index delimiter, which is currently assumed to be
-    # a hyphen, should be customizable by the user, and perhaps also the index of the
-    # position in the original primer label string once split with the delimiter.
-
     return parser.parse_args()
 
 
-def partition_by_amplicon(parsed_bed: pl.DataFrame) -> list[pl.DataFrame]:
+def partition_by_amplicon(
+    parsed_bed: pl.DataFrame,
+    fwd_suffix: str,
+    rev_suffix: str,
+) -> list[pl.DataFrame]:
+    """
+    Partitions a BED file of PCR primer coordinates by amplicon name.
+
+    Args:
+        parsed_bed (pl.DataFrame): A polars DataFrame containing parsed primer coordinates
+            from a BED file. Expected columns are Ref, Start Position, Stop Position, NAME,
+            INDEX, and SENSE.
+        fwd_suffix (str): The suffix used to identify forward PCR primers.
+        rev_suffix (str): The suffix used to identify reverse PCR primers.
+
+    Returns:
+        list[pl.DataFrame]: A list of Polars DataFrames, each containing primers from a
+            shared amplicon, identified by the amplicon name derived from stripping
+            suffixes from the primer NAME column.
+    """
     return (
         parsed_bed.with_columns(
             # make sure to save the original name for debugging purposes
@@ -105,11 +140,19 @@ def partition_by_amplicon(parsed_bed: pl.DataFrame) -> list[pl.DataFrame]:
             pl.col("NAME").str.replace_all(r"-\d+", "").alias("NAME"),
         )
         .with_columns(
+            # take the name column, which now has no hyphen-delimited indices, and also
+            # remove the forward and reverse suffixes, calling the resulting column
+            # 'Amplicon'. This should result in an amplicon column that will group
+            # together associated forward and reverse primers, as well as primer name
+            # groupings with more than one forward or reverse primer. If the forward and
+            # reverse groupings have more than one entry, they should all now have the
+            # same name and thus be ready for indexing downstream.
             pl.col("NAME")
-            .str.replace_all("_LEFT", "")
-            .str.replace_all("_RIGHT", "")
+            .str.replace_all(fwd_suffix, "")
+            .str.replace_all(rev_suffix, "")
             .alias("Amplicon"),
         )
+        # use a select expression to reorder columns
        .select(
             "Ref",
             "Start Position",
@@ -128,7 +171,38 @@ def assign_new_indices(
     primer_name_df: pl.DataFrame,
     fwd_suffix: str,
     rev_suffix: str,
-) -> pl.DataFrame | tuple[pl.DataFrame]:
+    idx_delim: str = "-",
+    _idx_position: int = -1,
+) -> pl.DataFrame:
+    """
+    Assigns new indices to primers within an amplicon scheme when spike-in primers are present.
+
+    Given a DataFrame of primers within a single amplicon that may contain spike-ins,
+    assigns a new index to each primer that could potentially pair with another primer
+    from the same amplicon. This indexing scheme ensures that spike-in primers can be
+    differentiated from their original primers by adding a numerical suffix.
+
+    Args:
+        primer_name_df (pl.DataFrame): Dataframe containing all primers belonging to a
+            single amplicon.
+        fwd_suffix (str): The suffix used to identify forward PCR primers.
+        rev_suffix (str): The suffix used to identify reverse PCR primers.
+        idx_delim (str): The character used to delimit the index, defaulting to "-".
+        _idx_position (int): The as-yet unused position within a split primer name where
+            the index is expected. Defaults to the final position.
+
+    Returns:
+        pl.DataFrame: DataFrame with a new NAME column that uniquely identifies each
+            primer with a numerical suffix based on its position in the amplicon scheme.
+
+    The function assigns indices to primers by:
+        - Adding a 1-based row index
+        - Creating a new NAME column by concatenating:
+            - Original amplicon name
+            - Forward/reverse suffix
+            - Hyphen delimiter by default, but the user can provide their own delimiter
+            - Row index
+    """
     return (
         # add a row that is the 1-based index of the primer and cast it as a string
         # to make concatenating columns more type-safe
@@ -143,11 +217,11 @@ def assign_new_indices(
             pl.concat_str(
                 [
                     pl.col("Amplicon"),
-                    pl.lit("-"),
-                    pl.col("index"),
                     pl.when(pl.col("NAME").str.contains(fwd_suffix))
                     .then(pl.lit(fwd_suffix))
                     .otherwise(pl.lit(rev_suffix)),
+                    pl.lit(idx_delim),
+                    pl.col("index"),
                 ],
                 separator="",
             ).alias(
@@ -164,7 +238,7 @@ def assign_new_indices(
             "INDEX",
             "SENSE",
             "Amplicon",
-        ),
+        )
     )
 
 
@@ -173,21 +247,58 @@ def normalize_indices(
     fwd_suffix: str,
     rev_suffix: str,
 ) -> dict[Any, pl.DataFrame]:
+    """
+    Normalizes indices within a group of primers, assigning new indices to distinguish spike-in primers.
+
+    This function performs several key steps:
+        1. Partitions primers by amplicon and checks for presence of spike-ins
+        2. For each amplicon group with spike-ins, assigns sequential indices to differentiate primers
+        3. Handles special cases like single primers or duplicates
+        4. Returns a dictionary mapping amplicon names to their primer DataFrames
+
+    The control flow involves nested loops:
+        - Outer loop processes each amplicon's primers
+        - Inner loop handles forward/reverse primer groups within each amplicon
+        - For each group, checks are performed to ensure valid primer naming
+        - New indices are assigned only when spike-ins are detected
+
+    Args:
+        partitioned_bed (list[pl.DataFrame]): List of DataFrames, each containing primers from
+            a single amplicon
+        fwd_suffix (str): Suffix identifying forward primers
+        rev_suffix (str): Suffix identifying reverse primers
+
+    Returns:
+        dict[Any, pl.DataFrame]: Dictionary mapping amplicon names to DataFrames containing
+            their normalized primers. Each primer now has a unique index if it is part of a
+            spike-in group.
+
+    The function maintains separation between original and spike-in primers through careful
+    index management, ensuring that downstream analyses can properly pair complementary
+    primers while preserving information about which primers are spike-ins versus originals.
+    """
+    # initialize a new list of dataframes to be appended to in the following for loops
     normalized_dfs: list[pl.DataFrame] = []
+
+    # The outer loop goes through each amplicon, which should have at least two primers:
+    # a forward primer and a reverse primer.
     for primer_df in partitioned_bed:
+        # partition the dataframe for the current amplicon by name, which should
+        # effectively mean partitioning by forward or reverse.
         new_dfs = primer_df.partition_by("NAME")
 
-        # TODO @nick: Should there be a step here that appends and continues if the name
-        # dataframe has only one row? Also, in the current setup where
-        # `partition_by_amplicon()` does what it does before this function is run,
-        # can't we assume that any dataframes with more than one row *are guaranteed*
-        # to have duplicates, in which case the dupe check here is unnecessary (and
-        # verbose)?
-
+        # the inner loop iterates through each group of potentially redundant primer
+        # names, assigning new indices to differentiate them if they are indeed
+        # redundant. For example, if there are three forward primers (i.e., one original
+        # primer and two spike-in primers), they should all have the same name at this
+        # stage, and will thus have -1, -2, and -3 appended to them respectively.
         for j, primer_name_df in enumerate(new_dfs):
             # run a check for whether there's >= 1 spikein. If there are none, there
             # will just be one primer per name partition, in which case we can skip to
-            # the next primer set without renaming anything
+            # the next primer set without renaming anything. We retain this duplicate
+            # check instead of simply counting rows in case this function has been used
+            # outside the context of this script's main workflow, in which case we
+            # report a helpful error.
             dupe_check = (
                 primer_name_df.with_columns(
                     pl.col("NAME").is_duplicated().alias("duped"),
@@ -196,12 +307,27 @@ def normalize_indices(
                 .to_series()
                 .to_list()
             )
-            if True not in dupe_check:
-                normalized_dfs.append(primer_name_df)
+            if True not in dupe_check and dupe_check.count(False) <= 1:
+                # NAME modifications are no longer needed, so we may now revert the
+                # primer name to its original state
+                new_dfs[j] = primer_name_df.with_columns(
+                    pl.col("ORIG_NAME").alias("NAME"),
+                )
+                continue
+
+            # run a check to make sure that, as expected, the entries in this NAME-
+            # partitioned dataframe all have the same name at this stage.
+            if len(primer_name_df.select("NAME").unique().to_series().to_list()) != 1:
+                logger.warning(
+                    f"Unable to properly assign indices for these primers: \
+                    {primer_name_df.select('NAME').unique().to_series().to_list()}. \
+                    Skipping. Please open an issue if the program should crash here \
+                    rather than skipping.",
+                )
                 continue
 
-            # otherwise, we'll need to rename the primers to account for spike-ins
-            # explicitly.
+            # having passed the above checks, we can now proceed to assigning new names
+            # to the primers to account for spike-ins explicitly.
             corrected_indices = assign_new_indices(
                 primer_name_df,
                 fwd_suffix,
@@ -210,15 +336,17 @@ def normalize_indices(
 
             # overwrite the previous entry in this position of the dataframe list with
             # the updated entry, which has renamed primers to account for spike-ins.
-            new_dfs[j] = corrected_indices[
-                0
-            ]  # TODO @nick: I guess this is okay even if the corrected_indices isn't a tuple??
+            new_dfs[j] = corrected_indices
+
+        # end of inner NAME loop
 
         # after going through the forward and reverse primers for this amplicon in the
         # above control flow, append the updated dataframes to the accumulating
         # normalized dataframes.
         normalized_dfs.append(pl.concat(new_dfs))
 
+    # end of outer Amplicon loop
+
     return pl.concat(normalized_dfs).partition_by("Amplicon", as_dict=True)
 
 
@@ -239,6 +367,29 @@ def resolve_primer_names(
     idx_delim: str = "-",
     idx_position: int = -1,
 ) -> tuple[list[str], list[str]]:
+    """
+    Resolves primer names by generating new labels for all possible combinations of forward and reverse primers.
+
+    This function takes lists of forward and reverse primer names and generates unique identifiers for each
+    possible pairing combination. It handles primers that have been marked with indices to denote spike-ins
+    or variants.
+
+    Args:
+        old_fwd_primers (list[str]): List of forward primer names to be paired
+        old_rev_primers (list[str]): List of reverse primer names to be paired
+        idx_delim (str, optional): Delimiter used to separate the index in primer names. Defaults to "-"
+        idx_position (int, optional): Position of the index when splitting by delimiter. Defaults to -1
+
+    Returns:
+        tuple[list[str], list[str]]: Two lists, one containing the original primer names in order, and
+            one containing the newly generated primer names with unique combination identifiers
+
+    The function:
+        1. Finds all possible forward/reverse primer combinations
+        2. Assigns a unique splice index to each valid pairing
+        3. Handles primers with existing hyphen-delimited indices
+        4. Returns both old and new primer names for mapping the changes
+    """
     # find all possible combinations of the provided primers
     all_possible_pairs = list(product(old_fwd_primers, old_rev_primers))
 
@@ -314,13 +465,60 @@ def resplice_primers(
     amplicon_dfs: dict[Any, pl.DataFrame],
     fwd_suffix: str,
     rev_suffix: str,
+    idx_delim: str = "-",
+    idx_position: int = -1,
 ) -> list[pl.DataFrame]:
+    """
+    Generates all possible combinations of forward and reverse primers within each amplicon.
+
+    This function takes amplicon DataFrames, identifies spike-in primers, and creates new
+    primer combinations by pairing forward and reverse primers. It handles special cases like
+    standard primer pairs and single primers.
+
+    Args:
+        amplicon_dfs (dict[Any, pl.DataFrame]): Dictionary mapping amplicon names to their
+            primer DataFrames
+        fwd_suffix (str): Suffix identifying forward primers
+        rev_suffix (str): Suffix identifying reverse primers
+        idx_delim (str, optional): Delimiter used for spike-in indices. Defaults to "-"
+        idx_position (int, optional): Position of spike-in index after splitting. Defaults to -1
+
+    Returns:
+        list[pl.DataFrame]: List of DataFrames containing valid primer combinations for each
+            amplicon, with updated names reflecting the possible pairings.
+
+    The function:
+        1. Checks for standard two-primer amplicons
+        2. Identifies forward and reverse primers in amplicons with spike-ins
+        3. Generates valid primer combinations and assigns new names
+        4. Skips invalid or incomplete primer sets
+        5. Returns DataFrames ready for final processing
+    """
     mutated_frames: list[pl.DataFrame] = []
     for amplicon, primer_df in amplicon_dfs.items():
         # our usual expectation is that there are two primers per amplicon. If this is
         # the case, append it to our output list and skip to the next amplicon's primers
-        primer_pair_number = 2
-        if primer_df.shape[0] == primer_pair_number:
+        if primer_df.shape[0] == 2:  # noqa: PLR2004
+            # double check that there's an identifiable forward and reverse primer
+            # within the pair
+            pair_labels = primer_df.select("NAME").to_series().to_list()
+            logger.debug(
+                f"Pair of primers within the amplicon {amplicon} detected: \
+                {pair_labels}. No resplicing will be needed here, though double check \
+                that the necessary forward and reverse suffixes, {fwd_suffix} and \
+                {rev_suffix}, are present.",
+            )
+            assert any(
+                fwd_suffix in primer for primer in pair_labels
+            ), f"The forward suffix {fwd_suffix} is missing in the provided primer \
+            pairs: {pair_labels}. Aborting."
+            assert any(
+                rev_suffix in primer for primer in pair_labels
+            ), f"The reverse suffix {rev_suffix} is missing in the provided primer \
+            pairs: {pair_labels}. Aborting."
+
+            # if so, all is well. Append the primers into the growing list of correct
+            # dataframes and move on to the next amplicon
             mutated_frames.append(primer_df)
             continue
 
@@ -343,6 +541,8 @@ def resplice_primers(
         old_primer_names, new_primer_names = resolve_primer_names(
             fwd_primers,
             rev_primers,
+            idx_delim,
+            idx_position,
         )
 
         # double check that we have the correct number of primer labels, though with the
@@ -361,7 +561,7 @@ def resplice_primers(
             primer_df.with_columns(pl.col("NAME").cast(pl.Utf8)),
             how="left",
             on="NAME",
-            validate="1:1",
+            validate="m:1",
             coalesce=False,
         )
         .with_columns(pl.Series(new_primer_names).alias("NAME"))
@@ -419,9 +619,26 @@ def finalize_primer_pairings(
 
 
 def main() -> None:
     """
-    `main()` coordinates the flow of data through the module's functions.
+    This script generates all possible combinations of amplicons based on an input BED
+    file of spike-in primers and original primers.
+
+    The workflow consists of the following steps:
+        1. Parse command line arguments specifying input BED, output prefix, and primer
+           naming conventions
+        2. Read in BED file of primers as a polars DataFrame
+        3. Group primers by amplicon and partition them into separate dataframes
+        4. Normalize primer indices within each amplicon group to handle spike-ins
+        5. Generate all valid primer pair combinations for each amplicon
+        6. Filter out invalid pairings and finalize primer names
+        7. Output a new BED file containing all valid primer pair combinations
+
+    Each primer is required to have forward/reverse suffixes and a spike-in index
+    delimited by a specified character. The script checks that all primers follow this
+    naming convention to ensure reliable pairing.
+
+    The output maintains the original BED file format while expanding out all possible
+    amplicon combinations from the given spike-in primers.
     """
-
     args = parse_command_line_args()
 
     bed_df = pl.read_csv(
@@ -440,7 +657,11 @@ def main() -> None:
 
     # identify amplicons/primer pairings and create a separate dataframe for the primers
     # associated with each
-    partitioned_bed = partition_by_amplicon(bed_df)
+    partitioned_bed = partition_by_amplicon(
+        bed_df,
+        args.fwd_suffix,
+        args.rev_suffix,
+    )
 
     # make sure that all primers have informative names in amplicons where there are
     # spiked-in primers
@@ -454,6 +675,8 @@ def main() -> None:
         indexed_primer_dfs,
         args.fwd_suffix,
         args.rev_suffix,
+        args.idx_delim,
+        args.idx_position,
     )
 
     if len(respliced_dfs) == len(indexed_primer_dfs):
@@ -464,17 +687,17 @@ def main() -> None:
         set.",
     )
 
-    # TODO @nick: Is this function necessary? It seems to be controlling for something
-    # that should no longer be possible, right?
+    # run one last check that each group has at least one identifiable forward primer
+    # and one identifiable reverse primer, if not multiple.
     final_df = finalize_primer_pairings(
         respliced_dfs,
         args.fwd_suffix,
         args.rev_suffix,
    )

-    # write the now unnecessary columns, sort by start and stop position, and write
+    # drop the now unnecessary columns, sort by start and stop position, and write
     # out with the user-provided output prefix
-    final_df.drop("Amplicon").drop("NAME").sort(
+    final_df.drop("Amplicon").drop("ORIG_NAME").sort(
         "Start Position",
         "Stop Position",
     ).write_csv(
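
For readers skimming the patch, below is a minimal, self-contained sketch of the grouping-and-pairing idea the new docstrings describe: strip any spike-in index and the direction suffixes to derive an amplicon name, then pair every forward primer with every reverse primer within that amplicon. It assumes the default `_LEFT`/`_RIGHT` suffixes and hyphen index delimiter; the toy primer names and coordinates are illustrative assumptions, not data from this repository.

```python
# Illustrative sketch only; not the patched script's exact implementation.
from itertools import product

import polars as pl

# toy BED-like table: one amplicon with an extra spiked-in forward primer
bed = pl.DataFrame(
    {
        "NAME": ["amp1_LEFT", "amp1_LEFT-2", "amp1_RIGHT"],
        "Start Position": [10, 12, 390],
        "Stop Position": [40, 42, 420],
    },
)

# derive an amplicon name by dropping any spike-in index and the direction suffixes
grouped = bed.with_columns(
    pl.col("NAME")
    .str.replace_all(r"-\d+", "")
    .str.replace_all("_LEFT", "")
    .str.replace_all("_RIGHT", "")
    .alias("Amplicon"),
)

# within each amplicon, pair every forward primer with every reverse primer
for amplicon_df in grouped.partition_by("Amplicon"):
    fwds = amplicon_df.filter(pl.col("NAME").str.contains("_LEFT"))["NAME"].to_list()
    revs = amplicon_df.filter(pl.col("NAME").str.contains("_RIGHT"))["NAME"].to_list()
    for splice_idx, (fwd, rev) in enumerate(product(fwds, revs), start=1):
        # each pairing would become its own respliced amplicon downstream
        print(f"pairing {splice_idx}: {fwd} + {rev}")
```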