Skip to content

Commit

Permalink
Refactor code to use errgroup (#65)
Browse files Browse the repository at this point in the history
* Simplify chop command using errgroup

* More refactoring with errgroup

* Remove commented code
  • Loading branch information
folbricht authored Oct 8, 2018
1 parent 96ab403 commit fb143c1
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 244 deletions.
53 changes: 15 additions & 38 deletions assemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
"crypto/sha512"
"fmt"
"os"
"sync"

"golang.org/x/sync/errgroup"
)

// AssembleFile re-assembles a file based on a list of index chunks. It runs n
Expand All @@ -22,14 +23,10 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
source SeedSegment
}
var (
wg sync.WaitGroup
mu sync.Mutex
pErr error
in = make(chan Job)
isBlank bool
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
g, ctx := errgroup.WithContext(ctx)

// Setup and start the progressbar if any
if pb != nil {
Expand All @@ -44,16 +41,6 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
ChunksTotal: len(idx.Chunks),
}

// Helper function to record and deal with any errors in the goroutines
recordError := func(err error) {
mu.Lock()
defer mu.Unlock()
if pErr == nil {
pErr = err
}
cancel()
}

	// Determine if the target exists and create it if not
info, err := os.Stat(name)
switch {
Expand Down Expand Up @@ -100,13 +87,12 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []

// Start the workers, each having its own filehandle to write concurrently
for i := 0; i < n; i++ {
wg.Add(1)
f, err := os.OpenFile(name, os.O_RDWR, 0666)
if err != nil {
return stats, fmt.Errorf("unable to open file %s, %s", name, err)
}
defer f.Close()
go func() {
g.Go(func() error {
for job := range in {
if pb != nil {
pb.Add(job.segment.lengthChunks())
Expand All @@ -117,8 +103,7 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
length := job.segment.lengthBytes()
copied, cloned, err := job.source.WriteInto(f, offset, length, blocksize, isBlank)
if err != nil {
recordError(err)
continue
return err
}
stats.addBytesCopied(copied)
stats.addBytesCloned(cloned)
Expand All @@ -134,8 +119,7 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
if !isBlank {
b := make([]byte, c.Size)
if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
recordError(err)
continue
return err
}
sum := sha512.Sum512_256(b)
if sum == c.ID {
Expand All @@ -151,50 +135,43 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds []
// Pull the (compressed) chunk from the store
chunk, err := s.GetChunk(c.ID)
if err != nil {
recordError(err)
continue
return err
}
b, err := chunk.Uncompressed()
if err != nil {
recordError(err)
continue
return err
}
// Might as well verify the chunk size while we're at it
if c.Size != uint64(len(b)) {
recordError(fmt.Errorf("unexpected size for chunk %s", c.ID))
continue
return fmt.Errorf("unexpected size for chunk %s", c.ID)
}
// Write the decompressed chunk into the file at the right position
if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
recordError(err)
continue
return err
}
// Record this chunk's been written in the self-seed
ss.add(job.segment)
}
wg.Done()
}()
return nil
})
}

// Let the sequencer break up the index into segments, feed the workers, and
// stop if there are any errors
seq := NewSeedSequencer(idx, seeds...)
loop:
for {
// See if we're meant to stop
chunks, from, done := seq.Next()
select {
case <-ctx.Done():
break loop
default:
case in <- Job{chunks, from}:
}
chunks, from, done := seq.Next()
in <- Job{chunks, from}
if done {
break
}
}
close(in)

wg.Wait()
return stats, pErr
return stats, g.Wait()
}
65 changes: 16 additions & 49 deletions chop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,15 @@ import (
"fmt"
"io"
"os"
"sync"

"golang.org/x/sync/errgroup"
)

// ChopFile split a file according to a list of chunks obtained from an Index
// and stores them in the provided store
func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteStore, n int, pb ProgressBar) error {
var (
wg sync.WaitGroup
mu sync.Mutex
pErr error
in = make(chan IndexChunk)
)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
in := make(chan IndexChunk)
g, ctx := errgroup.WithContext(ctx)

// Setup and start the progressbar if any
if pb != nil {
Expand All @@ -28,91 +22,64 @@ func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteSto
defer pb.Finish()
}

// Helper function to record and deal with any errors in the goroutines
recordError := func(err error) {
mu.Lock()
defer mu.Unlock()
if pErr == nil {
pErr = err
}
cancel()
}

s := NewChunkStorage(ws)

// Start the workers, each having its own filehandle to read concurrently
for i := 0; i < n; i++ {
wg.Add(1)

f, err := os.Open(name)
if err != nil {
return fmt.Errorf("unable to open file %s, %s", name, err)
}
defer f.Close()

go func() {
defer wg.Done()
g.Go(func() error {
for c := range in {
// Update progress bar if any
if pb != nil {
pb.Add(1)
}

var err error
b, err := readChunkFromFile(f, c)
chunk, err := readChunkFromFile(f, c)
if err != nil {
recordError(err)
continue
}

chunk, err := NewChunkWithID(c.ID, b, nil, false)
if err != nil {
recordError(err)
continue
return err
}

if err := s.StoreChunk(chunk); err != nil {
recordError(err)
continue
return err
}
}
}()
return nil
})
}

// Feed the workers, stop if there are any errors
loop:
for _, c := range chunks {
// See if we're meant to stop
select {
case <-ctx.Done():
break loop
default:
case in <- c:
}
in <- c
}

close(in)

wg.Wait()

return pErr
return g.Wait()
}

// Helper function to read chunk contents from file
func readChunkFromFile(f *os.File, c IndexChunk) ([]byte, error) {
func readChunkFromFile(f *os.File, c IndexChunk) (*Chunk, error) {
var err error
b := make([]byte, c.Size)

// Position the filehandle to the place where the chunk is meant to come
// from within the file
if _, err = f.Seek(int64(c.Start), io.SeekStart); err != nil {
return b, err
return nil, err
}
// Read the whole (uncompressed) chunk into memory

if _, err = io.ReadFull(f, b); err != nil {
return b, err
return nil, err
}

return b, nil
return NewChunkWithID(c.ID, b, nil, false)
}
59 changes: 20 additions & 39 deletions copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,17 @@ package desync

import (
"context"
"sync"

"golang.org/x/sync/errgroup"
)

// Copy reads a list of chunks from the provided src store, and copies the ones
// not already present in the dst store. The goal is to load chunks from remote
// store to populate a cache. If progress is provided, it'll be called when a
// chunk has been processed. Used to draw a progress bar, can be nil.
func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error {
var (
wg sync.WaitGroup
in = make(chan ChunkID)
mu sync.Mutex
pErr error
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
in := make(chan ChunkID)
g, ctx := errgroup.WithContext(ctx)

// Setup and start the progressbar if any
if pb != nil {
Expand All @@ -26,52 +21,38 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int,
defer pb.Finish()
}

// Helper function to record and deal with any errors in the goroutines
recordError := func(err error) {
mu.Lock()
defer mu.Unlock()
if pErr == nil {
pErr = err
}
cancel()
}

// Start the workers
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
g.Go(func() error {
for id := range in {
if !dst.HasChunk(id) {
chunk, err := src.GetChunk(id)
if err != nil {
recordError(err)
continue
}
if err := dst.StoreChunk(chunk); err != nil {
recordError(err)
}
}
if pb != nil {
pb.Increment()
}
if dst.HasChunk(id) {
continue
}
chunk, err := src.GetChunk(id)
if err != nil {
return err
}
if err := dst.StoreChunk(chunk); err != nil {
return err
}
}
wg.Done()
}()
return nil
})
}

// Feed the workers, stop on any errors
// Feed the workers, the context is cancelled if any goroutine encounters an error
loop:
for _, c := range ids {
// See if we're meant to stop
select {
case <-ctx.Done():
break loop
default:
case in <- c:
}
in <- c
}
close(in)
wg.Wait()

return pErr
return g.Wait()
}
Loading

0 comments on commit fb143c1

Please sign in to comment.