-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor.go
145 lines (124 loc) · 2.67 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"crypto/sha512"
"encoding/hex"
"fmt"
"hash"
"io"
"sync"
"sync/atomic"
)
// Processor deduplicates written data by splitting it into chunks,
// hashing each chunk, and keeping one size entry per unique chunk hash.
type Processor struct {
	ChunkerType ChunkerType // chunking strategy passed to NewChunker
	Compressor  Compressor  // compressor applied to each chunk before size accounting
	// chunks maps chunk hash (hex string, see hashingWriter.GetHashAndSize)
	// to the chunk's compressed size (int64).
	chunks sync.Map
	// duplicateChunks counts chunks whose hash was already stored;
	// written with atomic.AddInt64 in chunkedWriter.chunk.
	duplicateChunks int64
}
// NewWriter returns a writer that should be fed the content of one file.
// The caller must Close it so the final, partial chunk is recorded.
func (p *Processor) NewWriter() io.WriteCloser {
	cw := &chunkedWriter{p: p}
	cw.resetChunker()
	return cw
}
// GetSize returns the total compressed size of all unique chunks seen so far.
func (p *Processor) GetSize() int64 {
	var total int64
	p.chunks.Range(func(_, size interface{}) bool {
		total += size.(int64)
		return true
	})
	return total
}
// GetExtraStats returns a human-readable summary: the number of unique
// chunks stored and the number of duplicate chunks encountered.
func (p *Processor) GetExtraStats() string {
	var chunks int
	p.chunks.Range(func(hash, size interface{}) bool {
		chunks++
		return true
	})
	// duplicateChunks is written with atomic.AddInt64 by concurrent writers
	// (see chunkedWriter.chunk), so it must be read atomically too; the
	// original plain read was a data race under -race.
	dup := atomic.LoadInt64(&p.duplicateChunks)
	return fmt.Sprintf("chunks: %d duplicate: %d", chunks, dup)
}
// chunkedWriter splits one file's byte stream into chunks and records each
// finished chunk in the owning Processor.
type chunkedWriter struct {
	p *Processor     // owning processor (chunk registry and configuration)
	c Chunker        // current chunker; replaced after every split
	h *hashingWriter // writer for the in-progress chunk; nil when no chunk is open
}
// Write feeds data through the chunker, closing a chunk at every split point
// the chunker reports. The full input is always consumed, so the returned
// count is len(p) on success.
func (w *chunkedWriter) Write(p []byte) (n int, err error) {
	total := len(p)
	for {
		split := w.c.NextSplit(p)
		if split == 0 {
			// A zero-length split would make no progress — programmer bug.
			panic(w.c)
		}
		// Lazily open a chunk writer: at the very beginning of the stream,
		// or right after the previous chunk was closed.
		if w.h == nil {
			w.resetWriter()
		}
		if split == -1 {
			// No split within the current data; buffer it all and wait
			// for the next Write call.
			if _, err = w.h.Write(p); err != nil {
				panic(err)
			}
			return total, nil
		}
		// Split found: finish the current chunk, then continue with the
		// remaining bytes using a fresh chunker and (lazily) a fresh writer.
		if _, err = w.h.Write(p[:split]); err != nil {
			panic(err)
		}
		w.chunk()
		w.resetChunker()
		w.h = nil
		p = p[split:]
		if len(p) == 0 {
			return total, nil
		}
	}
}
// Close records the final, partial chunk if any data is still buffered.
// It always returns nil.
func (w *chunkedWriter) Close() error {
	if w.h == nil {
		return nil
	}
	w.chunk()
	return nil
}
// resetWriter opens a fresh hashing/compressing writer for the next chunk.
func (w *chunkedWriter) resetWriter() {
	w.h = NewHashingWriter(w.p.Compressor)
}
// resetChunker installs a fresh chunker so split detection restarts
// from a clean state for the next chunk.
func (w *chunkedWriter) resetChunker() {
	w.c = NewChunker(w.p.ChunkerType)
}
// chunk finalizes the in-progress chunk: it stores the chunk's compressed
// size keyed by its hash, and counts a duplicate when that hash was
// already present.
func (w *chunkedWriter) chunk() {
	sum, size := w.h.GetHashAndSize()
	if _, existed := w.p.chunks.LoadOrStore(sum, size); existed {
		atomic.AddInt64(&w.p.duplicateChunks, 1)
	}
}
// hashingWriter computes the hash of the raw data written to it, and the
// size of that data after compression.
type hashingWriter struct {
	out          io.WriteCloser // compressor writing into a byte-counting sink
	hash         hash.Hash      // SHA-512/256 over the raw (uncompressed) bytes
	bytesWritten int64          // compressed byte count, updated via CountingWriter
}
// NewHashingWriter returns a writer that hashes the raw bytes it receives
// with SHA-512/256 while routing them through the given compressor into a
// byte counter, yielding the chunk's compressed size.
func NewHashingWriter(compressor Compressor) *hashingWriter {
	w := &hashingWriter{hash: sha512.New512_256()}
	w.out = compressor.NewWriter(&CountingWriter{&w.bytesWritten})
	return w
}
// Write hashes p and forwards it to the compressing writer, returning that
// writer's result. Per the hash.Hash contract the hash write never fails,
// so a failure there is treated as a programmer bug.
func (w *hashingWriter) Write(p []byte) (int, error) {
	if _, err := w.hash.Write(p); err != nil {
		panic(err)
	}
	return w.out.Write(p)
}
// GetHashAndSize finalizes the chunk: it closes the compressor (flushing any
// buffered output into the byte counter) and returns the hex-encoded hash of
// the raw data together with the compressed size. Call it at most once.
func (w *hashingWriter) GetHashAndSize() (string, int64) {
	if err := w.out.Close(); err != nil {
		panic(err)
	}
	sum := w.hash.Sum(nil)
	return hex.EncodeToString(sum), w.bytesWritten
}