diff --git a/gsync.go b/gsync.go index 6725754..2c04d04 100644 --- a/gsync.go +++ b/gsync.go @@ -7,9 +7,9 @@ package gsync import "sync" -const ( +var ( - // DefaultBlockSize is the default block size. - DefaultBlockSize = 6 * 1024 // 6kb + // BlockSize is the block size in bytes (6kb by default). + BlockSize = int(6 * 1024) // 6kb ) // Rolling checksum is up to 16 bit length for simplicity and speed. @@ -69,7 +69,7 @@ type BlockOperation struct { var bufferPool = sync.Pool{ New: func() interface{} { - b := make([]byte, DefaultBlockSize) + b := make([]byte, BlockSize) return &b }, } diff --git a/gsync_client.go b/gsync_client.go index d47295a..ee5188e 100644 --- a/gsync_client.go +++ b/gsync_client.go @@ -43,7 +43,7 @@ func LookUpTable(ctx context.Context, bc <-chan BlockSignature) (map[uint32][]Bl // so this function is expected to be called once the remote blocks map is fully populated. // // The caller must make sure the concrete reader instance is not nil or this function will panic. -func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) { +func Sync(ctx context.Context, r io.Reader, shash hash.Hash, datahash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) { if r == nil { return nil, errors.New("gsync: reader required") } @@ -55,13 +55,15 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 } go func() { + var ( r1, r2, rhash, old uint32 - offset int64 + offset, max, delta int rolling, match bool + err error ) - - delta := make([]byte, 0) + bufferSize := 16 * BlockSize + buffer := make([]byte, bufferSize) defer func() { close(o) @@ -79,39 +81,56 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 break } - bfp := bufferPool.Get().(*[]byte) - buffer := *bfp + for offset > max-BlockSize { + if err == io.EOF { + // EOF reached: send the pending delta and any trailing unmatched data + // as literal data blocks, then stop. 
+ left := max - offset + delta + for left >= BlockSize { + o <- BlockOperation{Data: append([]byte(nil), buffer[max-left:max-left+BlockSize]...)} + left -= BlockSize - n, err := r.ReadAt(buffer, offset) - if err != nil && err != io.EOF { - o <- BlockOperation{ - Error: errors.Wrapf(err, "failed reading data block"), + } + if left > 0 { + o <- BlockOperation{Data: append([]byte(nil), buffer[max-left:max]...)} + } + return } - bufferPool.Put(bfp) - // return since data corruption in the server is possible and a re-sync is required. - return + var n int + left := copy(buffer[:], buffer[offset-delta:max]) + n, err = r.Read(buffer[left:]) + if err != nil && err != io.EOF { + o <- BlockOperation{ + Error: errors.Wrapf(err, "failed reading data block"), + } + // return since data corruption in the server is possible and a re-sync is required. + return + } + offset = delta + max = left + n + if datahash != nil { + datahash.Write(buffer[left:max]) + } } - - block := buffer[:n] - // If there are no block signatures from remote server, send all data blocks if len(remote) == 0 { - if n > 0 { - o <- BlockOperation{Data: block} - offset += int64(n) - } - - if err == io.EOF { - bufferPool.Put(bfp) - return + for max-offset >= BlockSize { + o <- BlockOperation{Data: append([]byte(nil), buffer[offset:offset+BlockSize]...)} + offset += BlockSize } continue } + left := BlockSize + if max-offset < BlockSize { + // NOTE: appears unreachable: the refill loop above only exits once max-offset >= BlockSize. + left = max - offset + } + block := buffer[offset:offset+left] if rolling { - new := uint32(block[n-1]) - r1, r2, rhash = rollingHash2(uint32(n), r1, r2, old, new) + new := uint32(buffer[offset+left-1]) + r1, r2, rhash = rollingHash2(uint32(left), r1, r2, old, new) } else { r1, r2, rhash = rollingHash(block) } @@ -129,9 +148,9 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 match = true // We need to send deltas before sending an index token. 
- if len(delta) > 0 { - send(ctx, bytes.NewReader(delta), o) - delta = make([]byte, 0) + if delta > 0 { + o <- BlockOperation{Data: append([]byte(nil), buffer[offset-delta:offset]...)} + delta = 0 } // instructs the server to copy block data at offset b.Index @@ -142,76 +161,21 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 } if match { - if err == io.EOF { - bufferPool.Put(bfp) - break - } - rolling, match = false, false old, rhash, r1, r2 = 0, 0, 0, 0 - offset += int64(n) + offset += left } else { - if err == io.EOF { - // If EOF is reached and not match data found, we add trailing data - // to delta array. - delta = append(delta, block...) - if len(delta) > 0 { - send(ctx, bytes.NewReader(delta), o) - } - bufferPool.Put(bfp) - break - } rolling = true - old = uint32(block[0]) - delta = append(delta, block[0]) + old = uint32(buffer[offset]) + delta++ offset++ + if delta >= BlockSize { + o <- BlockOperation{Data: append([]byte(nil), buffer[offset-delta:offset-delta+BlockSize]...)} + delta -= BlockSize + } } - - // Returning this buffer to the pool here gives us 5x more speed - bufferPool.Put(bfp) } }() return o, nil } - -// send sends all deltas over the channel. Any error is reported back using the -// same channel. -func send(ctx context.Context, r io.Reader, o chan<- BlockOperation) { - for { - // Allow for cancellation. - select { - case <-ctx.Done(): - o <- BlockOperation{ - Error: ctx.Err(), - } - return - default: - // break out of the select block and continue reading - break - } - - bfp := bufferPool.Get().(*[]byte) - buffer := *bfp - defer bufferPool.Put(bfp) - - n, err := r.Read(buffer) - if err != nil && err != io.EOF { - o <- BlockOperation{ - Error: errors.Wrapf(err, "failed reading data block"), - } - return - } - - // If we don't guard against 0 bytes reads, an operation with index 0 will be sent - // and the server will duplicate block 0 at the end of the reconstructed file. 
- if n > 0 { - block := buffer[:n] - o <- BlockOperation{Data: block} - } - - if err == io.EOF { - break - } - } -} diff --git a/gsync_server.go b/gsync_server.go index 3d4d3f9..107fdce 100644 --- a/gsync_server.go +++ b/gsync_server.go @@ -86,7 +86,7 @@ func Signatures(ctx context.Context, r io.Reader, shash hash.Hash) (<-chan Block } // Apply reconstructs a file given a set of operations. The caller must close the ops channel or the context when done or there will be a deadlock. -func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan BlockOperation) error { +func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, datahash hash.Hash, ops <-chan BlockOperation) error { bfp := bufferPool.Get().(*[]byte) buffer := *bfp defer bufferPool.Put(bfp) @@ -115,14 +115,16 @@ func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan Blo } index := int64(o.Index) - n, err := cache.ReadAt(buffer, (index * DefaultBlockSize)) + n, err := cache.ReadAt(buffer, (index * int64(BlockSize))) if err != nil && err != io.EOF { return errors.Wrapf(err, "failed reading cached block") } block = buffer[:n] } - + if datahash != nil { + datahash.Write(block) + } _, err := dst.Write(block) if err != nil { return errors.Wrapf(err, "failed writing block to destination")