diff --git a/Makefile b/Makefile index 2b8d1f5dc..6d2086fc0 100644 --- a/Makefile +++ b/Makefile @@ -295,7 +295,7 @@ suball.abPOA: rm -fr ${INCLDIR}/simde && cp -r submodules/abPOA/include/simde ${INCLDIR} suball.lastz: - cd submodules/lastz/src && sed -i Makefile -e 's/-lm -o/-lm $${LIBS} -o/g' + cd submodules/lastz/src && sed -i -e 's/-lm -o/-lm $${LIBS} -o/g' Makefile cd submodules/lastz && LIBS="${jemallocLib}" ${MAKE} ln -f submodules/lastz/src/lastz bin @@ -321,7 +321,7 @@ suball.FASTGA: ln -f submodules/FASTGA/GIXmake ${BINDIR} ln -f submodules/FASTGA/GIXrm ${BINDIR} suball.FASTAN: - cd submodules/FASTAN && sed -i Makefile -e 's/-lm -lz/-lm -lpthread -lz/g' && ${MAKE} || true + cd submodules/FASTAN && sed -i -e 's/-lm -lz/-lm -lpthread -lz/g' Makefile && ${MAKE} || true ln -f submodules/FASTAN/FasTAN ${BINDIR} suball.alntools: cd submodules/alntools && ${MAKE} diff --git a/caf/impl/pinchIterator.c b/caf/impl/pinchIterator.c index 628c64cd1..cc4cc2389 100644 --- a/caf/impl/pinchIterator.c +++ b/caf/impl/pinchIterator.c @@ -43,7 +43,7 @@ typedef struct _pairwiseAlignmentToPinch { Paf *(*getPairwiseAlignment)(void *); Paf *paf; int64_t xCoordinate, yCoordinate, xName, yName; - Cigar *op; + int64_t opIndex; bool freeAlignments; } PairwiseAlignmentToPinch; @@ -63,43 +63,46 @@ static stPinch *pairwiseAlignmentToPinch_getNext(PairwiseAlignmentToPinch *pA, s if (pA->paf == NULL) { return NULL; } - pA->op = pA->paf->cigar; + pA->opIndex = 0; pA->xCoordinate = pA->paf->same_strand ? pA->paf->query_start : pA->paf->query_end; pA->yCoordinate = pA->paf->target_start; pA->xName = cactusMisc_stringToName(pA->paf->query_name); pA->yName = cactusMisc_stringToName(pA->paf->target_name); } - while (pA->op != NULL) { - assert(pA->op->length >= 1); - if (pA->op->op == match || pA->op->op == sequence_match || pA->op->op == sequence_mismatch) { //deal with the possibility of a zero length match (strange, but not illegal) + while (pA->opIndex < cigar_count(pA->paf->cigar)) { + CigarRecord *op = cigar_get(pA->paf->cigar, pA->opIndex); + assert(op->length >= 1); + if (op->op == match || op->op == sequence_match || op->op == sequence_mismatch) { //deal with the possibility of a zero length match (strange, but not illegal) // Make maximal length (in case run of sequence matches and mismatches) int64_t i=0; // Represents the length of the previous matches in the sequence - while(pA->op->next != NULL && (pA->op->next->op == match || - pA->op->next->op == sequence_match || - pA->op->next->op == sequence_mismatch)) { - i+=pA->op->length; - pA->op = pA->op->next; + while(pA->opIndex + 1 < cigar_count(pA->paf->cigar) && + (cigar_get(pA->paf->cigar, pA->opIndex + 1)->op == match || + cigar_get(pA->paf->cigar, pA->opIndex + 1)->op == sequence_match || + cigar_get(pA->paf->cigar, pA->opIndex + 1)->op == sequence_mismatch)) { + i += op->length; + pA->opIndex++; + op = cigar_get(pA->paf->cigar, pA->opIndex); } if (pA->paf->same_strand) { - stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+pA->op->length, + stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+op->length, 1); - pA->xCoordinate += i+pA->op->length; + pA->xCoordinate += i+op->length; } else { - pA->xCoordinate -= i+pA->op->length; - stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+pA->op->length, + pA->xCoordinate -= i+op->length; + stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+op->length, 0); } - pA->yCoordinate += i+pA->op->length; - pA->op = pA->op->next; + pA->yCoordinate += i+op->length; + pA->opIndex++; return pinchToFillOut; } - if (pA->op->op != query_delete) { - pA->xCoordinate += pA->paf->same_strand ? pA->op->length : -pA->op->length; + if (op->op != query_delete) { + pA->xCoordinate += pA->paf->same_strand ? op->length : -op->length; } - if (pA->op->op != query_insert) { - pA->yCoordinate += pA->op->length; + if (op->op != query_insert) { + pA->yCoordinate += op->length; } - pA->op = pA->op->next; + pA->opIndex++; } if (pA->paf->same_strand) { assert(pA->xCoordinate == pA->paf->query_end); diff --git a/caf/tests/pinchIteratorTest.c b/caf/tests/pinchIteratorTest.c index e1f46fc4f..8997e0144 100644 --- a/caf/tests/pinchIteratorTest.c +++ b/caf/tests/pinchIteratorTest.c @@ -23,8 +23,8 @@ static void testIterator(CuTest *testCase, stPinchIterator *pinchIterator, stLis sscanf(paf->target_name, "%" PRIi64 "", &contigY); int64_t x = paf->same_strand ? paf->query_start : paf->query_end; int64_t y = paf->target_start; - Cigar *c = paf->cigar; - while(c != NULL) { + for (int64_t ci = 0; ci < cigar_count(paf->cigar); ci++) { + CigarRecord *c = cigar_get(paf->cigar, ci); if (c->op == match) { if (c->length > 2 * trim) { stPinch *pinch = stPinchIterator_getNext(pinchIterator, &pinchToFillOut); @@ -44,7 +44,6 @@ static void testIterator(CuTest *testCase, stPinchIterator *pinchIterator, stLis if (c->op != query_insert) { y += c->length; } - c = c->next; } } CuAssertPtrEquals(testCase, NULL, stPinchIterator_getNext(pinchIterator, &pinchToFillOut)); @@ -65,23 +64,32 @@ static stList *getRandomPairwiseAlignments() { paf->target_start = st_randomInt(100000, 1000000); paf->same_strand = st_random() > 0.5; int64_t i = paf->query_start, j = paf->target_start; - Cigar **pc = &(paf->cigar); CigarOp p_op_type = query_delete; // This is a fudge to ensure that we don't end up with two matches in succession // in the cigar - because the pinch iterator will smush them together + int64_t capacity = 16, num_recs = 0; + CigarRecord *recs = st_malloc(capacity * sizeof(CigarRecord)); do { - Cigar *c = st_calloc(1, sizeof(Cigar)); - c->length = st_randomInt(1, 10); - c->op = st_random() > 0.3 ? ((st_random() > 0.5 && p_op_type != match) ? match : query_insert): query_delete; - p_op_type = c->op; - if (c->op != query_delete) { - i += c->length; + if (num_recs == capacity) { + capacity *= 2; + recs = realloc(recs, capacity * sizeof(CigarRecord)); } - if (c->op != query_insert) { - j += c->length; + recs[num_recs].length = st_randomInt(1, 10); + recs[num_recs].op = st_random() > 0.3 ? ((st_random() > 0.5 && p_op_type != match) ? match : query_insert): query_delete; + p_op_type = recs[num_recs].op; + if (recs[num_recs].op != query_delete) { + i += recs[num_recs].length; } - *pc = c; - pc = &(c->next); + if (recs[num_recs].op != query_insert) { + j += recs[num_recs].length; + } + num_recs++; } while(st_random() > 0.1 || paf->query_start == i || paf->target_start == j); + Cigar *cigar = st_calloc(1, sizeof(Cigar)); + cigar->recs = recs; + cigar->length = num_recs; + cigar->start = 0; + cigar->capacity = capacity; + paf->cigar = cigar; paf->query_end = i; paf->target_end = j; paf->query_length = i; diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index 4b821a953..999813d5b 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -79,6 +79,11 @@ + + diff --git a/src/cactus/paf/local_alignment.py b/src/cactus/paf/local_alignment.py index d49890225..84eb726c0 100755 --- a/src/cactus/paf/local_alignment.py +++ b/src/cactus/paf/local_alignment.py @@ -609,27 +609,86 @@ def chain_alignments(job, alignment_files, alignment_names, reference_event_name def chain_one_alignment(job, alignment_file, alignment_name, params, include_inverted_alignments): - """ run paffy chain on one PAF. include_inverted_alignemnts is a flag to control if we additionally include - the inverted paf records for chaining. + """ run paffy chain on one PAF. include_inverted_alignments is a flag to control if we additionally include + the inverted paf records for chaining. If chainSplitQueryLength is set in the config, splits the PAF by query + contig and chains each chunk in parallel before concatenating the results. """ work_dir = job.fileStore.getLocalTempDir() alignment_path = os.path.join(work_dir, alignment_name + '.paf') - alignment_inv_path = os.path.join(work_dir, alignment_name + '.inv.paf') output_path = os.path.join(work_dir, alignment_name + '.chained.paf') # Copy the alignments from the job store job.fileStore.readGlobalFile(alignment_file, alignment_path) - # Get the forward and reverse versions of each alignment for symmetry with chaining if include_inverted_alignments - # is set + # Append inverted alignments BEFORE splitting (inversion swaps query/target) if include_inverted_alignments: + alignment_inv_path = os.path.join(work_dir, alignment_name + '.inv.paf') shutil.copyfile(alignment_path, alignment_inv_path) cactus_call(parameters=['paffy', 'invert', "-i", alignment_inv_path], outfile=alignment_path, outappend=True, job_memory=job.memory) - # Now chain the alignments - cactus_call(parameters=['paffy', 'chain', "-i", alignment_path, + chain_params = ['paffy', 'chain', + "--maxGapLength", params.find("blast").attrib["chainMaxGapLength"], + "--chainGapOpen", params.find("blast").attrib["chainGapOpen"], + "--chainGapExtend", params.find("blast").attrib["chainGapExtend"], + "--trimFraction", params.find("blast").attrib["chainTrimFraction"], + "--logLevel", getLogLevelString()] + + split_query_length = getOptionalAttrib(params.find("blast"), "chainSplitQueryLength", + typeFn=int, default=0) + chain_split_min_file_size = getOptionalAttrib(params.find("blast"), "chainSplitMinFileSize", + typeFn=int, default=0) + paf_file_size = os.path.getsize(alignment_path) + + if paf_file_size >= chain_split_min_file_size: + # Split the PAF by query contig + split_dir = os.path.join(work_dir, 'split') + os.makedirs(split_dir) + split_prefix = os.path.join(split_dir, 'split_') + cactus_call(parameters=['paffy', 'split_file', '-i', alignment_path, '-q', + '-m', str(split_query_length), '-p', split_prefix], + job_memory=job.memory) + split_files = [os.path.join(split_dir, f) for f in sorted(os.listdir(split_dir)) + if f.endswith('.paf')] + else: + split_files = [] + + if len(split_files) <= 1: + # No splitting or only one chunk — chain directly (no fan-out overhead) + input_path = split_files[0] if split_files else alignment_path + cactus_call(parameters=chain_params + ["-i", input_path], + outfile=output_path, job_memory=job.memory) + job.fileStore.deleteGlobalFile(alignment_file) + return job.fileStore.writeGlobalFile(output_path) + + # Multiple chunks — fan out parallel chain jobs + root_job = Job() + job.addChild(root_job) + + chained_chunk_ids = [] + for split_file in split_files: + chunk_size = os.path.getsize(split_file) + chunk_file_id = job.fileStore.writeGlobalFile(split_file) + chained_chunk_ids.append( + root_job.addChildJobFn(chain_one_split_chunk, chunk_file_id, params, + disk=4 * chunk_size, + memory=cactus_clamp_memory(2 * chunk_size)).rv()) + + job.fileStore.deleteGlobalFile(alignment_file) + return root_job.addFollowOnJobFn(concatenate_chained_chunks, chained_chunk_ids, + disk=2 * alignment_file.size).rv() + + +def chain_one_split_chunk(job, chunk_file_id, params): + """Run paffy chain on a single split chunk of a PAF file.""" + work_dir = job.fileStore.getLocalTempDir() + chunk_path = os.path.join(work_dir, 'chunk.paf') + output_path = os.path.join(work_dir, 'chained_chunk.paf') + + job.fileStore.readGlobalFile(chunk_file_id, chunk_path) + + cactus_call(parameters=['paffy', 'chain', "-i", chunk_path, "--maxGapLength", params.find("blast").attrib["chainMaxGapLength"], "--chainGapOpen", params.find("blast").attrib["chainGapOpen"], "--chainGapExtend", params.find("blast").attrib["chainGapExtend"], @@ -637,11 +696,25 @@ def chain_one_alignment(job, alignment_file, alignment_name, params, include_inv "--logLevel", getLogLevelString()], outfile=output_path, job_memory=job.memory) - job.fileStore.deleteGlobalFile(alignment_file) + job.fileStore.deleteGlobalFile(chunk_file_id) + return job.fileStore.writeGlobalFile(output_path) + + +def concatenate_chained_chunks(job, chained_chunk_ids): + """Concatenate chained PAF chunks back into a single file.""" + work_dir = job.fileStore.getLocalTempDir() + output_path = os.path.join(work_dir, 'chained_combined.paf') + + with open(output_path, 'w') as out_file: + for chunk_id in chained_chunk_ids: + chunk_path = job.fileStore.readGlobalFile(chunk_id) + with open(chunk_path, 'r') as chunk_file: + shutil.copyfileobj(chunk_file, out_file) + job.fileStore.deleteGlobalFile(chunk_id) return job.fileStore.writeGlobalFile(output_path) - - + + def tile_alignments(job, alignment_files, reference_event_name, params, has_resources=False, total_sequence_size=0): # do everything post-chaining # Memory: paffy tile loads all PAFs + creates SequenceCountArray (2 bytes per base of query sequence) diff --git a/submodules/paffy b/submodules/paffy index 8b3c73246..2f2efc6ce 160000 --- a/submodules/paffy +++ b/submodules/paffy @@ -1 +1 @@ -Subproject commit 8b3c732460ad8c307161ec558ba72736b4fd421a +Subproject commit 2f2efc6ced353a04755c4af0c79a0ac9d20b086e diff --git a/submodules/sonLib b/submodules/sonLib index ba4cf9540..d70a79396 160000 --- a/submodules/sonLib +++ b/submodules/sonLib @@ -1 +1 @@ -Subproject commit ba4cf95406f1d29c292d12428a95fde420e19c52 +Subproject commit d70a793964798eb454d7e2fec03f22458106fd38