package desync

import (
	"context"
	"crypto/sha512"
	"fmt"
	"os"
	"sync"

	"github.com/pkg/errors"
)

// AssembleFile re-assembles a file based on a list of index chunks. It runs n
// goroutines, each with its own filehandle for the file "name", all writing
// to the file concurrently. If progress is provided, it is called whenever a
// chunk has been processed.
// If the input file exists and is not empty, the algorithm first checks
// whether the existing data matches what is expected and only populates the
// areas that differ from the expected content. This can be used to complete
// partly written files.
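//
// A minimal usage sketch (s is assumed to be an initialized Store and idx an
// Index obtained elsewhere; 10 goroutines, no progress callback):
//
//	if err := AssembleFile(context.Background(), "output.img", idx, s, 10, nil); err != nil {
//		// handle the error
//	}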
func AssembleFile(ctx context.Context, name string, idx Index, s Store, n int, progress func()) error {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		pErr      error
		in        = make(chan IndexChunk)
		nullChunk = NewNullChunk(idx.Index.ChunkSizeMax)
		isBlank   bool
	)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Helper function to record and deal with any errors in the goroutines
	recordError := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if pErr == nil {
			pErr = err
		}
		cancel()
	}
	// Determine if the target exists and create it if not
	info, err := os.Stat(name)
	switch {
	case os.IsNotExist(err):
		f, err := os.Create(name)
		if err != nil {
			return err
		}
		f.Close()
		isBlank = true
	case err != nil:
		// Fail on any other error from Stat rather than dereferencing a nil info
		return err
	case info.Size() == 0:
		isBlank = true
	}
	// Truncate the output file to the full expected size. Not only does this
	// confirm there's enough disk space, but it allows for an optimization
	// when dealing with the Null Chunk
	if err := os.Truncate(name, idx.Length()); err != nil {
		return err
	}

	// Keep a record of what has already been written to the file so it can be
	// re-used when there are duplicate chunks
	var written fileChunks
	// Start the workers, each having its own filehandle to write concurrently
	for i := 0; i < n; i++ {
		wg.Add(1)
		f, err := os.OpenFile(name, os.O_RDWR, 0666)
		if err != nil {
			return fmt.Errorf("unable to open file %s, %s", name, err)
		}
		defer f.Close()
		go func() {
			defer wg.Done()
			for c := range in {
				if progress != nil {
					progress()
				}
				// See if we can skip the chunk retrieval and decompression if the
				// null chunk is being requested. If a new file is truncated to the
				// right size beforehand, there's nothing to do since everything
				// defaults to 0 bytes.
				if isBlank && c.ID == nullChunk.ID {
					continue
				}
				// If we operate on an existing file there's a good chance we already
				// have the data written for this chunk. Let's read it from disk and
				// compare to what is expected.
				if !isBlank {
					b := make([]byte, c.Size)
					if _, err := f.ReadAt(b, int64(c.Start)); err != nil {
						recordError(err)
						continue
					}
					sum := sha512.Sum512_256(b)
					if sum == c.ID {
						written.add(c)
						continue
					}
				}
				// Before pulling a chunk from the store, let's see if that same chunk's
				// been written to the file already. If so, we can simply clone it from
				// that location.
				if cw, ok := written.get(c.ID); ok {
					if err := cloneInFile(f, c, cw); err != nil {
						recordError(err)
					}
					continue
				}
				// Pull the (compressed) chunk from the store
				b, err := s.GetChunk(c.ID)
				if err != nil {
					recordError(err)
					continue
				}
				// Since we know how big the chunk is supposed to be, pre-allocate a
				// slice to decompress into
				db := make([]byte, c.Size)
				// The chunk is compressed. Decompress it here
				db, err = Decompress(db, b)
				if err != nil {
					recordError(errors.Wrap(err, c.ID.String()))
					continue
				}
				// Verify the checksum of the chunk matches the ID
				sum := sha512.Sum512_256(db)
				if sum != c.ID {
					recordError(fmt.Errorf("unexpected sha512/256 %x for chunk id %s", sum, c.ID))
					continue
				}
				// Might as well verify the chunk size while we're at it
				if c.Size != uint64(len(db)) {
					recordError(fmt.Errorf("unexpected size for chunk %s", c.ID))
					continue
				}
				// Write the decompressed chunk into the file at the right position
				if _, err = f.WriteAt(db, int64(c.Start)); err != nil {
					recordError(err)
					continue
				}
				// Make a record of this chunk being available in the file now
				written.add(c)
			}
		}()
	}
	// Feed the workers, stop if there are any errors
loop:
	for _, c := range idx.Chunks {
		// See if we're meant to stop
		select {
		case <-ctx.Done():
			break loop
		default:
		}
		in <- c
	}
	close(in)
	wg.Wait()
	return pErr
}

// fileChunks acts as a kind of in-file cache for chunks already written to
// the file being assembled. Every chunk ref that has been successfully written
// into the file is added to it. If another write operation requires the same
// (duplicate) chunk again, it can simply be copied out of the file to the new
// position rather than being requested from a (possibly remote) store again
// and decompressed.
type fileChunks struct {
	mu     sync.RWMutex
	chunks map[ChunkID]IndexChunk
}

func (f *fileChunks) add(c IndexChunk) {
	f.mu.Lock()
	defer f.mu.Unlock()
	// Lazily initialize the map on first use
	if f.chunks == nil {
		f.chunks = make(map[ChunkID]IndexChunk)
	}
	f.chunks[c.ID] = c
}

func (f *fileChunks) get(id ChunkID) (IndexChunk, bool) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	c, ok := f.chunks[id]
	return c, ok
}

// cloneInFile copies a chunk from one position to another in the same file.
// Used when a chunk occurs more than once in a file. TODO: The current
// implementation uses just the one given filehandle, copies the chunk into
// memory, then writes it back to disk. It may be more efficient to open a 2nd
// filehandle, seek, and copy directly with an io.LimitReader.
func cloneInFile(f *os.File, dst, src IndexChunk) error {
	if src.ID != dst.ID || src.Size != dst.Size {
		return errors.New("internal error: different chunks requested for in-file copy")
	}
	b := make([]byte, int64(src.Size))
	if _, err := f.ReadAt(b, int64(src.Start)); err != nil {
		return err
	}
	_, err := f.WriteAt(b, int64(dst.Start))
	return err
}
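
// cloneInFileBuffered is a hypothetical variant of cloneInFile, sketched here
// in the spirit of the TODO above: rather than one allocation the size of the
// whole chunk, it copies through a small fixed-size buffer, bounding memory
// use for large chunks while keeping the single-filehandle, positioned-I/O
// approach. The name and buffer size are illustrative only, not part of the
// package API.
func cloneInFileBuffered(f *os.File, dst, src IndexChunk) error {
	if src.ID != dst.ID || src.Size != dst.Size {
		return errors.New("internal error: different chunks requested for in-file copy")
	}
	// Illustrative buffer size; copy the chunk in pieces of at most this size
	const bufSize = 64 * 1024
	buf := make([]byte, bufSize)
	for off := int64(0); off < int64(src.Size); off += int64(len(buf)) {
		// Clamp the last piece to the remaining bytes of the chunk
		n := int64(src.Size) - off
		if n > int64(len(buf)) {
			n = int64(len(buf))
		}
		if _, err := f.ReadAt(buf[:n], int64(src.Start)+off); err != nil {
			return err
		}
		if _, err := f.WriteAt(buf[:n], int64(dst.Start)+off); err != nil {
			return err
		}
	}
	return nil
}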