Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split-index mapping in separate processes #787

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,48 @@ and then map noisy reads to contigs.
By default, minimap2 indexes 4 billion reference bases (4Gb) in a batch and map
all reads against each reference batch. Given a reference longer than 4Gb,
minimap2 is unable to see all the sequences and thus can't produce a correct
SAM header. In this case, minimap2 doesn't output any SAM header. There are two
solutions to this issue. First, you may increase option `-I` to, for example,
`-I8g` to index more reference bases in a batch. This is preferred if your
machine has enough memory. Second, if your machines doesn't have enough memory
to hold the reference index, you can use the `--split-prefix` option in a
command line like:
SAM header. In this case, minimap2 doesn't output any SAM header. You may
increase option `-I` to, for example, `-I8g` to index more reference bases in a
batch. This is preferred if your machine has enough memory. Otherwise, see the
next question.

#### 4. The reference index is too large to fit in memory.

When the reference is longer than 4Gb (or as set by `-I` described above),
minimap2 processes it in batches and reports mappings for each batch
consecutively. The resulting output stream may therefore seem to report
multiple primary mappings for many reads.

The `--split-prefix` option adds a postprocessing stage to merge results across
the batches and produce mappings more similar (but not exactly identical) to
those for one large reference index.

```sh
minimap2 -ax map-ont --split-prefix=tmp ref.fa reads.fq
```
This second approach uses less memory, but it is slower and requires temporary
disk space.

#### 4. The output SAM is malformatted.
This writes out temporary files prefixed by `tmp`, using time and disk space.
See [Gamaarachchi et al. (2019)](https://www.nature.com/articles/s41598-019-40739-8)
for further information about this feature.

**Distributed mapping.** The postprocessing merge algorithm can also be used on
intermediate mappings for separate index shards, allowing parallelization on a
cluster. To do this, first generate the reference index shards using
`minimap2 -ax sr -I 9999G -d N.idx ref.N.fa` for each *N* of the desired
shards. Then, map the reads against each shard using
`minimap2 -ax sr --split-map N.mappings N.idx reads_1.fq reads_2.fq` for each
*N* (in parallel as desired). Lastly, merge the intermediate mappings into a
final PAF or SAM with
`minimap2 -ax sr --split-merge reads_1.fq reads_2.fq . *.mappings`. Make sure
to supply the same configuration options to each invocation.

#### 5. The output SAM is malformatted.

This typically happens when you use nohup to wrap a minimap2 command line.
Nohup is discouraged as it breaks piping. If you have to use nohup, please
specify an output file with option `-o`.

#### 5. How to output one alignment per read?
#### 6. How to output one alignment per read?

You can use `--secondary=no` to suppress secondary alignments (aka multiple
mappings), but you can't suppress supplementary alignment (aka split or
Expand Down
41 changes: 37 additions & 4 deletions main.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ static ko_longopt_t long_options[] = {
{ "qstrand", ko_no_argument, 348 },
{ "cap-kalloc", ko_required_argument, 349 },
{ "q-occ-frac", ko_required_argument, 350 },
{ "split-map", ko_required_argument, 351 },
{ "split-merge", ko_no_argument, 352 },
{ "help", ko_no_argument, 'h' },
{ "max-intron-len", ko_required_argument, 'G' },
{ "version", ko_no_argument, 'V' },
Expand Down Expand Up @@ -116,13 +118,28 @@ static inline void yes_or_no(mm_mapopt_t *opt, int64_t flag, int long_idx, const
}
}

static int split_merge_cmd(int argc, char *argv[], mm_mapopt_t* opt) {
int n_query_fn = 0, n_intermediate_fn = 0;
char **query_fn = argv, **intermediate_fn = 0;
for (; n_query_fn < argc && strcmp(argv[n_query_fn], "."); ++n_query_fn);
if (n_query_fn < argc - 1) {
n_intermediate_fn = argc - n_query_fn - 1;
intermediate_fn = &argv[n_query_fn+1];
}
if (!(n_query_fn && n_intermediate_fn)) {
fprintf(stderr, "[ERROR] with --split-merge, specify at least one query file and one --split-map intermediate file, separated by '.'\n");
return 1;
}
return mm_split_merge(n_query_fn, (const char**) query_fn, opt, n_intermediate_fn, (const char**) intermediate_fn);
}

int main(int argc, char *argv[])
{
const char *opt_str = "2aSDw:k:K:t:r:f:Vv:g:G:I:d:XT:s:x:Hcp:M:n:z:A:B:O:E:m:N:Qu:R:hF:LC:yYPo:e:U:";
ketopt_t o = KETOPT_INIT;
mm_mapopt_t opt;
mm_idxopt_t ipt;
int i, c, n_threads = 3, n_parts, old_best_n = -1;
int i, c, n_threads = 3, n_parts, old_best_n = -1, split_merge = 0;
char *fnw = 0, *rg = 0, *junc_bed = 0, *s, *alt_list = 0;
FILE *fp_help = stderr;
mm_idx_reader_t *idx_rdr;
Expand Down Expand Up @@ -231,6 +248,8 @@ int main(int argc, char *argv[])
else if (c == 348) opt.flag |= MM_F_QSTRAND | MM_F_NO_INV; // --qstrand
else if (c == 349) opt.cap_kalloc = mm_parse_num(o.arg); // --cap-kalloc
else if (c == 350) opt.q_occ_frac = atof(o.arg); // --q-occ-frac
else if (c == 351) opt.split_map = o.arg; // --split-map
else if (c == 352) split_merge = 1; // --split-merge
else if (c == 330) {
fprintf(stderr, "[WARNING] \033[1;31m --lj-min-ratio has been deprecated.\033[0m\n");
} else if (c == 314) { // --frag
Expand Down Expand Up @@ -364,6 +383,14 @@ int main(int argc, char *argv[])
return fp_help == stdout? 0 : 1;
}

if (split_merge) {
return split_merge_cmd(argc - o.ind, &argv[o.ind], &opt);
}
if (opt.split_map && opt.split_prefix) {
fprintf(stderr, "[ERROR] specify at most one of --split-prefix or --split-map\n");
return 1;
}

if ((opt.flag & MM_F_SR) && argc - o.ind > 3) {
fprintf(stderr, "[ERROR] incorrect input: in the sr mode, please specify no more than two query files.\n");
return 1;
Expand All @@ -388,7 +415,7 @@ int main(int argc, char *argv[])
mm_idx_reader_close(idx_rdr);
return 1;
}
if ((opt.flag & MM_F_OUT_SAM) && idx_rdr->n_parts == 1) {
if ((opt.flag & MM_F_OUT_SAM) && idx_rdr->n_parts == 1 && !opt.split_map) {
if (mm_idx_reader_eof(idx_rdr)) {
if (opt.split_prefix == 0)
ret = mm_write_sam_hdr(mi, rg, MM_VERSION, argc, argv);
Expand All @@ -405,6 +432,10 @@ int main(int argc, char *argv[])
return 1;
}
}
if (opt.split_map && !mm_idx_reader_eof(idx_rdr)) {
fprintf(stderr, "[ERROR] use --split-prefix instead of --split-map for multi-part index file\n");
return 1;
}
if (mm_verbose >= 3)
fprintf(stderr, "[M::%s::%.3f*%.2f] loaded/built the index for %d target sequence(s)\n",
__func__, realtime() - mm_realtime0, cputime() / (realtime() - mm_realtime0), mi->n_seq);
Expand Down Expand Up @@ -434,8 +465,10 @@ int main(int argc, char *argv[])
n_parts = idx_rdr->n_parts;
mm_idx_reader_close(idx_rdr);

if (opt.split_prefix)
mm_split_merge(argc - (o.ind + 1), (const char**)&argv[o.ind + 1], &opt, n_parts);
if (opt.split_prefix) {
assert(!opt.split_map);
mm_split_merge_tmp(argc - (o.ind + 1), (const char**) &argv[o.ind + 1], &opt, n_parts);
}

if (fflush(stdout) == EOF) {
perror("[ERROR] failed to write the results");
Expand Down
27 changes: 22 additions & 5 deletions map.c
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ static void merge_hits(step_t *s)
for (j = 0, l = 0; j < s->p->n_parts; ++j) {
for (t = 0; t < n_reg_part[j]; ++t, ++l) {
mm_reg1_t *r = &s->reg[k][l];
/* TODO: double-check all r->id match */
uint32_t capacity;
mm_err_fread(r, sizeof(mm_reg1_t), 1, fp[j]);
r->rid += s->p->rid_shift[j];
Expand Down Expand Up @@ -566,7 +567,8 @@ static void *worker_pipeline(void *shared, int step, void *in)
int seg_st = s->seg_off[k], seg_en = s->seg_off[k] + s->n_seg[k];
for (i = seg_st; i < seg_en; ++i) {
mm_bseq1_t *t = &s->seq[i];
if (p->opt->split_prefix && p->n_parts == 0) { // then write to temporary files
if (p->fp_split && p->n_parts == 0) { // then write to intermediate file
assert(p->opt->split_prefix || p->opt->split_map);
mm_err_fwrite(&s->n_reg[i], sizeof(int), 1, p->fp_split);
mm_err_fwrite(&s->rep_len[i], sizeof(int), 1, p->fp_split);
mm_err_fwrite(&s->frag_gap[i], sizeof(int), 1, p->fp_split);
Expand Down Expand Up @@ -646,7 +648,9 @@ int mm_map_file_frag(const mm_idx_t *idx, int n_segs, const char **fn, const mm_
pl.n_threads = n_threads > 1? n_threads : 1;
pl.mini_batch_size = opt->mini_batch_size;
if (opt->split_prefix)
pl.fp_split = mm_split_init(opt->split_prefix, idx);
pl.fp_split = mm_split_init_tmp(opt->split_prefix, idx);
else if (opt->split_map)
pl.fp_split = mm_split_init(opt->split_map, idx);
pl_threads = n_threads == 1? 1 : (opt->flag&MM_F_2_IO_THREADS)? 3 : 2;
kt_pipeline(pl_threads, worker_pipeline, &pl, 3);

Expand All @@ -663,7 +667,7 @@ int mm_map_file(const mm_idx_t *idx, const char *fn, const mm_mapopt_t *opt, int
return mm_map_file_frag(idx, 1, &fn, opt, n_threads);
}

int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx)
int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx, const char **fn_intermediates)
{
int i;
pipeline_t pl;
Expand All @@ -679,7 +683,7 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp
pl.n_parts = n_split_idx;
pl.fp_parts = CALLOC(FILE*, pl.n_parts);
pl.rid_shift = CALLOC(uint32_t, pl.n_parts);
pl.mi = mi = mm_split_merge_prep(opt->split_prefix, n_split_idx, pl.fp_parts, pl.rid_shift);
pl.mi = mi = mm_split_merge_prep(fn_intermediates, n_split_idx, pl.fp_parts, pl.rid_shift);
if (pl.mi == 0) {
free(pl.fp_parts);
free(pl.rid_shift);
Expand All @@ -704,6 +708,19 @@ int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_sp
for (i = 0; i < pl.n_fp; ++i)
mm_bseq_close(pl.fp[i]);
free(pl.fp);
mm_split_rm_tmp(opt->split_prefix, n_split_idx);
return 0;
}

int mm_split_merge_tmp(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx)
{
char **fn_intermediates;
int ret, i;
fn_intermediates = mm_split_tmp_intermediates(opt->split_prefix, n_split_idx);
ret = mm_split_merge(n_segs, fn, opt, n_split_idx, (const char**) fn_intermediates);
for (i = 0; i < n_split_idx; ++i) {
remove(fn_intermediates[i]);
free(fn_intermediates[i]);
}
free(fn_intermediates);
return ret;
}
3 changes: 2 additions & 1 deletion minimap.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ typedef struct {
int64_t max_sw_mat;
int64_t cap_kalloc;

const char *split_prefix;
const char *split_prefix; // temp file prefix for mapping to split index (sequentially within one process)
const char *split_map; // intermediate output file for mapping to one part of a split index
} mm_mapopt_t;

// index reader
Expand Down
11 changes: 7 additions & 4 deletions mmpriv.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,13 @@ mm_seg_t *mm_seg_gen(void *km, uint32_t hash, int n_segs, const int *qlens, int
void mm_seg_free(void *km, int n_segs, mm_seg_t *segs);
void mm_pair(void *km, int max_gap_ref, int dp_bonus, int sub_diff, int match_sc, const int *qlens, int *n_regs, mm_reg1_t **regs);

FILE *mm_split_init(const char *prefix, const mm_idx_t *mi);
mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part);
int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx);
void mm_split_rm_tmp(const char *prefix, int n_splits);
FILE *mm_split_init(const char *fn, const mm_idx_t *mi);
FILE *mm_split_init_tmp(const char *prefix, const mm_idx_t *mi);
mm_idx_t *mm_split_merge_prep(const char **intermediates, int n_splits, FILE **fp, uint32_t *n_seq_part);
mm_idx_t *mm_split_merge_prep_tmp(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part);
int mm_split_merge(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx, const char **fn_intermediates);
int mm_split_merge_tmp(int n_segs, const char **fn, const mm_mapopt_t *opt, int n_split_idx);
char** mm_split_tmp_intermediates(const char *prefix, int n_splits);

void mm_err_puts(const char *str);
void mm_err_fwrite(const void *p, size_t size, size_t nitems, FILE *fp);
Expand Down
66 changes: 47 additions & 19 deletions splitidx.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
#include <errno.h>
#include "mmpriv.h"

FILE *mm_split_init(const char *prefix, const mm_idx_t *mi)
FILE *mm_split_init(const char *fn, const mm_idx_t *mi)
{
char *fn;
FILE *fp;
uint32_t i, k = mi->k;
fn = (char*)calloc(strlen(prefix) + 10, 1);
sprintf(fn, "%s.%.4d.tmp", prefix, mi->index);
if ((fp = fopen(fn, "wb")) == NULL) {
if (mm_verbose >= 1)
fprintf(stderr, "[ERROR]\033[1;31m failed to write to temporary file '%s'\033[0m: %s\n", fn, strerror(errno));
fprintf(stderr, "[ERROR]\033[1;31m failed to write to intermediate file '%s'\033[0m: %s\n", fn, strerror(errno));
exit(1);
}
mm_err_fwrite(&k, 4, 1, fp);
Expand All @@ -26,30 +23,35 @@ FILE *mm_split_init(const char *prefix, const mm_idx_t *mi)
mm_err_fwrite(mi->seq[i].name, 1, l, fp);
mm_err_fwrite(&mi->seq[i].len, 4, 1, fp);
}
return fp;
}

FILE *mm_split_init_tmp(const char *prefix, const mm_idx_t *mi)
{
FILE *fp;
char *fn;
fn = (char*)calloc(strlen(prefix) + 10, 1);
sprintf(fn, "%s.%.4d.tmp", prefix, mi->index);
fp = mm_split_init(fn, mi);
free(fn);
return fp;
}

mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part)
mm_idx_t *mm_split_merge_prep(const char **intermediates, int n_splits, FILE **fp, uint32_t *n_seq_part)
{
mm_idx_t *mi = 0;
char *fn;
int i, j;

if (n_splits < 1) return 0;
fn = CALLOC(char, strlen(prefix) + 10);
for (i = 0; i < n_splits; ++i) {
sprintf(fn, "%s.%.4d.tmp", prefix, i);
if ((fp[i] = fopen(fn, "rb")) == 0) {
if ((fp[i] = fopen(intermediates[i], "rb")) == 0) {
if (mm_verbose >= 1)
fprintf(stderr, "ERROR: failed to open temporary file '%s': %s\n", fn, strerror(errno));
fprintf(stderr, "ERROR: failed to open intermediate file '%s': %s\n", intermediates[i], strerror(errno));
for (j = 0; j < i; ++j)
fclose(fp[j]);
free(fn);
return 0;
}
}
free(fn);

mi = CALLOC(mm_idx_t, 1);
for (i = 0; i < n_splits; ++i) {
Expand All @@ -71,14 +73,40 @@ mm_idx_t *mm_split_merge_prep(const char *prefix, int n_splits, FILE **fp, uint3
return mi;
}

void mm_split_rm_tmp(const char *prefix, int n_splits)
mm_idx_t *mm_split_merge_prep_tmp(const char *prefix, int n_splits, FILE **fp, uint32_t *n_seq_part)
{
char **filenames;
int i;
char *fn;
fn = CALLOC(char, strlen(prefix) + 10);
mm_idx_t *mi;

if (n_splits < 1) return 0;
filenames = CALLOC(char*, n_splits);
for (i = 0; i < n_splits; ++i) {
sprintf(fn, "%s.%.4d.tmp", prefix, i);
remove(fn);
filenames[i] = CALLOC(char, strlen(prefix) + 10);
sprintf(filenames[i], "%s.%.4d.tmp", prefix, i);
}
free(fn);

mi = mm_split_merge_prep((const char**)filenames, n_splits, fp, n_seq_part);

for (i = 0; i < n_splits; ++i) {
free(filenames[i]);
}
free(filenames);

return mi;
}

char** mm_split_tmp_intermediates(const char *prefix, int n_splits)
{
char **filenames;
int i;

if (n_splits < 1) return 0;
filenames = CALLOC(char*, n_splits);
for (i = 0; i < n_splits; ++i) {
filenames[i] = CALLOC(char, strlen(prefix) + 10);
sprintf(filenames[i], "%s.%.4d.tmp", prefix, i);
}

return filenames;
}