Skip to content

Commit

Permalink
solve conflict by merge dv to master at 0.0.5-beta
Browse files Browse the repository at this point in the history
  • Loading branch information
ygidtu committed Jul 20, 2020
1 parent 140b0c7 commit 47780d8
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 23 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# REDItools2

**Created at 2020.07.14**


A go version of REDItools2

1. Too much faster.
- The original Python version takes 3mins
- The `multiprocessing version` in previous commits, takes 1m22s with 10 processes
- The go version takes 1min with 1 process, 7~9s with 10 processes
2. More information, even more editing sites.

---

**REDItools2** is the optimized, parallel multi-node version of [<i class="icon-link"></i> REDItools](https://github.com/BioinfoUNIBA/REDItools).

REDItools takes in input a RNA-Seq (or DNA-Seq BAM) file and outputs a table of RNA-Seq editing events. Here is an example of REDItools's output:
Expand Down
36 changes: 34 additions & 2 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/biogo/biogo/alphabet"
"github.com/biogo/hts/bgzf"
"github.com/biogo/hts/sam"
"github.com/voxelbrain/goptions"
"go.uber.org/zap"
Expand All @@ -19,14 +20,14 @@ var conf config
var region *Region

const (
VERSION = "0.0.2-low memory"
VERSION = "0.0.2-beta"
DefaultBaseQuality = 30
)

type config struct {
Config string `goptions:"-c, --config, description='Config file'"`
Version bool `goptions:"-v, --version, description='Show version'"`
Debug bool `goptions:"--debug, description='Show debug info'"`
Debug bool `goptions:"--debug, description='Show debug info'" long:"debug" description:"Config file"`
File string `goptions:"-f, --file, description='The bam file to be analyzed'"`
Output string `goptions:"-o, --output-file, description='The output statistics file'"`
Process int `goptions:"-p, --process, description='How many process to use'"`
Expand Down Expand Up @@ -123,6 +124,37 @@ func decodeRegion(region string) *Region {
return res
}

type ChanChunk struct {
Ref string
Start int
End int
Chunks bgzf.Chunk
}

func (c *ChanChunk) ToRegion() *Region {
return &Region{
Chrom: c.Ref,
Start: c.Start,
End: c.End,
}
}

func (c *ChanChunk) SwitchRegion() *Region {
ref := c.Ref

if strings.HasPrefix(ref, "chr") {
ref = strings.ReplaceAll(ref, "chr", "")
} else {
ref = "chr" + ref
}

return &Region{
Chrom: ref,
Start: c.Start,
End: c.End,
}
}

type Omopolymeric struct {
Chromosome string
NegEquals int
Expand Down
125 changes: 124 additions & 1 deletion load.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ package main
import (
"bufio"
"compress/gzip"
"io"
"math"
"os"
"regexp"
"strconv"
"strings"

"github.com/biogo/biogo/alphabet"
"github.com/biogo/biogo/io/seqio"
"github.com/biogo/biogo/io/seqio/fasta"
Expand Down Expand Up @@ -368,6 +375,89 @@ func loadSplicingPositions() (map[string]*set.Set, error) {
return res, nil
}

// splitRegion is used to spit a number of size into chunks
func splitRegion(end, chunks int) []int {
regions := []int{0}

bk := end / maxInt(chunks, 1)

for i := 0; i < chunks; i++ {
if i == chunks-1 {
regions = append(regions, end)
} else {
regions = append(regions, (i+1)*bk)
}
}
return regions
}

// adjustRegion is used to move the end site a little backwards
func adjustRegion(regions []int, idx *bam.Index, ref *sam.Reference, bamReader *bam.Reader) ([]*ChanChunk, error) {

res := make([]*ChanChunk, 0, 0)
for i := 0; i < len(regions); i++ {
if i > 0 && i < len(regions)-1 {
chunks, err := idx.Chunks(ref, regions[i], regions[i+1])
if err != nil {
sugar.Debugf("%v", regions)
// return res, errors.Wrapf(err, "failed before adjust %s: %d - %d", ref.Name(), region[i], region[i+1])
continue
}

if len(chunks) == 0 {
continue
}

iter, err := bam.NewIterator(bamReader, chunks)
if err != nil {
return res, err
}

lastEnd := 0
for iter.Next() {
record := iter.Record()

if lastEnd == 0 {
lastEnd = record.End()
} else if record.Start() > lastEnd {
break
}
}

if lastEnd > 0 {
regions[i] = lastEnd
}
}
}

for i := 1; i < len(regions); i++ {
// avoid duplicated regions
if regions[i-1] > regions[i] {
regions[i] = regions[i-1]
continue
}

chunks, err := idx.Chunks(ref, regions[i-1], regions[i])
if err != nil {
continue
// return res, errors.Wrapf(err, "failed after adjust %s: %d - %d", ref.Name(), region[i-1], region[i])
}
//res = append(res, chunks...)

for _, c := range chunks {
res = append(res, &ChanChunk{
Ref: ref.Name(),
Start: regions[i-1],
End: regions[i],
Chunks: c,
})
}

}

return res, nil
}

func fetchBamRefs() ([]string, error) {
res := make([]string, 0, 0)
ifh, err := os.Open(conf.File)
Expand All @@ -384,7 +474,40 @@ func fetchBamRefs() ([]string, error) {
}

for _, ref := range bamReader.Header().Refs() {
res = append(res, ref.Name())
length, err := strconv.Atoi(ref.Get(sam.NewTag("LN")))
if err != nil {
return res, err
}

regions := make([]int, 0, 0)
if region.Empty() {
regions = splitRegion(length, maxInt(conf.Process, 1))
} else if ref.Name() == region.Chrom {
if region.End != 0 {
length = region.End
}

for _, i := range splitRegion(length-region.Start, maxInt(conf.Process, 1)) {
regions = append(regions, region.Start+i)
}
}

//sugar.Infof("%s - %d", ref.Name(), len(regions))

if len(regions) == 0 {
continue
}
sugar.Debugf("make chunks of %s", ref.Name())
temp, err := adjustRegion(regions, idx, ref, bamReader)
if err != nil {
return res, err
}

if len(temp) == 0 {
continue
}

res[ref.Name()] = temp
}
return res, nil
}
Expand Down
74 changes: 54 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func writer(w chan string, wg *sync.WaitGroup) {
f, err := os.OpenFile(conf.Output, mode, 0644)
if err != nil {
sugar.Fatalf("failed to open %s: %s", conf.Output, err.Error())
os.Exit(1)
}
defer f.Close()

Expand Down Expand Up @@ -81,17 +82,19 @@ func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPo
break
}

chrRef, err := fetchFasta(ref)
if err != nil {
sugar.Warnf("try to modify %s", ref.Chrom)
if strings.HasPrefix(ref.Chrom, "chr") {
chrRef, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref.Chrom, "chr", "")})
} else {
chrRef, err = fetchFasta(&Region{Chrom: "chr" + ref.Chrom})
}
}
if err != nil {
sugar.Fatal(err)
//chrRef, err := fetchFasta(ref.ToRegion())
//if err != nil {
// sugar.Warnf("try to modify %s", ref)
// chrRef, err = fetchFasta(ref.SwitchRegion())
//}
//if err != nil {
// sugar.Fatal(err)
//}

chrRef, ok := chrRefs[ref.Ref]
if !ok {
sugar.Errorf("failed to get %s from reference", ref.Ref)
continue
}

iter, err := fetchBam(ref)
Expand Down Expand Up @@ -126,8 +129,6 @@ func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPo
sugar.Fatal(record.SeqString())
}

// record.QueryPosition = append(record.QueryPosition, at)

genomic := start + record.Start

if _, ok := edits[genomic]; !ok {
Expand Down Expand Up @@ -165,6 +166,8 @@ func main() {

setLogger(conf.Debug, conf.Log)

sugar.Debugf("options: %v", conf)

if conf.Version {
sugar.Infof("current version: %v", VERSION)
os.Exit(0)
Expand Down Expand Up @@ -212,6 +215,39 @@ func main() {

var wg sync.WaitGroup
w := make(chan string)
refs := make(chan *ChanChunk)

references, err := fetchBamRefs()
if err != nil {
sugar.Fatal(err)
}

sugar.Infof("load reference from %s", conf.Reference)
chrRefs := make(map[string][]byte)
for ref, _ := range references {
wg.Add(1)
go func(ref string, wg *sync.WaitGroup, lock *sync.Mutex) {
defer wg.Done()
temp, err := fetchFasta(&Region{Chrom: ref})
if err != nil {
sugar.Warnf("try to modify %s", ref)
if strings.HasPrefix(ref, "chr") {
temp, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref, "chr", "")})
} else {
temp, err = fetchFasta(&Region{Chrom: "chr" + ref})
}
}
if err != nil {
sugar.Fatal(err)
}
lock.Lock()
chrRefs[ref] = temp
lock.Unlock()
}(ref, &wg, &lock)
}

wg.Wait()

wg.Add(1)
go writer(w, &wg)
refs := make(chan *Region)
Expand All @@ -221,14 +257,12 @@ func main() {
wg.Add(1)
}

if region.Empty() {
if references, err := fetchBamRefs(); err == nil {
for _, r := range references {
refs <- &Region{Chrom: r}
}
for ref, chunks := range references {
sugar.Infof("read reads from %s", ref)
for _, c := range chunks {
sugar.Debug(c)
refs <- c
}
} else {
refs <- region
}

close(refs)
Expand Down

0 comments on commit 47780d8

Please sign in to comment.