From 47780d87648b832d587b8fda90372ceae9ad8c11 Mon Sep 17 00:00:00 2001 From: ygditu Date: Mon, 20 Jul 2020 10:53:08 +0800 Subject: [PATCH] solve conflict by merge dv to master at 0.0.5-beta --- README.md | 13 ++++++ const.go | 36 +++++++++++++++- load.go | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++++- main.go | 74 +++++++++++++++++++++++--------- 4 files changed, 225 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 2eece0f..40b2d57 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,18 @@ # REDItools2 +**Created at 2020.07.14** + + +A go version of REDItools2 + +1. Too much faster. + - The original Python version takes 3mins + - The `multiprocessing version` in previous commits, takes 1m22s with 10 processes + - The go version takes 1min with 1 process, 7~9s with 10 processes +2. More information, even more editing sites. + +--- + **REDItools2** is the optimized, parallel multi-node version of [ REDItools](https://github.com/BioinfoUNIBA/REDItools). REDItools takes in input a RNA-Seq (or DNA-Seq BAM) file and outputs a table of RNA-Seq editing events. Here is an example of REDItools's output: diff --git a/const.go b/const.go index 9c7131b..0bd9b27 100644 --- a/const.go +++ b/const.go @@ -8,6 +8,7 @@ import ( "time" "github.com/biogo/biogo/alphabet" + "github.com/biogo/hts/bgzf" "github.com/biogo/hts/sam" "github.com/voxelbrain/goptions" "go.uber.org/zap" @@ -19,14 +20,14 @@ var conf config var region *Region const ( - VERSION = "0.0.2-low memory" + VERSION = "0.0.2-beta" DefaultBaseQuality = 30 ) type config struct { Config string `goptions:"-c, --config, description='Config file'"` Version bool `goptions:"-v, --version, description='Show version'"` - Debug bool `goptions:"--debug, description='Show debug info'"` + Debug bool `goptions:"--debug, description='Show debug info'" long:"debug" description:"Config file"` File string `goptions:"-f, --file, description='The bam file to be analyzed'"` Output string `goptions:"-o, --output-file, description='The output statistics file'"` Process int `goptions:"-p, --process, description='How many process to use'"` @@ -123,6 +124,37 @@ func decodeRegion(region string) *Region { return res } +type ChanChunk struct { + Ref string + Start int + End int + Chunks bgzf.Chunk +} + +func (c *ChanChunk) ToRegion() *Region { + return &Region{ + Chrom: c.Ref, + Start: c.Start, + End: c.End, + } +} + +func (c *ChanChunk) SwitchRegion() *Region { + ref := c.Ref + + if strings.HasPrefix(ref, "chr") { + ref = strings.ReplaceAll(ref, "chr", "") + } else { + ref = "chr" + ref + } + + return &Region{ + Chrom: ref, + Start: c.Start, + End: c.End, + } +} + type Omopolymeric struct { Chromosome string NegEquals int diff --git a/load.go b/load.go index b60790b..b56eb1d 100644 --- a/load.go +++ b/load.go @@ -3,6 +3,13 @@ package main import ( "bufio" "compress/gzip" + "io" + "math" + "os" + "regexp" + "strconv" + "strings" + "github.com/biogo/biogo/alphabet" "github.com/biogo/biogo/io/seqio" "github.com/biogo/biogo/io/seqio/fasta" @@ -368,6 +375,89 @@ func loadSplicingPositions() (map[string]*set.Set, error) { return res, nil } +// splitRegion is used to spit a number of size into chunks +func splitRegion(end, chunks int) []int { + regions := []int{0} + + bk := end / maxInt(chunks, 1) + + for i := 0; i < chunks; i++ { + if i == chunks-1 { + regions = append(regions, end) + } else { + regions = append(regions, (i+1)*bk) + } + } + return regions +} + +// adjustRegion is used to move the end site a little backwards +func adjustRegion(regions []int, idx *bam.Index, ref *sam.Reference, bamReader *bam.Reader) ([]*ChanChunk, error) { + + res := make([]*ChanChunk, 0, 0) + for i := 0; i < len(regions); i++ { + if i > 0 && i < len(regions)-1 { + chunks, err := idx.Chunks(ref, regions[i], regions[i+1]) + if err != nil { + sugar.Debugf("%v", regions) + // return res, errors.Wrapf(err, "failed before adjust %s: %d - %d", ref.Name(), region[i], region[i+1]) + continue + } + + if len(chunks) == 0 { + continue + } + + iter, err := bam.NewIterator(bamReader, chunks) + if err != nil { + return res, err + } + + lastEnd := 0 + for iter.Next() { + record := iter.Record() + + if lastEnd == 0 { + lastEnd = record.End() + } else if record.Start() > lastEnd { + break + } + } + + if lastEnd > 0 { + regions[i] = lastEnd + } + } + } + + for i := 1; i < len(regions); i++ { + // avoid duplicated regions + if regions[i-1] > regions[i] { + regions[i] = regions[i-1] + continue + } + + chunks, err := idx.Chunks(ref, regions[i-1], regions[i]) + if err != nil { + continue + // return res, errors.Wrapf(err, "failed after adjust %s: %d - %d", ref.Name(), region[i-1], region[i]) + } + //res = append(res, chunks...) + + for _, c := range chunks { + res = append(res, &ChanChunk{ + Ref: ref.Name(), + Start: regions[i-1], + End: regions[i], + Chunks: c, + }) + } + + } + + return res, nil +} + func fetchBamRefs() ([]string, error) { res := make([]string, 0, 0) ifh, err := os.Open(conf.File) @@ -384,7 +474,40 @@ func fetchBamRefs() ([]string, error) { } for _, ref := range bamReader.Header().Refs() { - res = append(res, ref.Name()) + length, err := strconv.Atoi(ref.Get(sam.NewTag("LN"))) + if err != nil { + return res, err + } + + regions := make([]int, 0, 0) + if region.Empty() { + regions = splitRegion(length, maxInt(conf.Process, 1)) + } else if ref.Name() == region.Chrom { + if region.End != 0 { + length = region.End + } + + for _, i := range splitRegion(length-region.Start, maxInt(conf.Process, 1)) { + regions = append(regions, region.Start+i) + } + } + + //sugar.Infof("%s - %d", ref.Name(), len(regions)) + + if len(regions) == 0 { + continue + } + sugar.Debugf("make chunks of %s", ref.Name()) + temp, err := adjustRegion(regions, idx, ref, bamReader) + if err != nil { + return res, err + } + + if len(temp) == 0 { + continue + } + + res[ref.Name()] = temp } return res, nil } diff --git a/main.go b/main.go index 9a7fc8d..22a602c 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ func writer(w chan string, wg *sync.WaitGroup) { f, err := os.OpenFile(conf.Output, mode, 0644) if err != nil { sugar.Fatalf("failed to open %s: %s", conf.Output, err.Error()) + os.Exit(1) } defer f.Close() @@ -81,17 +82,19 @@ func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPo break } - chrRef, err := fetchFasta(ref) - if err != nil { - sugar.Warnf("try to modify %s", ref.Chrom) - if strings.HasPrefix(ref.Chrom, "chr") { - chrRef, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref.Chrom, "chr", "")}) - } else { - chrRef, err = fetchFasta(&Region{Chrom: "chr" + ref.Chrom}) - } - } - if err != nil { - sugar.Fatal(err) + //chrRef, err := fetchFasta(ref.ToRegion()) + //if err != nil { + // sugar.Warnf("try to modify %s", ref) + // chrRef, err = fetchFasta(ref.SwitchRegion()) + //} + //if err != nil { + // sugar.Fatal(err) + //} + + chrRef, ok := chrRefs[ref.Ref] + if !ok { + sugar.Errorf("failed to get %s from reference", ref.Ref) + continue } iter, err := fetchBam(ref) @@ -126,8 +129,6 @@ func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPo sugar.Fatal(record.SeqString()) } - // record.QueryPosition = append(record.QueryPosition, at) - genomic := start + record.Start if _, ok := edits[genomic]; !ok { @@ -165,6 +166,8 @@ func main() { setLogger(conf.Debug, conf.Log) + sugar.Debugf("options: %v", conf) + if conf.Version { sugar.Infof("current version: %v", VERSION) os.Exit(0) @@ -212,6 +215,39 @@ func main() { var wg sync.WaitGroup w := make(chan string) + refs := make(chan *ChanChunk) + + references, err := fetchBamRefs() + if err != nil { + sugar.Fatal(err) + } + + sugar.Infof("load reference from %s", conf.Reference) + chrRefs := make(map[string][]byte) + for ref, _ := range references { + wg.Add(1) + go func(ref string, wg *sync.WaitGroup, lock *sync.Mutex) { + defer wg.Done() + temp, err := fetchFasta(&Region{Chrom: ref}) + if err != nil { + sugar.Warnf("try to modify %s", ref) + if strings.HasPrefix(ref, "chr") { + temp, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref, "chr", "")}) + } else { + temp, err = fetchFasta(&Region{Chrom: "chr" + ref}) + } + } + if err != nil { + sugar.Fatal(err) + } + lock.Lock() + chrRefs[ref] = temp + lock.Unlock() + }(ref, &wg, &lock) + } + + wg.Wait() + wg.Add(1) go writer(w, &wg) refs := make(chan *Region) @@ -221,14 +257,12 @@ func main() { wg.Add(1) } - if region.Empty() { - if references, err := fetchBamRefs(); err == nil { - for _, r := range references { - refs <- &Region{Chrom: r} - } + for ref, chunks := range references { + sugar.Infof("read reads from %s", ref) + for _, c := range chunks { + sugar.Debug(c) + refs <- c } - } else { - refs <- region } close(refs)