From fb143c1bc54e342dc3c59589057a0be14946473f Mon Sep 17 00:00:00 2001
From: folbricht
Date: Sun, 7 Oct 2018 19:14:32 -0600
Subject: [PATCH] Refactor code to use errgroup (#65)

* Simplify chop command using errgroup
* More refactoring with errgroup
* Remove commented code
---
 assemble.go    | 53 ++++++++++++----------------------
 chop.go        | 65 ++++++++++++-------------------------------
 copy.go        | 59 +++++++++++++++-----------------------
 index.go       | 54 ++++++++++++-----------------------
 untar.go       | 66 ++++++++++++++++++--------------------------------
 verifyindex.go | 52 +++++++++++----------------------
 6 files changed, 105 insertions(+), 244 deletions(-)
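Editor's note: every file in this patch gets the same mechanical treatment, so the
shape of the change is sketched once here as a minimal, self-contained program
(illustrative names only: process, items, n; this is not code from the repository).
errgroup.WithContext replaces the hand-rolled sync.WaitGroup + sync.Mutex + pErr +
cancel() bookkeeping: the first worker to return a non-nil error cancels the shared
context, and g.Wait() returns that error. Feeding the work channel from inside a
select that also watches ctx.Done() keeps the feeder from blocking forever on a
channel whose readers have already bailed out.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// process fans items out to n workers and returns the first worker error, if any.
func process(ctx context.Context, items []int, n int) error {
	in := make(chan int)
	g, ctx := errgroup.WithContext(ctx)

	// Workers: returning a non-nil error cancels ctx for the whole group,
	// which replaces the old recordError helper and its mutex-guarded pErr.
	for i := 0; i < n; i++ {
		g.Go(func() error {
			for item := range in {
				if item < 0 {
					return fmt.Errorf("bad item %d", item)
				}
			}
			return nil
		})
	}

	// Feeder: the select sends and checks for cancellation at the same time.
loop:
	for _, item := range items {
		select {
		case <-ctx.Done():
			break loop
		case in <- item:
		}
	}
	close(in)

	// Wait replaces wg.Wait() and returns the first recorded error.
	return g.Wait()
}

func main() {
	fmt.Println(process(context.Background(), []int{1, 2, -3}, 2)) // prints "bad item -3"
}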
diff --git a/assemble.go b/assemble.go
index 4391406..65534d3 100644
--- a/assemble.go
+++ b/assemble.go
@@ -5,7 +5,8 @@ import (
 	"crypto/sha512"
 	"fmt"
 	"os"
-	"sync"
+
+	"golang.org/x/sync/errgroup"
 )
 
 // AssembleFile re-assembles a file based on a list of index chunks. It runs n
@@ -22,14 +23,10 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [
 		source SeedSegment
 	}
 	var (
-		wg      sync.WaitGroup
-		mu      sync.Mutex
-		pErr    error
 		in      = make(chan Job)
 		isBlank bool
 	)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	g, ctx := errgroup.WithContext(ctx)
 
 	// Setup and start the progressbar if any
 	if pb != nil {
@@ -44,16 +41,6 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [
 		ChunksTotal: len(idx.Chunks),
 	}
 
-	// Helper function to record and deal with any errors in the goroutines
-	recordError := func(err error) {
-		mu.Lock()
-		defer mu.Unlock()
-		if pErr == nil {
-			pErr = err
-		}
-		cancel()
-	}
-
 	// Determine if the target exists and create it if not
 	info, err := os.Stat(name)
 	switch {
@@ -100,13 +87,12 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [
 	// Start the workers, each having its own filehandle to write concurrently
 	for i := 0; i < n; i++ {
-		wg.Add(1)
 		f, err := os.OpenFile(name, os.O_RDWR, 0666)
 		if err != nil {
 			return stats, fmt.Errorf("unable to open file %s, %s", name, err)
 		}
 		defer f.Close()
-		go func() {
+		g.Go(func() error {
 			for job := range in {
 				if pb != nil {
 					pb.Add(job.segment.lengthChunks())
 				}
@@ -117,8 +103,7 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [
 				length := job.segment.lengthBytes()
 				copied, cloned, err := job.source.WriteInto(f, offset, length, blocksize, isBlank)
 				if err != nil {
-					recordError(err)
-					continue
+					return err
 				}
 				stats.addBytesCopied(copied)
 				stats.addBytesCloned(cloned)
@@ -134,8 +119,7 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [
 					if !isBlank {
 						b := make([]byte, c.Size)
 						if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
-							recordError(err)
-							continue
+							return err
 						}
 						sum := sha512.Sum512_256(b)
 						if sum == c.ID {
@@ -151,29 +135,25 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [
 					// Pull the (compressed) chunk from the store
 					chunk, err := s.GetChunk(c.ID)
 					if err != nil {
-						recordError(err)
-						continue
+						return err
 					}
 					b, err := chunk.Uncompressed()
 					if err != nil {
-						recordError(err)
-						continue
+						return err
 					}
 					// Might as well verify the chunk size while we're at it
 					if c.Size != uint64(len(b)) {
-						recordError(fmt.Errorf("unexpected size for chunk %s", c.ID))
-						continue
+						return fmt.Errorf("unexpected size for chunk %s", c.ID)
 					}
 					// Write the decompressed chunk into the file at the right position
 					if _, err = f.WriteAt(b, int64(c.Start)); err != nil {
-						recordError(err)
-						continue
+						return err
 					}
 					// Record this chunk's been written in the self-seed
 					ss.add(job.segment)
 				}
 			}
-			wg.Done()
-		}()
+			return nil
+		})
 	}
 
 	// Let the sequencer break up the index into segments, feed the workers, and
@@ -181,20 +161,17 @@ func AssembleFile(ctx context.Context, name string, idx Index, s Store, seeds [
 	seq := NewSeedSequencer(idx, seeds...)
 loop:
 	for {
-		// See if we're meant to stop
+		chunks, from, done := seq.Next()
 		select {
 		case <-ctx.Done():
 			break loop
-		default:
+		case in <- Job{chunks, from}:
 		}
-		chunks, from, done := seq.Next()
-		in <- Job{chunks, from}
 		if done {
 			break
 		}
 	}
 	close(in)
 
-	wg.Wait()
-	return stats, pErr
+	return stats, g.Wait()
 }
diff --git a/chop.go b/chop.go
index f36efd3..238a1ef 100644
--- a/chop.go
+++ b/chop.go
@@ -5,21 +5,15 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"sync"
+
+	"golang.org/x/sync/errgroup"
 )
 
 // ChopFile splits a file according to a list of chunks obtained from an Index
 // and stores them in the provided store
 func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteStore, n int, pb ProgressBar) error {
-	var (
-		wg   sync.WaitGroup
-		mu   sync.Mutex
-		pErr error
-		in   = make(chan IndexChunk)
-	)
-
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	in := make(chan IndexChunk)
+	g, ctx := errgroup.WithContext(ctx)
 
 	// Setup and start the progressbar if any
 	if pb != nil {
@@ -28,91 +22,64 @@ func ChopFile(ctx context.Context, name string, chunks []IndexChunk, ws WriteSto
 		defer pb.Finish()
 	}
 
-	// Helper function to record and deal with any errors in the goroutines
-	recordError := func(err error) {
-		mu.Lock()
-		defer mu.Unlock()
-		if pErr == nil {
-			pErr = err
-		}
-		cancel()
-	}
-
 	s := NewChunkStorage(ws)
 
 	// Start the workers, each having its own filehandle to read concurrently
 	for i := 0; i < n; i++ {
-		wg.Add(1)
-
 		f, err := os.Open(name)
 		if err != nil {
 			return fmt.Errorf("unable to open file %s, %s", name, err)
 		}
 		defer f.Close()
-		go func() {
-			defer wg.Done()
+		g.Go(func() error {
 			for c := range in {
 				// Update progress bar if any
 				if pb != nil {
 					pb.Add(1)
 				}
-				var err error
-				b, err := readChunkFromFile(f, c)
+				chunk, err := readChunkFromFile(f, c)
 				if err != nil {
-					recordError(err)
-					continue
-				}
-
-				chunk, err := NewChunkWithID(c.ID, b, nil, false)
-				if err != nil {
-					recordError(err)
-					continue
+					return err
 				}
 
 				if err := s.StoreChunk(chunk); err != nil {
-					recordError(err)
-					continue
+					return err
 				}
 			}
-		}()
+			return nil
+		})
 	}
 
 	// Feed the workers, stop if there are any errors
 loop:
 	for _, c := range chunks {
-		// See if we're meant to stop
 		select {
 		case <-ctx.Done():
			break loop
-		default:
+		case in <- c:
 		}
-		in <- c
 	}
 	close(in)
-	wg.Wait()
-
-	return pErr
+	return g.Wait()
 }
 
 // Helper function to read chunk contents from file
-func readChunkFromFile(f *os.File, c IndexChunk) ([]byte, error) {
+func readChunkFromFile(f *os.File, c IndexChunk) (*Chunk, error) {
 	var err error
 	b := make([]byte, c.Size)
 
 	// Position the filehandle to the place where the chunk is meant to come
 	// from within the file
 	if _, err = f.Seek(int64(c.Start), io.SeekStart); err != nil {
-		return b, err
+		return nil, err
 	}
 
 	// Read the whole (uncompressed) chunk into memory
 	if _, err = io.ReadFull(f, b); err != nil {
-		return b, err
+		return nil, err
 	}
-
-	return b, nil
+	return NewChunkWithID(c.ID, b, nil, false)
 }
diff --git a/copy.go b/copy.go
index 1086dcc..07deb70 100644
--- a/copy.go
+++ b/copy.go
@@ -2,7 +2,8 @@ package desync
 
 import (
 	"context"
-	"sync"
+
+	"golang.org/x/sync/errgroup"
 )
 
 // Copy reads a list of chunks from the provided src store, and copies the ones
@@ -10,14 +11,8 @@ import (
 // store to populate a cache. If progress is provided, it'll be called when a
 // chunk has been processed. Used to draw a progress bar, can be nil.
 func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int, pb ProgressBar) error {
-	var (
-		wg   sync.WaitGroup
-		in   = make(chan ChunkID)
-		mu   sync.Mutex
-		pErr error
-	)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	in := make(chan ChunkID)
+	g, ctx := errgroup.WithContext(ctx)
 
 	// Setup and start the progressbar if any
 	if pb != nil {
@@ -26,52 +21,38 @@ func Copy(ctx context.Context, ids []ChunkID, src Store, dst WriteStore, n int,
 		defer pb.Finish()
 	}
 
-	// Helper function to record and deal with any errors in the goroutines
-	recordError := func(err error) {
-		mu.Lock()
-		defer mu.Unlock()
-		if pErr == nil {
-			pErr = err
-		}
-		cancel()
-	}
-
 	// Start the workers
 	for i := 0; i < n; i++ {
-		wg.Add(1)
-		go func() {
+		g.Go(func() error {
 			for id := range in {
-				if !dst.HasChunk(id) {
-					chunk, err := src.GetChunk(id)
-					if err != nil {
-						recordError(err)
-						continue
-					}
-					if err := dst.StoreChunk(chunk); err != nil {
-						recordError(err)
-					}
-				}
 				if pb != nil {
 					pb.Increment()
 				}
+				if dst.HasChunk(id) {
+					continue
+				}
+				chunk, err := src.GetChunk(id)
+				if err != nil {
+					return err
+				}
+				if err := dst.StoreChunk(chunk); err != nil {
+					return err
+				}
 			}
-			wg.Done()
-		}()
+			return nil
+		})
 	}
 
-	// Feed the workers, stop on any errors
+	// Feed the workers; the context is cancelled if any goroutine encounters an error
 loop:
 	for _, c := range ids {
-		// See if we're meant to stop
 		select {
 		case <-ctx.Done():
 			break loop
-		default:
+		case in <- c:
 		}
-		in <- c
 	}
 	close(in)
-	wg.Wait()
-	return pErr
+	return g.Wait()
 }
diff --git a/index.go b/index.go
index 6defddf..e33b96a 100644
--- a/index.go
+++ b/index.go
@@ -7,6 +7,8 @@ import (
 	"math"
 	"sync"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/pkg/errors"
 
 	"io"
@@ -130,28 +132,14 @@ func ChunkStream(ctx context.Context, c Chunker, ws WriteStore, n int) (Index, e
 		b     []byte
 	}
 	var (
-		stop    bool
-		wg      sync.WaitGroup
 		mu      sync.Mutex
-		pErr    error
 		in      = make(chan chunkJob)
 		results = make(map[int]IndexChunk)
 	)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	g, ctx := errgroup.WithContext(ctx)
 
 	s := NewChunkStorage(ws)
 
-	// Helper function to record and deal with any errors in the goroutines
-	recordError := func(err error) {
-		mu.Lock()
-		defer mu.Unlock()
-		if pErr == nil {
-			pErr = err
-		}
-		cancel()
-	}
-
 	// All the chunks are processed in parallel, but we need to preserve the
 	// order for later. So add the chunking results to a map, indexed by
 	// the chunk number so we can rebuild it in the right order when done
@@ -164,8 +152,7 @@ func ChunkStream(ctx context.Context, c Chunker, ws WriteStore, n int) (Index, e
 	// Start the workers responsible for checksum calculation, compression and
 	// storage (if required). Each job comes with a chunk number for sorting later
 	for i := 0; i < n; i++ {
-		wg.Add(1)
-		go func() {
+		g.Go(func() error {
 			for c := range in {
 				// Create a chunk object, needed to calculate the checksum
 				chunk := NewChunkFromUncompressed(c.b)
@@ -175,26 +162,19 @@ func ChunkStream(ctx context.Context, c Chunker, ws WriteStore, n int) (Index, e
 				recordResult(c.num, idxChunk)
 
 				if err := s.StoreChunk(chunk); err != nil {
-					recordError(err)
-					continue
+					return err
 				}
 			}
-			wg.Done()
-		}()
+			return nil
+		})
 	}
 
 	// Feed the workers, stop if there are any errors. To keep the index list in
 	// order, we calculate the checksum here before handing them over to the
 	// workers for compression and storage. That could probably be optimized further
 	var num int // chunk #, so we can re-assemble the index in the right order later
+loop:
 	for {
-		// See if we're meant to stop
-		select {
-		case <-ctx.Done():
-			stop = true
-			break
-		default:
-		}
 		start, b, err := c.Next()
 		if err != nil {
 			return Index{}, err
@@ -204,19 +184,17 @@ func ChunkStream(ctx context.Context, c Chunker, ws WriteStore, n int) (Index, e
 		}
 
 		// Send it off for compression and storage
-		in <- chunkJob{num: num, start: start, b: b}
-
+		select {
+		case <-ctx.Done():
+			break loop
+		case in <- chunkJob{num: num, start: start, b: b}:
+		}
 		num++
 	}
 	close(in)
-	wg.Wait()
 
-	// Everything has settled, now see if something happend that would invalidate
-	// the results. Either an error or an interrupt by the user. We don't just
-	// want to bail out when it happens and abandon any running goroutines that
-	// might still be writing/processing chunks. Only stop here it's safe like here.
-	if stop {
-		return Index{}, pErr
+	if err := g.Wait(); err != nil {
+		return Index{}, err
 	}
 
 	// All the chunks have been processed and are stored in a map. Now build a
@@ -236,5 +214,5 @@ func ChunkStream(ctx context.Context, c Chunker, ws WriteStore, n int) (Index, e
 		},
 		Chunks: chunks,
 	}
-	return index, pErr
+	return index, nil
 }
diff --git a/untar.go b/untar.go
index b44f78c..98f2e93 100644
--- a/untar.go
+++ b/untar.go
@@ -10,9 +10,10 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
-	"sync"
 	"syscall"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/pkg/errors"
 )
 
@@ -171,25 +172,10 @@ func UnTarIndex(ctx context.Context, dst string, index Index, s Store, n int, op
 		data  chan ([]byte) // channel for the (decompressed) chunk
 	}
 	var (
-		// stop bool
-		wg   sync.WaitGroup
-		mu   sync.Mutex
-		pErr error
 		req      = make(chan requestJob)
 		assemble = make(chan chan []byte, n)
 	)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	// Helper function to record and deal with any errors in the goroutines
-	recordError := func(err error) {
-		mu.Lock()
-		defer mu.Unlock()
-		if pErr == nil {
-			pErr = err
-		}
-		cancel()
-	}
+	g, ctx := errgroup.WithContext(ctx)
 
 	// Use a pipe as input to untar and write the chunks into that (in the right
 	// order of course)
@@ -197,64 +183,60 @@ func UnTarIndex(ctx context.Context, dst string, index Index, s Store, n int, op
 	// Workers - getting chunks from the store
 	for i := 0; i < n; i++ {
-		wg.Add(1)
-		go func() {
+		g.Go(func() error {
 			for r := range req {
 				// Pull the chunk from the store
 				chunk, err := s.GetChunk(r.chunk.ID)
 				if err != nil {
-					recordError(err)
 					close(r.data)
-					continue
+					return err
 				}
 				b, err := chunk.Uncompressed()
 				if err != nil {
-					recordError(err)
 					close(r.data)
-					continue
+					return err
 				}
 				// Might as well verify the chunk size while we're at it
 				if r.chunk.Size != uint64(len(b)) {
-					recordError(fmt.Errorf("unexpected size for chunk %s", r.chunk.ID))
 					close(r.data)
-					continue
+					return fmt.Errorf("unexpected size for chunk %s", r.chunk.ID)
 				}
 				r.data <- b
 				close(r.data)
 			}
-			wg.Done()
-		}()
+			return nil
+		})
 	}
 
-	// Feeder - requesting chunks from the workers
-	go func() {
+	// Feeder - requesting chunks from the workers and handing a result data channel
+	// to the assembler
+	g.Go(func() error {
 	loop:
 		for _, c := range index.Chunks {
-			// See if we're meant to stop
+			data := make(chan []byte, 1)
 			select {
 			case <-ctx.Done():
 				break loop
-			default:
+			case req <- requestJob{chunk: c, data: data}: // request the chunk
+				assemble <- data // and hand over the data channel to the assembler
 			}
-			data := make(chan []byte, 1)
-			req <- requestJob{chunk: c, data: data} // request the chunk
-			assemble <- data                        // and hand over the data channel to the assembler
 		}
-		close(req)
-		wg.Wait()       // wait for the workers to stop
+		close(req)      // tell the workers this is it
 		close(assemble) // tell the assembler we're done
-	}()
+		return nil
+	})
 
-	// Assember - push the chunks into the pipe that untar reads from
-	go func() {
+	// Assembler - read from the data channels and push the chunks into the pipe that untar reads from
+	g.Go(func() error {
 		for data := range assemble {
 			b := <-data
 			if _, err := io.Copy(w, bytes.NewReader(b)); err != nil {
-				recordError(err)
+				return err
 			}
 		}
 		w.Close() // No more chunks to come, stop the untar
-	}()
+		return nil
+	})
 
 	// Run untar in the main goroutine
 	err := UnTar(ctx, r, dst, opts)
@@ -264,7 +246,7 @@ func UnTarIndex(ctx context.Context, dst string, index Index, s Store, n int, op
 	// the untar stage. If pErr is set, this would have triggered an error from
 	// the untar stage as well (since it cancels the context), so pErr takes
 	// precedence here.
-	if pErr != nil {
+	if pErr := g.Wait(); pErr != nil {
 		return pErr
 	}
 	return err
diff --git a/verifyindex.go b/verifyindex.go
index 1a27ab0..5805478 100644
--- a/verifyindex.go
+++ b/verifyindex.go
@@ -6,20 +6,15 @@ import (
 	"fmt"
 	"io"
 	"os"
-	"sync"
+
+	"golang.org/x/sync/errgroup"
 )
 
 // VerifyIndex re-calculates the checksums of a blob comparing it to a given index.
 // Fails if the index does not match the blob.
 func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb ProgressBar) error {
-	var (
-		wg   sync.WaitGroup
-		mu   sync.Mutex
-		pErr error
-		in   = make(chan IndexChunk)
-	)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	in := make(chan IndexChunk)
+	g, ctx := errgroup.WithContext(ctx)
 
 	// Setup and start the progressbar if any
 	if pb != nil {
@@ -36,26 +31,14 @@ func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb Progress
 		return fmt.Errorf("index size (%d) does not match file size (%d)", idx.Length(), stat.Size())
 	}
 
-	// Helper function to record and deal with any errors in the goroutines
-	recordError := func(err error) {
-		mu.Lock()
-		defer mu.Unlock()
-		if pErr == nil {
-			pErr = err
-		}
-		cancel()
-	}
-
 	// Start the workers, each having its own filehandle to read concurrently
 	for i := 0; i < n; i++ {
-		wg.Add(1)
 		f, err := os.Open(name)
 		if err != nil {
 			return fmt.Errorf("unable to open file %s, %s", name, err)
 		}
 		defer f.Close()
-		go func() {
-			var err error
+		g.Go(func() error {
 			for c := range in {
 				// Update progress bar if any
 				if pb != nil {
@@ -64,44 +47,37 @@ func VerifyIndex(ctx context.Context, name string, idx Index, n int, pb Progress
 
 				// Position the filehandle to the place where the chunk is meant to come
 				// from within the file
-				if _, err = f.Seek(int64(c.Start), io.SeekStart); err != nil {
-					recordError(err)
-					continue
+				if _, err := f.Seek(int64(c.Start), io.SeekStart); err != nil {
+					return err
 				}
 
 				// Read the whole (uncompressed) chunk into memory
 				b := make([]byte, c.Size)
-				if _, err = io.ReadFull(f, b); err != nil {
-					recordError(err)
-					continue
+				if _, err := io.ReadFull(f, b); err != nil {
+					return err
 				}
 
 				// Calculate this chunk's checksum and compare to what it's supposed to be
 				// according to the index
 				sum := sha512.Sum512_256(b)
 				if sum != c.ID {
-					recordError(fmt.Errorf("checksum does not match chunk %s", c.ID))
-					continue
+					return fmt.Errorf("checksum does not match chunk %s", c.ID)
 				}
 			}
-			wg.Done()
-		}()
+			return nil
+		})
 	}
 
 	// Feed the workers, stop if there are any errors
 loop:
 	for _, c := range idx.Chunks {
-		// See if we're meant to stop
 		select {
 		case <-ctx.Done():
 			break loop
-		default:
+		case in <- c:
 		}
-		in <- c
 	}
 	close(in)
-	wg.Wait()
-
-	return pErr
+	return g.Wait()
 }
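Editor's note on the final untar.go hunk: the pre-existing comment about pErr taking
precedence still holds under errgroup, because the group records only the first error
and cancels the shared context at that moment; the UnTar failure provoked by that
cancellation is observed afterwards and never overwrites it. A tiny standalone
demonstration of that first-error-wins property (illustrative, not repo code):

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	g.Go(func() error {
		return errors.New("first failure") // recorded, then ctx is cancelled
	})
	g.Go(func() error {
		<-ctx.Done()     // unblocked by the cancellation above
		return ctx.Err() // arrives later, discarded by the group
	})

	// Wait returns only the first error; the later context.Canceled is dropped.
	fmt.Println(g.Wait()) // prints "first failure"
}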