diff --git a/FAQ.md b/FAQ.md index 94a1bbce..6553003c 100644 --- a/FAQ.md +++ b/FAQ.md @@ -16,25 +16,48 @@ and then map noisy reads to contigs. By default, minimap2 indexes 4 billion reference bases (4Gb) in a batch and map all reads against each reference batch. Given a reference longer than 4Gb, minimap2 is unable to see all the sequences and thus can't produce a correct -SAM header. In this case, minimap2 doesn't output any SAM header. There are two -solutions to this issue. First, you may increase option `-I` to, for example, -`-I8g` to index more reference bases in a batch. This is preferred if your -machine has enough memory. Second, if your machines doesn't have enough memory -to hold the reference index, you can use the `--split-prefix` option in a -command line like: +SAM header. In this case, minimap2 doesn't output any SAM header. You may +increase option `-I` to, for example, `-I8g` to index more reference bases in a +batch. This is preferred if your machine has enough memory. Otherwise, see the +next question. + +#### 4. The reference index is too large to fit in memory. + +When the reference is longer than 4Gb (or as set by `-I` described above), +minimap2 processes it in batches and reports mappings for each batch +consecutively. The resulting output stream may therefore seem to report +multiple primary mappings for many reads. + +The `--split-prefix` option adds a postprocessing stage to merge results across +the batches and produce mappings more similar (but not exactly identical) to +those for one large reference index. + ```sh minimap2 -ax map-ont --split-prefix=tmp ref.fa reads.fq ``` -This second approach uses less memory, but it is slower and requires temporary -disk space. -#### 4. The output SAM is malformatted. +This writes out temporary files prefixed by `tmp`, using time and disk space. +See [Gamaarachchi et al. (2019)](https://www.nature.com/articles/s41598-019-40739-8) +for further information about this feature. + +**Distributed mapping.** The postprocessing merge algorithm can also be used on +intermediate mappings for separate index shards, allowing parallelization on a +cluster. To do this, first generate the reference index shards using +`minimap2 -ax sr -I 9999G -d N.idx ref.N.fa` for each *N* of the desired +shards. Then, map the reads against each shard using +`minimap2 -ax sr --split-map N.mappings N.idx reads_1.fq reads_2.fq` for each +*N* (in parallel as desired). Lastly, merge the intermediate mappings into a +final PAF or SAM with +`minimap2 -ax sr --split-merge reads_1.fq reads_2.fq . *.mappings`. Make sure +to supply the same configuration options to each invocation. + +#### 5. The output SAM is malformatted. This typically happens when you use nohup to wrap a minimap2 command line. Nohup is discouraged as it breaks piping. If you have to use nohup, please specify an output file with option `-o`. -#### 5. How to output one alignment per read? +#### 6. How to output one alignment per read? You can use `--secondary=no` to suppress secondary alignments (aka multiple mappings), but you can't suppress supplementary alignment (aka split or diff --git a/main.c b/main.c index 84d31297..5bd89ba7 100644 --- a/main.c +++ b/main.c @@ -75,6 +75,8 @@ static ko_longopt_t long_options[] = { { "qstrand", ko_no_argument, 348 }, { "cap-kalloc", ko_required_argument, 349 }, { "q-occ-frac", ko_required_argument, 350 }, + { "split-map", ko_required_argument, 351 }, + { "split-merge", ko_no_argument, 352 }, { "help", ko_no_argument, 'h' }, { "max-intron-len", ko_required_argument, 'G' }, { "version", ko_no_argument, 'V' }, @@ -116,13 +118,28 @@ static inline void yes_or_no(mm_mapopt_t *opt, int64_t flag, int long_idx, const } } +static int split_merge_cmd(int argc, char *argv[], mm_mapopt_t* opt) { + int n_query_fn = 0, n_intermediate_fn = 0; + char **query_fn = argv, **intermediate_fn = 0; + for (; n_query_fn < argc && strcmp(argv[n_query_fn], "."); ++n_query_fn); + if (n_query_fn < argc - 1) { + n_intermediate_fn = argc - n_query_fn - 1; + intermediate_fn = &argv[n_query_fn+1]; + } + if (!(n_query_fn && n_intermediate_fn)) { + fprintf(stderr, "[ERROR] with --split-merge, specify at least one query file and one --split-map intermediate file, separated by '.'\n"); + return 1; + } + return mm_split_merge(n_query_fn, (const char**) query_fn, opt, n_intermediate_fn, (const char**) intermediate_fn); +} + int main(int argc, char *argv[]) { const char *opt_str = "2aSDw:k:K:t:r:f:Vv:g:G:I:d:XT:s:x:Hcp:M:n:z:A:B:O:E:m:N:Qu:R:hF:LC:yYPo:e:U:"; ketopt_t o = KETOPT_INIT; mm_mapopt_t opt; mm_idxopt_t ipt; - int i, c, n_threads = 3, n_parts, old_best_n = -1; + int i, c, n_threads = 3, n_parts, old_best_n = -1, split_merge = 0; char *fnw = 0, *rg = 0, *junc_bed = 0, *s, *alt_list = 0; FILE *fp_help = stderr; mm_idx_reader_t *idx_rdr; @@ -231,6 +248,8 @@ int main(int argc, char *argv[]) else if (c == 348) opt.flag |= MM_F_QSTRAND | MM_F_NO_INV; // --qstrand else if (c == 349) opt.cap_kalloc = mm_parse_num(o.arg); // --cap-kalloc else if (c == 350) opt.q_occ_frac = atof(o.arg); // --q-occ-frac + else if (c == 351) opt.split_map = o.arg; // --split-map + else if (c == 352) split_merge = 1; // --split-merge else if (c == 330) { fprintf(stderr, "[WARNING] \033[1;31m --lj-min-ratio has been deprecated.\033[0m\n"); } else if (c == 314) { // --frag @@ -364,6 +383,14 @@ int main(int argc, char *argv[]) return fp_help == stdout? 0 : 1; } + if (split_merge) { + return split_merge_cmd(argc - o.ind, &argv[o.ind], &opt); + } + if (opt.split_map && opt.split_prefix) { + fprintf(stderr, "[ERROR] specify at most one of --split-prefix or --split-map\n"); + return 1; + } + if ((opt.flag & MM_F_SR) && argc - o.ind > 3) { fprintf(stderr, "[ERROR] incorrect input: in the sr mode, please specify no more than two query files.\n"); return 1; @@ -388,7 +415,7 @@ int main(int argc, char *argv[]) mm_idx_reader_close(idx_rdr); return 1; } - if ((opt.flag & MM_F_OUT_SAM) && idx_rdr->n_parts == 1) { + if ((opt.flag & MM_F_OUT_SAM) && idx_rdr->n_parts == 1 && !opt.split_map) { if (mm_idx_reader_eof(idx_rdr)) { if (opt.split_prefix == 0) ret = mm_write_sam_hdr(mi, rg, MM_VERSION, argc, argv); @@ -405,6 +432,10 @@ int main(int argc, char *argv[]) return 1; } } + if (opt.split_map && !mm_idx_reader_eof(idx_rdr)) { + fprintf(stderr, "[ERROR] use --split-prefix instead of --split-map for multi-part index file\n"); + return 1; + } if (mm_verbose >= 3) fprintf(stderr, "[M::%s::%.3f*%.2f] loaded/built the index for %d target sequence(s)\n", __func__, realtime() - mm_realtime0, cputime() / (realtime() - mm_realtime0), mi->n_seq); @@ -434,8 +465,10 @@ int main(int argc, char *argv[]) n_parts = idx_rdr->n_parts; mm_idx_reader_close(idx_rdr); - if (opt.split_prefix) - mm_split_merge(argc - (o.ind + 1), (const char**)&argv[o.ind + 1], &opt, n_parts); + if (opt.split_prefix) { + assert(!opt.split_map); + mm_split_merge_tmp(argc - (o.ind + 1), (const char**) &argv[o.ind + 1], &opt, n_parts); + } if (fflush(stdout) == EOF) { perror("[ERROR] failed to write the results"); diff --git a/map.c b/map.c index 7da9a49c..b1b4eb8d 100644 --- a/map.c +++ b/map.c @@ -484,6 +484,7 @@ static void merge_hits(step_t *s) for (j = 0, l = 0; j < s->p->n_parts; ++j) { for (t = 0; t < n_reg_part[j]; ++t, ++l) { mm_reg1_t *r = &s->reg[k][l]; + /* TODO: double-check all r->id match */ uint32_t capacity; mm_err_fread(r, sizeof(mm_reg1_t), 1, fp[j]); r->rid += s->p->rid_shift[j]; @@ -566,7 +567,8 @@ static void *worker_pipeline(void *shared, int step, void *in) int seg_st = s->seg_off[k], seg_en = s->seg_off[k] + s->n_seg[k]; for (i = seg_st; i < seg_en; ++i) { mm_bseq1_t *t = &s->seq[i]; - if (p->opt->split_prefix && p->n_parts == 0) { // then write to temporary files + if (p->fp_split && p->n_parts == 0) { // then write to intermediate file + assert(p->opt->split_prefix || p->opt->split_map); mm_err_fwrite(&s->n_reg[i], sizeof(int), 1, p->fp_split); mm_err_fwrite(&s->rep_len[i], sizeof(int), 1, p->fp_split); mm_err_fwrite(&s->frag_gap[i], sizeof(int), 1, p->fp_split); @@ -646,7 +648,9 @@ int mm_map_file_frag(const mm_idx_t *idx, int n_segs, const char **fn, const mm_ pl.n_threads = n_threads > 1? n_threads : 1; pl.mini_batch_size = opt->mini_batch_size; if (opt->split_prefix) - pl.fp_split = mm_split_init(opt->split_prefix, idx); + pl.fp_split = mm_split_init_tmp(opt->split_prefix, idx); + else if (opt->split_map) + pl.fp_split = mm_split_init(opt->split_map, idx); pl_threads = n_threads == 1? 1 : (opt->flag&MM_F_2_IO_THREADS)? 3 : 2; kt_pipeline(pl_threads, worker_pipeline, &pl, 3); @@ -663,7 +667,7 @@ int mm_map_file(const mm_idx_t *idx, const char *fn, const mm_mapopt_t *opt, int return mm_map_file_frag(idx, 1, &fn, opt, n_threads); } -int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx) +int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx, const char **fn_intermediates) { int i; pipeline_t pl; @@ -679,7 +683,7 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp pl.n_parts = n_split_idx; pl.fp_parts = CALLOC(FILE*, pl.n_parts); pl.rid_shift = CALLOC(uint32_t, pl.n_parts); - pl.mi = mi = mm_split_merge_prep(opt->split_prefix, n_split_idx, pl.fp_parts, pl.rid_shift); + pl.mi = mi = mm_split_merge_prep(fn_intermediates, n_split_idx, pl.fp_parts, pl.rid_shift); if (pl.mi == 0) { free(pl.fp_parts); free(pl.rid_shift); @@ -704,6 +708,19 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp for (i = 0; i < pl.n_fp; ++i) mm_bseq_close(pl.fp[i]); free(pl.fp); - mm_split_rm_tmp(opt->split_prefix, n_split_idx); return 0; } + +int mm_split_merge_tmp(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx) +{ + char **fn_intermediates; + int ret, i; + fn_intermediates = mm_split_tmp_intermediates(opt->split_prefix, n_split_idx); + ret = mm_split_merge(n_segs, fn, opt, n_split_idx, (const char**) fn_intermediates); + for (i = 0; i < n_split_idx; ++i) { + remove(fn_intermediates[i]); + free(fn_intermediates[i]); + } + free(fn_intermediates); + return ret; +} diff --git a/minimap.h b/minimap.h index 2204b6cd..423e0dad 100644 --- a/minimap.h +++ b/minimap.h @@ -171,7 +171,8 @@ typedef struct { int64_t max_sw_mat; int64_t cap_kalloc; - const char *split_prefix; + const char *split_prefix; // temp file prefix for mapping to split index (sequentially within one process) + const char *split_map; // intermediate output file for mapping to one part of a split index } mm_mapopt_t; // index reader diff --git a/mmpriv.h b/mmpriv.h index a2b5a80d..a20b0a6b 100644 --- a/mmpriv.h +++ b/mmpriv.h @@ -104,10 +104,13 @@ mm_seg_t *mm_seg_gen(void *km, uint32_t hash, int n_segs, const int *qlens, int void mm_seg_free(void *km, int n_segs, mm_seg_t *segs); void mm_pair(void *km, int max_gap_ref, int dp_bonus, int sub_diff, int match_sc, const int *qlens, int *n_regs, mm_reg1_t **regs); -FILE *mm_split_init(const char *prefix, const mm_idx_t *mi); -mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part); -int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx); -void mm_split_rm_tmp(const char *prefix, int n_splits); +FILE *mm_split_init(const char *fn, const mm_idx_t *mi); +FILE *mm_split_init_tmp(const char *prefix, const mm_idx_t *mi); +mm_idx_t *mm_split_merge_prep(const char **intermediates, int n_splits, FILE **fp, uint32_t *n_seq_part); +mm_idx_t *mm_split_merge_prep_tmp(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part); +int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx, const char **fn_intermediates); +int mm_split_merge_tmp(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx); +char** mm_split_tmp_intermediates(const char *prefix, int n_splits); void mm_err_puts(const char *str); void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp); diff --git a/splitidx.c b/splitidx.c index 507e498f..8321e7fc 100644 --- a/splitidx.c +++ b/splitidx.c @@ -5,16 +5,13 @@ #include #include "mmpriv.h" -FILE *mm_split_init(const char *prefix, const mm_idx_t *mi) +FILE *mm_split_init(const char *fn, const mm_idx_t *mi) { - char *fn; FILE *fp; uint32_t i, k = mi->k; - fn = (char*)calloc(strlen(prefix) + 10, 1); - sprintf(fn, "%s.%.4d.tmp", prefix, mi->index); if ((fp = fopen(fn, "wb")) == NULL) { if (mm_verbose >= 1) - fprintf(stderr, "[ERROR]\033[1;31m failed to write to temporary file '%s'\033[0m: %s\n", fn, strerror(errno)); + fprintf(stderr, "[ERROR]\033[1;31m failed to write to intermediate file '%s'\033[0m: %s\n", fn, strerror(errno)); exit(1); } mm_err_fwrite(&k, 4, 1, fp); @@ -26,30 +23,35 @@ FILE *mm_split_init(const char *prefix, const mm_idx_t *mi) mm_err_fwrite(mi->seq[i].name, 1, l, fp); mm_err_fwrite(&mi->seq[i].len, 4, 1, fp); } + return fp; +} + +FILE *mm_split_init_tmp(const char *prefix, const mm_idx_t *mi) +{ + FILE *fp; + char *fn; + fn = (char*)calloc(strlen(prefix) + 10, 1); + sprintf(fn, "%s.%.4d.tmp", prefix, mi->index); + fp = mm_split_init(fn, mi); free(fn); return fp; } -mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part) +mm_idx_t *mm_split_merge_prep(const char **intermediates, int n_splits, FILE **fp, uint32_t *n_seq_part) { mm_idx_t *mi = 0; - char *fn; int i, j; if (n_splits < 1) return 0; - fn = CALLOC(char, strlen(prefix) + 10); for (i = 0; i < n_splits; ++i) { - sprintf(fn, "%s.%.4d.tmp", prefix, i); - if ((fp[i] = fopen(fn, "rb")) == 0) { + if ((fp[i] = fopen(intermediates[i], "rb")) == 0) { if (mm_verbose >= 1) - fprintf(stderr, "ERROR: failed to open temporary file '%s': %s\n", fn, strerror(errno)); + fprintf(stderr, "ERROR: failed to open intermediate file '%s': %s\n", intermediates[i], strerror(errno)); for (j = 0; j < i; ++j) fclose(fp[j]); - free(fn); return 0; } } - free(fn); mi = CALLOC(mm_idx_t, 1); for (i = 0; i < n_splits; ++i) { @@ -71,14 +73,40 @@ mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint3 return mi; } -void mm_split_rm_tmp(const char *prefix, int n_splits) +mm_idx_t *mm_split_merge_prep_tmp(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part) { + char **filenames; int i; - char *fn; - fn = CALLOC(char, strlen(prefix) + 10); + mm_idx_t *mi; + + if (n_splits < 1) return 0; + filenames = CALLOC(char*, n_splits); for (i = 0; i < n_splits; ++i) { - sprintf(fn, "%s.%.4d.tmp", prefix, i); - remove(fn); + filenames[i] = CALLOC(char, strlen(prefix) + 10); + sprintf(filenames[i], "%s.%.4d.tmp", prefix, i); } - free(fn); + + mi = mm_split_merge_prep((const char**)filenames, n_splits, fp, n_seq_part); + + for (i = 0; i < n_splits; ++i) { + free(filenames[i]); + } + free(filenames); + + return mi; +} + +char** mm_split_tmp_intermediates(const char *prefix, int n_splits) +{ + char **filenames; + int i; + + if (n_splits < 1) return 0; + filenames = CALLOC(char*, n_splits); + for (i = 0; i < n_splits; ++i) { + filenames[i] = CALLOC(char, strlen(prefix) + 10); + sprintf(filenames[i], "%s.%.4d.tmp", prefix, i); + } + + return filenames; }