diff --git a/const.go b/const.go index 0bd9b27..0b197b5 100644 --- a/const.go +++ b/const.go @@ -20,13 +20,14 @@ var conf config var region *Region const ( - VERSION = "0.0.2-beta" + VERSION = "0.0.3-beta" DefaultBaseQuality = 30 ) type config struct { Config string `goptions:"-c, --config, description='Config file'"` Version bool `goptions:"-v, --version, description='Show version'"` + Mode bool `goptions:"--fast, description='Usage fast mode with higher memory usage'"` Debug bool `goptions:"--debug, description='Show debug info'" long:"debug" description:"Config file"` File string `goptions:"-f, --file, description='The bam file to be analyzed'"` Output string `goptions:"-o, --output-file, description='The output statistics file'"` diff --git a/fast.go b/fast.go new file mode 100644 index 0000000..eb4308e --- /dev/null +++ b/fast.go @@ -0,0 +1,152 @@ +package main + +import ( + "github.com/biogo/hts/sam" + "github.com/golang-collections/collections/set" + "strings" + "sync" +) + +func workerFast( + wg *sync.WaitGroup, refs chan *ChanChunk, w chan string, + omopolymericPositions, splicePositions, targetPositions map[string]*set.Set, + chrRefs map[string][]byte) { + defer wg.Done() + + for { + ref, ok := <-refs + if !ok { + break + } + + //chrRef, err := fetchFasta(ref.ToRegion()) + //if err != nil { + // sugar.Warnf("try to modify %s", ref) + // chrRef, err = fetchFasta(ref.SwitchRegion()) + //} + //if err != nil { + // sugar.Fatal(err) + //} + + chrRef, ok := chrRefs[ref.Ref] + if !ok { + sugar.Errorf("failed to get %s from reference", ref.Ref) + continue + } + + iter, err := fetchBamFast(ref.Chunks) + if err != nil { + sugar.Fatal(err) + } + + lastEnd := 0 + total := 0 + edits := make(map[int]*EditsInfo) + for iter.Next() { + record := NewRecord(iter.Record()) + + if lastEnd == 0 { + lastEnd = record.End + } + + total++ + + if !filterReads(record) { + continue + } + + start, index := 0, 0 + for _, i := range record.Cigar { + if i.Type() == sam.CigarMatch { + for j := 1; j <= i.Len(); j++ { + at := index + j - 1 + + if at >= record.Seq.Length { + sugar.Errorf("%s - %d - %d - %d", record.Cigar, index, at, record.Seq.Length) + sugar.Fatal(record.SeqString()) + } + + genomic := start + record.Start + + if _, ok := edits[genomic]; !ok { + edits[genomic] = NewEditsInfo(ref.Ref, chrRef[genomic-1], genomic) + } + + edits[genomic].AddReads(record, at) + start++ + } + index += i.Len() + } else if i.Type() != sam.CigarDeletion || i.Type() != sam.CigarHardClipped || i.Type() != sam.CigarInsertion { + start += i.Len() + } + } + + if record.Start > lastEnd { + getColumn(edits, []map[string]*set.Set{omopolymericPositions, splicePositions}, targetPositions, w) + + edits = make(map[int]*EditsInfo) + lastEnd = record.End + } + } + + getColumn(edits, []map[string]*set.Set{omopolymericPositions, splicePositions}, targetPositions, w) + } + + w <- "done" +} + +func fastMode( + wg *sync.WaitGroup, w chan string, + omopolymericPositions, spicePositions, targetPositions map[string]*set.Set) { + var lock sync.Mutex + refs := make(chan *ChanChunk) + + references, err := fetchBamRefsFast() + if err != nil { + sugar.Fatal(err) + } + + sugar.Infof("load reference from %s", conf.Reference) + chrRefs := make(map[string][]byte) + for ref, _ := range references { + wg.Add(1) + go func(ref string, wg *sync.WaitGroup, lock *sync.Mutex) { + defer wg.Done() + temp, err := fetchFasta(&Region{Chrom: ref}) + if err != nil { + sugar.Warnf("try to modify %s", ref) + if strings.HasPrefix(ref, "chr") { + temp, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref, "chr", "")}) + } else { + temp, err = fetchFasta(&Region{Chrom: "chr" + ref}) + } + } + if err != nil { + sugar.Fatal(err) + } + lock.Lock() + chrRefs[ref] = temp + lock.Unlock() + }(ref, wg, &lock) + } + + wg.Wait() + + wg.Add(1) + go writer(w, wg) + + for i := 0; i < conf.Process; i++ { + go workerFast(wg, refs, w, omopolymericPositions, spicePositions, targetPositions, chrRefs) + wg.Add(1) + } + + for ref, chunks := range references { + sugar.Infof("read reads from %s", ref) + for _, c := range chunks { + sugar.Debug(c) + refs <- c + } + } + + close(refs) +} diff --git a/go.mod b/go.mod index 713d772..05b891b 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,10 @@ go 1.14 require ( github.com/biogo/biogo v1.0.2 github.com/biogo/hts v1.1.0 - github.com/cheggaaa/pb v2.0.7+incompatible - github.com/cheggaaa/pb/v3 v3.0.4 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/mattn/go-colorable v0.1.7 // indirect github.com/pkg/errors v0.8.1 + github.com/schollz/progressbar/v3 v3.3.4 github.com/voxelbrain/goptions v0.0.0-20180630082107-58cddc247ea2 go.uber.org/zap v1.15.0 gopkg.in/VividCortex/ewma.v1 v1.1.1 // indirect diff --git a/go.sum b/go.sum index 0ccfa15..3496735 100644 --- a/go.sum +++ b/go.sum @@ -12,11 +12,6 @@ github.com/biogo/hts v1.1.0 h1:J6Na8+e+MYyEpV1B762GUvt4mhtPn3tnQQaxWyKo5rk= github.com/biogo/hts v1.1.0/go.mod h1:6C9MdMt9ALD5PsluK5n0B0svHOpmVse3UjQQx/cTgOw= github.com/biogo/store v0.0.0-20200104231603-2c6ad937eb83 h1:wQjRDdhQmyjOlYPJGS/FHKMA13hgT4anRXIWjHX84zU= github.com/biogo/store v0.0.0-20200104231603-2c6ad937eb83/go.mod h1:wdbXg77soR6ESRprAMEwAQDFtLT6EAGF5o1GRy0cB5k= -github.com/cheggaaa/pb v1.0.28 h1:kWGpdAcSp3MxMU9CCHOwz/8V0kCHN4+9yQm2MzWuI98= -github.com/cheggaaa/pb v2.0.7+incompatible h1:gLKifR1UkZ/kLkda5gC0K6c8g+jU2sINPtBeOiNlMhU= -github.com/cheggaaa/pb v2.0.7+incompatible/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s= -github.com/cheggaaa/pb/v3 v3.0.4 h1:QZEPYOj2ix6d5oEg63fbHmpolrnNiwjUsk+h74Yt4bM= -github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWcvGJPfw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= @@ -25,6 +20,7 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4= github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kortschak/utter v0.0.0-20190412033250-50fe362e6560/go.mod h1:oDr41C7kH9wvAikWyFhr6UFr8R7nelpmCF5XR5rL7I8= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -42,10 +38,17 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/schollz/progressbar v1.0.0 h1:gbyFReLHDkZo8mxy/dLWMr+Mpb1MokGJ1FqCiqacjZM= +github.com/schollz/progressbar/v3 v3.3.4 h1:nMinx+JaEm/zJz4cEyClQeAw5rsYSB5th3xv+5lV6Vg= +github.com/schollz/progressbar/v3 v3.3.4/go.mod h1:Rp5lZwpgtYmlvmGo1FyDwXMqagyRBQYSDwzlP9QDu84= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -82,6 +85,8 @@ golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= diff --git a/load.go b/load.go index b56eb1d..0278ce8 100644 --- a/load.go +++ b/load.go @@ -9,6 +9,7 @@ import ( "regexp" "strconv" "strings" + "sync" "github.com/biogo/biogo/alphabet" "github.com/biogo/biogo/io/seqio" @@ -17,15 +18,10 @@ import ( "github.com/biogo/hts/bam" "github.com/biogo/hts/bgzf" "github.com/biogo/hts/fai" - "github.com/cheggaaa/pb/v3" + "github.com/biogo/hts/sam" "github.com/golang-collections/collections/set" "github.com/pkg/errors" - "io" - "math" - "os" - "regexp" - "strconv" - "strings" + "github.com/schollz/progressbar/v3" ) // loadBedFile is function that load bed files @@ -47,12 +43,14 @@ func loadBedFile() (map[string]*set.Set, error) { } defer f.Close() - bar := pb.New64(stats.Size()) - r := bar.NewProxyReader(f) + bar := progressbar.DefaultBytes( + stats.Size(), + "loading", + ) - reader := bufio.NewReader(r) + reader := bufio.NewReader(f) if strings.HasSuffix(conf.BedFile, ".gz") { - gr, err := gzip.NewReader(r) + gr, err := gzip.NewReader(f) if err != nil { return targetPosition, errors.Wrapf(err, "failed to open %s", conf.BedFile) } @@ -69,6 +67,8 @@ func loadBedFile() (map[string]*set.Set, error) { } } + bar.Add(len([]byte(line))) + fields := strings.Split(strings.TrimSpace(line), "\t") chr := fields[0] @@ -167,43 +167,75 @@ func createOmopolymericPositions() error { return err } - for _, chromosome := range chromosomes { - // fasta.Reader requires a known type template to fill - // with FASTA data. Here we use *linear.Seq. - template := linear.NewSeq(chromosome, nil, alphabet.DNAredundant) - r := fasta.NewReader(f, template) - - // Make a seqio.Scanner to simplify iterating over a - // stream of data. - sc := seqio.NewScanner(r) - - // Iterate through each sequence in a multifasta and examine the - // ID, description and sequence data. - for sc.Next() { - // Get the current sequence and type assert to *linear.Seq. - // While this is unnecessary here, it can be useful to have - // the concrete type. - s := sc.Seq().(*linear.Seq) - - equals := 0 - var last alphabet.Letter - for i, b := range s.Seq { - if b == last { - equals += 1 - } else { - if equals >= conf.OmopolymericSpan { - positions = append(positions, &Omopolymeric{ - Chromosome: chromosome, NegEquals: i - equals, - I: i, Equals: equals, Last: last, - }) + chromChan := make(chan string) + outputChan := make(chan []*Omopolymeric) + + var wg sync.WaitGroup + var lock sync.Mutex + + for i := 0; i < maxInt(conf.Process, 1); i++ { + wg.Add(1) + go func(outputChan chan []*Omopolymeric, chromChan chan string, wg *sync.WaitGroup, lock *sync.Mutex) { + for { + chromosome, ok := <-chromChan + if !ok { + break + } + sugar.Debugf("Loading fasta of %s", chromosome) + tempPositions := make([]*Omopolymeric, 0, 0) + // fasta.Reader requires a known type template to fill + // with FASTA data. Here we use *linear.Seq. + template := linear.NewSeq(chromosome, nil, alphabet.DNAredundant) + r := fasta.NewReader(f, template) + + // Make a seqio.Scanner to simplify iterating over a + // stream of data. + sc := seqio.NewScanner(r) + + // Iterate through each sequence in a multifasta and examine the + // ID, description and sequence data. + for sc.Next() { + // Get the current sequence and type assert to *linear.Seq. + // While this is unnecessary here, it can be useful to have + // the concrete type. + s := sc.Seq().(*linear.Seq) + + equals := 0 + var last alphabet.Letter + for i, b := range s.Seq { + if b == last { + equals += 1 + } else { + if equals >= conf.OmopolymericSpan { + + tempPositions = append(tempPositions, &Omopolymeric{ + Chromosome: chromosome, NegEquals: i - equals, + I: i, Equals: equals, Last: last, + }) + + } + equals = 1 + } + last = b } - equals = 1 } - last = b + + lock.Lock() + positions = append(positions, tempPositions...) + lock.Unlock() } - } + + defer wg.Done() + }(outputChan, chromChan, &wg, &lock) } + for _, chromosome := range chromosomes { + chromChan <- chromosome + } + + close(chromChan) + wg.Wait() + sugar.Infof("%d total omopolymeric positions found.", len(positions)) sugar.Infof("Writing omopolymeric positions to file: %s.", conf.OmopolymericFile) @@ -215,16 +247,24 @@ func createOmopolymericPositions() error { defer f.Close() writer := bufio.NewWriter(f) + defer writer.Flush() if _, err := writer.WriteString("#" + strings.Join([]string{"Chromomosome", "Start", "End", "Length", "Symbol"}, "\t") + "\n"); err != nil { return err } + bar := progressbar.Default(int64(len(positions)), "writing") + for _, position := range positions { - if _, err := writer.WriteString(position.String() + "\n"); err != nil { - return err + data := position.String() + "\n" + bar.Add(1) + if _, err := writer.WriteString(data); err != nil { + sugar.Fatal(err) + } else { + } } + bar.Finish() return nil } @@ -237,12 +277,22 @@ func loadOmopolymericPositions() (map[string]*set.Set, error) { sugar.Infof("Loading omopolymeric positions from file %s", conf.OmopolymericFile) + stats, err := os.Stat(conf.OmopolymericFile) + if os.IsNotExist(err) { + return positions, errors.New(conf.OmopolymericFile + " not exists") + } + f, err := os.Open(conf.OmopolymericFile) if err != nil { return positions, errors.Wrapf(err, "failed to open %s", conf.OmopolymericFile) } defer f.Close() + bar := progressbar.DefaultBytes( + stats.Size(), + "loading", + ) + r := bufio.NewReader(f) total := 0 @@ -259,6 +309,7 @@ func loadOmopolymericPositions() (map[string]*set.Set, error) { if strings.HasPrefix(line, "#") { continue } + bar.Add(len([]byte(line))) fields := strings.Split(strings.TrimSpace(line), "\t") if region.Chrom == "" || fields[0] == region.Chrom { @@ -295,7 +346,9 @@ func loadOmopolymericPositions() (map[string]*set.Set, error) { } } - sugar.Infof("[%v] %d total omopolymeric positions found.", region, total) + bar.Finish() + + sugar.Infof("%d total omopolymeric positions found.", total) return positions, nil } @@ -318,12 +371,14 @@ func loadSplicingPositions() (map[string]*set.Set, error) { } defer f.Close() - bar := pb.New64(stats.Size()) - r := bar.NewProxyReader(f) + bar := progressbar.DefaultBytes( + stats.Size(), + "loading", + ) - reader := bufio.NewReader(r) + reader := bufio.NewReader(f) if strings.HasSuffix(conf.SplicingFile, ".gz") { - gr, err := gzip.NewReader(r) + gr, err := gzip.NewReader(f) if err != nil { return res, errors.Wrapf(err, "failed to open %s", conf.SplicingFile) } @@ -341,7 +396,7 @@ func loadSplicingPositions() (map[string]*set.Set, error) { return res, err } } - + bar.Add(len([]byte(line))) lines := regexp.MustCompile("\\s+").Split(strings.TrimSpace(line), -1) chrom := lines[0] @@ -372,6 +427,8 @@ func loadSplicingPositions() (map[string]*set.Set, error) { res[chrom] = temp } + bar.Finish() + return res, nil } @@ -458,8 +515,8 @@ func adjustRegion(regions []int, idx *bam.Index, ref *sam.Reference, bamReader * return res, nil } -func fetchBamRefs() ([]string, error) { - res := make([]string, 0, 0) +func fetchBamRefsFast() (map[string][]*ChanChunk, error) { + res := make(map[string][]*ChanChunk) ifh, err := os.Open(conf.File) //Panic if something went wrong: if err != nil { @@ -512,6 +569,27 @@ func fetchBamRefs() ([]string, error) { return res, nil } +func fetchBamRefs() ([]string, error) { + res := make([]string, 0, 0) + ifh, err := os.Open(conf.File) + //Panic if something went wrong: + if err != nil { + return res, err + } + + //Create a new BAM reader with maximum + //concurrency: + bamReader, err := bam.NewReader(ifh, 0) + if err != nil { + return res, err + } + + for _, ref := range bamReader.Header().Refs() { + res = append(res, ref.Name()) + } + return res, nil +} + func fetchFasta(region *Region) ([]byte, error) { ifh, err := os.Open(conf.Reference) if err != nil { @@ -550,6 +628,54 @@ func fetchFasta(region *Region) ([]byte, error) { return res, err } +func fetchBamFast(region bgzf.Chunk) (*bam.Iterator, error) { + ifh, err := os.Open(conf.File) + //Panic if something went wrong: + if err != nil { + return nil, err + } + + idxF, err := os.Open(conf.File + ".bai") + if err != nil { + return nil, err + } + defer idxF.Close() + + idx, err := bam.ReadIndex(idxF) + if err != nil { + return nil, err + } + + //Create a new BAM reader with maximum + //concurrency: + bamReader, err := bam.NewReader(ifh, 1) + if err != nil { + return nil, err + } + + chunks := make([]bgzf.Chunk, 0, 0) + + for _, ref := range bamReader.Header().Refs() { + if region.Chrom != "" && ref.Name() == region.Chrom { + if region.Start == 0 && region.End == 0 { + if stats, ok := idx.ReferenceStats(ref.ID()); ok { + chunks = append(chunks, stats.Chunk) + } + } else if region.Chrom != "" && region.Start > 0 && region.End > 0 { + if tempChunks, err := idx.Chunks(ref, region.Start, region.End); err != nil { + chunks = append(chunks, tempChunks...) + } + } + } else if region.Chrom == "" { + if stats, ok := idx.ReferenceStats(ref.ID()); ok { + chunks = append(chunks, stats.Chunk) + } + } + } + + return bam.NewIterator(bamReader, chunks) +} + func fetchBam(region *Region) (*bam.Iterator, error) { sugar.Infof("read reads from %s", region.String()) ifh, err := os.Open(conf.File) diff --git a/main.go b/main.go index 22a602c..79bac6c 100644 --- a/main.go +++ b/main.go @@ -7,8 +7,6 @@ import ( "strings" "sync" - "github.com/biogo/hts/sam" - "github.com/golang-collections/collections/set" "github.com/voxelbrain/goptions" ) @@ -73,93 +71,6 @@ func writer(w chan string, wg *sync.WaitGroup) { wg.Done() } -func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPositions, splicePositions, targetPositions map[string]*set.Set) { - defer wg.Done() - - for { - ref, ok := <-refs - if !ok { - break - } - - //chrRef, err := fetchFasta(ref.ToRegion()) - //if err != nil { - // sugar.Warnf("try to modify %s", ref) - // chrRef, err = fetchFasta(ref.SwitchRegion()) - //} - //if err != nil { - // sugar.Fatal(err) - //} - - chrRef, ok := chrRefs[ref.Ref] - if !ok { - sugar.Errorf("failed to get %s from reference", ref.Ref) - continue - } - - iter, err := fetchBam(ref) - if err != nil { - sugar.Fatal(err) - } - - lastEnd := 0 - total := 0 - edits := make(map[int]*EditsInfo) - for iter.Next() { - record := NewRecord(iter.Record()) - - if lastEnd == 0 { - lastEnd = record.End - } - - total++ - - if !filterReads(record) { - continue - } - - start, index := 0, 0 - for _, i := range record.Cigar { - if i.Type() == sam.CigarMatch { - for j := 1; j <= i.Len(); j++ { - at := index + j - 1 - - if at >= record.Seq.Length { - sugar.Errorf("%s - %d - %d - %d", record.Cigar, index, at, record.Seq.Length) - sugar.Fatal(record.SeqString()) - } - - genomic := start + record.Start - - if _, ok := edits[genomic]; !ok { - edits[genomic] = NewEditsInfo(ref.Chrom, chrRef[genomic-1], genomic) - } - - edits[genomic].AddReads(record, at) - start++ - } - index += i.Len() - } else if i.Type() != sam.CigarDeletion || i.Type() != sam.CigarHardClipped || i.Type() != sam.CigarInsertion { - start += i.Len() - } - } - - if record.Start > lastEnd { - getColumn(edits, []map[string]*set.Set{omopolymericPositions, splicePositions}, targetPositions, w) - - edits = make(map[int]*EditsInfo) - lastEnd = record.End - } - } - - getColumn(edits, []map[string]*set.Set{omopolymericPositions, splicePositions}, targetPositions, w) - - sugar.Debugf("read %d reads from %s", total, ref) - } - - w <- "done" -} - func main() { conf = defaultConfig() goptions.ParseAndFail(&conf) @@ -214,58 +125,15 @@ func main() { sugar.Infof("Narrowing REDItools to region %s", region.String()) var wg sync.WaitGroup - w := make(chan string) - refs := make(chan *ChanChunk) - - references, err := fetchBamRefs() - if err != nil { - sugar.Fatal(err) - } - sugar.Infof("load reference from %s", conf.Reference) - chrRefs := make(map[string][]byte) - for ref, _ := range references { - wg.Add(1) - go func(ref string, wg *sync.WaitGroup, lock *sync.Mutex) { - defer wg.Done() - temp, err := fetchFasta(&Region{Chrom: ref}) - if err != nil { - sugar.Warnf("try to modify %s", ref) - if strings.HasPrefix(ref, "chr") { - temp, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref, "chr", "")}) - } else { - temp, err = fetchFasta(&Region{Chrom: "chr" + ref}) - } - } - if err != nil { - sugar.Fatal(err) - } - lock.Lock() - chrRefs[ref] = temp - lock.Unlock() - }(ref, &wg, &lock) - } - - wg.Wait() - - wg.Add(1) - go writer(w, &wg) - refs := make(chan *Region) - - for i := 0; i < conf.Process; i++ { - go worker(&wg, refs, w, omopolymericPositions, spicePositions, targetPositions) - wg.Add(1) - } + w := make(chan string) - for ref, chunks := range references { - sugar.Infof("read reads from %s", ref) - for _, c := range chunks { - sugar.Debug(c) - refs <- c - } + if conf.Mode { + fastMode(&wg, w, omopolymericPositions, spicePositions, targetPositions) + } else { + slowMode(&wg, w, omopolymericPositions, spicePositions, targetPositions) } - close(refs) wg.Wait() timer.Toc() diff --git a/slow.go b/slow.go new file mode 100644 index 0000000..e7449a7 --- /dev/null +++ b/slow.go @@ -0,0 +1,120 @@ +package main + +import ( + "github.com/biogo/hts/sam" + "github.com/golang-collections/collections/set" + "strings" + "sync" +) + +func worker(wg *sync.WaitGroup, refs chan *Region, w chan string, omopolymericPositions, splicePositions, targetPositions map[string]*set.Set) { + defer wg.Done() + + for { + ref, ok := <-refs + if !ok { + break + } + + chrRef, err := fetchFasta(ref) + if err != nil { + sugar.Warnf("try to modify %s", ref.Chrom) + if strings.HasPrefix(ref.Chrom, "chr") { + chrRef, err = fetchFasta(&Region{Chrom: strings.ReplaceAll(ref.Chrom, "chr", "")}) + } else { + chrRef, err = fetchFasta(&Region{Chrom: "chr" + ref.Chrom}) + } + } + if err != nil { + sugar.Fatal(err) + } + + iter, err := fetchBam(ref) + if err != nil { + sugar.Fatal(err) + } + + lastEnd := 0 + total := 0 + edits := make(map[int]*EditsInfo) + for iter.Next() { + record := NewRecord(iter.Record()) + + if lastEnd == 0 { + lastEnd = record.End + } + + total++ + + if !filterReads(record) { + continue + } + + start, index := 0, 0 + for _, i := range record.Cigar { + if i.Type() == sam.CigarMatch { + for j := 1; j <= i.Len(); j++ { + at := index + j - 1 + + if at >= record.Seq.Length { + sugar.Errorf("%s - %d - %d - %d", record.Cigar, index, at, record.Seq.Length) + sugar.Fatal(record.SeqString()) + } + + // record.QueryPosition = append(record.QueryPosition, at) + + genomic := start + record.Start + + if _, ok := edits[genomic]; !ok { + edits[genomic] = NewEditsInfo(ref.Chrom, chrRef[genomic-1], genomic) + } + + edits[genomic].AddReads(record, at) + start++ + } + index += i.Len() + } else if i.Type() != sam.CigarDeletion || i.Type() != sam.CigarHardClipped || i.Type() != sam.CigarInsertion { + start += i.Len() + } + } + + if record.Start > lastEnd { + getColumn(edits, []map[string]*set.Set{omopolymericPositions, splicePositions}, targetPositions, w) + + edits = make(map[int]*EditsInfo) + lastEnd = record.End + } + } + + getColumn(edits, []map[string]*set.Set{omopolymericPositions, splicePositions}, targetPositions, w) + + sugar.Debugf("read %d reads from %s", total, ref) + } + + w <- "done" +} + +func slowMode(wg *sync.WaitGroup, w chan string, + omopolymericPositions, spicePositions, targetPositions map[string]*set.Set) { + + wg.Add(1) + go writer(w, wg) + refs := make(chan *Region) + + for i := 0; i < conf.Process; i++ { + go worker(wg, refs, w, omopolymericPositions, spicePositions, targetPositions) + wg.Add(1) + } + + if region.Empty() { + if references, err := fetchBamRefs(); err == nil { + for _, r := range references { + refs <- &Region{Chrom: r} + } + } + } else { + refs <- region + } + + close(refs) +} diff --git a/utils.go b/utils.go index 70f3b70..72104bb 100644 --- a/utils.go +++ b/utils.go @@ -21,7 +21,6 @@ func withinInterval(i int) bool { return region.Start <= i && i <= region.End } - func prop(tot, va int) float64 { if tot == 0 { return 0.0 @@ -119,7 +118,7 @@ func getColumn(edits map[int]*EditsInfo, positions []map[string]*set.Set, target if !ok { continue } - + if edit.Ref == byte('N') { continue }