[WIP] Speedup sync #12

Open

wants to merge 5 commits into master
6 changes: 3 additions & 3 deletions gsync.go
@@ -7,9 +7,9 @@ package gsync

import "sync"

const (
var (
// DefaultBlockSize is the default block size.
DefaultBlockSize = 6 * 1024 // 6kb
BlockSize = int(6 * 1024) // 6kb
)

Rolling checksum is up to 16 bits in length, for simplicity and speed.
@@ -69,7 +69,7 @@ type BlockOperation struct {

var bufferPool = sync.Pool{
New: func() interface{} {
b := make([]byte, DefaultBlockSize)
b := make([]byte, BlockSize)
return &b
},
}
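Turning the DefaultBlockSize constant into the exported variable BlockSize suggests the block size is now meant to be tunable by callers. A minimal usage sketch under that assumption (the import path is a guess; substitute the repository's actual module path):

package main

// Assumed import path, for illustration only.
import "github.com/c4milo/gsync"

func main() {
	// Both the signature-generating side and the delta-generating side must
	// use the same value, since block indexes, offsets, and the read buffer
	// in Sync are all derived from it.
	gsync.BlockSize = 64 * 1024 // e.g. 64kb blocks instead of the default 6kb
}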
146 changes: 55 additions & 91 deletions gsync_client.go
@@ -43,7 +43,7 @@ func LookUpTable(ctx context.Context, bc <-chan BlockSignature) (map[uint32][]Bl
// so this function is expected to be called once the remote blocks map is fully populated.
//
// The caller must make sure the concrete reader instance is not nil or this function will panic.
func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) {
func Sync(ctx context.Context, r io.Reader, shash hash.Hash, datahash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) {
if r == nil {
return nil, errors.New("gsync: reader required")
}
@@ -55,13 +55,15 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32
}

go func() {

var (
r1, r2, rhash, old uint32
offset int64
offset, max, delta int
rolling, match bool
err error
)

delta := make([]byte, 0)
bufferSize := 16 * BlockSize
buffer := make([]byte, bufferSize)

defer func() {
close(o)
@@ -79,39 +81,56 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32
break
}

bfp := bufferPool.Get().(*[]byte)
buffer := *bfp
for offset > max-BlockSize {
if err == io.EOF {
// If EOF is reached and no match was found, flush the remaining
// buffered data as delta blocks.
left := max - offset + delta
for left >= BlockSize {
o <- BlockOperation{Data: append([]byte(nil), buffer[max-left:max-left+BlockSize]...)}
left -= BlockSize

n, err := r.ReadAt(buffer, offset)
if err != nil && err != io.EOF {
o <- BlockOperation{
Error: errors.Wrapf(err, "failed reading data block"),
}
if left > 0 {
o <- BlockOperation{Data: append([]byte(nil), buffer[max-left:max]...)}
}
return
}
bufferPool.Put(bfp)

// return since data corruption in the server is possible and a re-sync is required.
return
var n int
left := copy(buffer[:], buffer[offset-delta:max])
n, err = r.Read(buffer[left:])
if err != nil && err != io.EOF {
o <- BlockOperation{
Error: errors.Wrapf(err, "failed reading data block"),
}
// return since data corruption in the server is possible and a re-sync is required.
return
}
offset = delta
max = left + n
if datahash != nil {
datahash.Write(buffer[left:max])
}
}

block := buffer[:n]

// If there are no block signatures from remote server, send all data blocks
if len(remote) == 0 {
if n > 0 {
o <- BlockOperation{Data: block}
offset += int64(n)
}

if err == io.EOF {
bufferPool.Put(bfp)
return
for max-offset >= BlockSize {
o <- BlockOperation{Data: append([]byte(nil), buffer[offset:offset+BlockSize]...)}
offset += BlockSize
}
continue
}

left := BlockSize
if max-offset < BlockSize {
// FIXME: is this called?
left = max - offset
}
block := buffer[offset:offset+left]
if rolling {
new := uint32(block[n-1])
r1, r2, rhash = rollingHash2(uint32(n), r1, r2, old, new)
new := uint32(buffer[offset+left-1])
r1, r2, rhash = rollingHash2(uint32(left), r1, r2, old, new)
} else {
r1, r2, rhash = rollingHash(block)
}
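For reference, the rollingHash/rollingHash2 pair used above follows the usual rsync scheme: compute a weak checksum over the first window, then slide it one byte at a time by dropping the outgoing byte and adding the incoming one instead of rehashing the whole block. A self-contained sketch of that recurrence, assuming an Adler-style split into two 16-bit halves (the function names and exact formula are illustrative; the repository's implementation may differ):

package main

import "fmt"

// weakChecksum computes the two 16-bit halves of an rsync-style weak checksum
// over block, plus the combined 32-bit value. Illustrative only.
func weakChecksum(block []byte) (r1, r2, sum uint32) {
	l := uint32(len(block))
	for i, b := range block {
		r1 += uint32(b)                   // plain byte sum
		r2 += (l - uint32(i)) * uint32(b) // position-weighted sum
	}
	r1 &= 0xffff
	r2 &= 0xffff
	return r1, r2, r1 | r2<<16
}

// rollWeakChecksum slides the window one byte to the right: old is the byte
// leaving the window, new is the byte entering it, l is the window length.
func rollWeakChecksum(l, r1, r2, old, new uint32) (uint32, uint32, uint32) {
	r1 = (r1 - old + new) & 0xffff
	r2 = (r2 - l*old + r1) & 0xffff
	return r1, r2, r1 | r2<<16
}

func main() {
	data := []byte("hello, rolling checksum!")
	r1, r2, _ := weakChecksum(data[0:8])
	_, _, rolled := rollWeakChecksum(8, r1, r2, uint32(data[0]), uint32(data[8]))
	_, _, direct := weakChecksum(data[1:9])
	fmt.Println(rolled == direct) // true: rolling the window matches recomputing it
}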
@@ -129,9 +148,9 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32
match = true

// We need to send deltas before sending an index token.
if len(delta) > 0 {
send(ctx, bytes.NewReader(delta), o)
delta = make([]byte, 0)
if delta > 0 {
o <- BlockOperation{Data: append([]byte(nil), buffer[offset-delta:offset]...)}
delta = 0
}

// instructs the server to copy block data at offset b.Index
@@ -142,76 +161,21 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32
}

if match {
if err == io.EOF {
bufferPool.Put(bfp)
break
}

rolling, match = false, false
old, rhash, r1, r2 = 0, 0, 0, 0
offset += int64(n)
offset += left
} else {
if err == io.EOF {
// If EOF is reached and no match was found, add the trailing data
// to the delta array.
delta = append(delta, block...)
if len(delta) > 0 {
send(ctx, bytes.NewReader(delta), o)
}
bufferPool.Put(bfp)
break
}
rolling = true
old = uint32(block[0])
delta = append(delta, block[0])
old = uint32(buffer[offset])
delta++
offset++
if delta >= BlockSize {
o <- BlockOperation{Data: append([]byte(nil), buffer[offset-delta:offset-delta+BlockSize]...)}
delta -= BlockSize
}
}

// Returning this buffer to the pool here gives us 5x more speed
bufferPool.Put(bfp)
}
}()

return o, nil
}

// send sends all deltas over the channel. Any error is reported back using the
// same channel.
func send(ctx context.Context, r io.Reader, o chan<- BlockOperation) {
for {
// Allow for cancellation.
select {
case <-ctx.Done():
o <- BlockOperation{
Error: ctx.Err(),
}
return
default:
// break out of the select block and continue reading
break
}

bfp := bufferPool.Get().(*[]byte)
buffer := *bfp
defer bufferPool.Put(bfp)

n, err := r.Read(buffer)
if err != nil && err != io.EOF {
o <- BlockOperation{
Error: errors.Wrapf(err, "failed reading data block"),
}
return
}

// If we don't guard against 0 bytes reads, an operation with index 0 will be sent
// and the server will duplicate block 0 at the end of the reconstructed file.
if n > 0 {
block := buffer[:n]
o <- BlockOperation{Data: block}
}

if err == io.EOF {
break
}
}
}
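The restructuring above replaces the per-block ReadAt calls and bufferPool round trips with a single long-lived buffer of 16 * BlockSize bytes that is refilled sequentially: whenever fewer than BlockSize unprocessed bytes remain, the unconsumed tail (including any pending delta bytes) is copied to the front and the rest of the buffer is topped up with one Read. A stripped-down sketch of that refill pattern, separated from the checksum logic (all names here are illustrative):

package main

import (
	"bytes"
	"fmt"
	"io"
)

const blockSize = 4 // tiny block size so the example output is easy to follow

// refill moves the unprocessed bytes buf[start:max] to the front of buf and
// tops the buffer up from r. It returns the new high-water mark and any read
// error (io.EOF once the reader is drained). Illustrative only.
func refill(r io.Reader, buf []byte, start, max int) (int, error) {
	left := copy(buf, buf[start:max]) // keep the unconsumed tail
	n, err := r.Read(buf[left:])      // fill the rest with fresh data
	return left + n, err
}

func main() {
	src := bytes.NewReader([]byte("abcdefghijklmnopqrstuvwxyz"))
	buf := make([]byte, 4*blockSize)

	var offset, max int
	var err error
	for {
		// Refill whenever less than a full block remains in the buffer.
		if offset > max-blockSize && err != io.EOF {
			max, err = refill(src, buf, offset, max)
			offset = 0
			if err != nil && err != io.EOF {
				panic(err)
			}
		}
		if offset >= max {
			break
		}
		end := offset + blockSize
		if end > max {
			end = max // trailing partial block at EOF
		}
		fmt.Printf("block %q\n", buf[offset:end])
		offset = end
	}
}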
8 changes: 5 additions & 3 deletions gsync_server.go
@@ -86,7 +86,7 @@ func Signatures(ctx context.Context, r io.Reader, shash hash.Hash) (<-chan Block
}

// Apply reconstructs a file given a set of operations. The caller must close the ops channel or the context when done or there will be a deadlock.
func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan BlockOperation) error {
func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, datahash hash.Hash, ops <-chan BlockOperation) error {
bfp := bufferPool.Get().(*[]byte)
buffer := *bfp
defer bufferPool.Put(bfp)
@@ -115,14 +115,16 @@ func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan Blo
}

index := int64(o.Index)
n, err := cache.ReadAt(buffer, (index * DefaultBlockSize))
n, err := cache.ReadAt(buffer, (index * int64(BlockSize)))
if err != nil && err != io.EOF {
return errors.Wrapf(err, "failed reading cached block")
}

block = buffer[:n]
}

if datahash != nil {
datahash.Write(block)
}
_, err := dst.Write(block)
if err != nil {
return errors.Wrapf(err, "failed writing block to destination")
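Taken together, the new datahash parameters let Sync hash every byte it reads from the source and Apply hash every byte it writes to the destination, so the reconstructed file can be verified without a second pass over either file. A rough end-to-end sketch against the signatures shown in this diff (the error-return shapes of Signatures and LookUpTable, and the import path, are assumptions; file-open error handling is elided):

package main

import (
	"bytes"
	"context"
	"crypto/md5"
	"crypto/sha256"
	"fmt"
	"os"

	// Assumed import path, for illustration only.
	"github.com/c4milo/gsync"
)

func main() {
	ctx := context.Background()

	// "Server" side: per-block signatures of the outdated copy it already has.
	cached, _ := os.Open("old-copy.bin")
	defer cached.Close()
	sigs, err := gsync.Signatures(ctx, cached, md5.New())
	if err != nil {
		panic(err)
	}
	remote, err := gsync.LookUpTable(ctx, sigs)
	if err != nil {
		panic(err)
	}

	// "Client" side: stream the current file, emit copy/data operations, and
	// hash the whole source as a by-product. The md5.New() here must match
	// the strong hash used to build the signatures above.
	src, _ := os.Open("new-copy.bin")
	defer src.Close()
	srcHash := sha256.New()
	ops, err := gsync.Sync(ctx, src, md5.New(), srcHash, remote)
	if err != nil {
		panic(err)
	}

	// "Server" side again: rebuild the file from the cached copy plus the
	// operations, hashing the reconstructed output as it is written.
	dst, _ := os.Create("rebuilt.bin")
	defer dst.Close()
	dstHash := sha256.New()
	if err := gsync.Apply(ctx, dst, cached, dstHash, ops); err != nil {
		panic(err)
	}

	fmt.Println("verified:", bytes.Equal(srcHash.Sum(nil), dstHash.Sum(nil)))
}

Since Sync hashes every byte it reads and Apply hashes every byte it writes, equal sums indicate the rebuilt file matches the source.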