-
-
Notifications
You must be signed in to change notification settings - Fork 239
/
body_wrapper.go
292 lines (234 loc) · 6.81 KB
/
body_wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package httpexpect
import (
"bytes"
"context"
"errors"
"io"
"runtime"
"sync"
)
// bodyWrapper wraps a request or response body reader and makes its contents
// readable more than once.
//
// Two approaches are supported:
//   - call Read to consume the contents and Rewind to start over from the beginning
//   - call GetBody to obtain a separate reader over the contents
//
// Constructing a bodyWrapper reads nothing, and Rewind stays a no-op until some
// read operation has happened.
//
// While the user reads the body, every retrieved chunk is also copied into
// memory. When the original body is exhausted, it is closed and subsequent
// reads are served from the in-memory copy.
//
// If Rewind, GetBody, or Close needs the complete contents before the body was
// fully read for the first time, bodyWrapper reads the remainder itself.
//
// DisableRewinds may be called at any moment. After that, Rewind and GetBody
// no longer work, the memory cache is dropped, and, if the original body is
// not fully read yet, reads go straight to it.
//
// A finalizer is installed on creation; it closes the original body if the
// user never reads it fully and never calls Close.
type bodyWrapper struct {
	// Guards every field below.
	mu sync.Mutex

	// Source side.
	//
	// httpReader is the original reader of the HTTP body.
	// httpCancelFunc, if set, is the cancellation function of the original
	// HTTP response; it is called after the body is fully read into memory.
	httpReader     io.ReadCloser
	httpCancelFunc context.CancelFunc

	// In-memory side.
	//
	// memBytes holds the body contents stored in memory.
	// memReader reads from that storage; Rewind() resets it to start from
	// the beginning.
	memBytes  []byte
	memReader *bytes.Reader

	// Cached read and close errors.
	readErr, closeErr error

	// State flags.
	//
	// isRewindDisabled: Read does not store bytes in memory, and memBytes and
	// memReader are not used.
	// isFullyRead: the HTTP body was already fully read into memory.
	// isReadBefore: a read operation of any type was called at least once.
	isRewindDisabled bool
	isFullyRead      bool
	isReadBefore     bool
}
// newBodyWrapper builds a bodyWrapper around reader. Nothing is read from
// reader at this point.
//
// cancelFunc is stored alongside the reader and is invoked when the original
// body gets closed.
func newBodyWrapper(reader io.ReadCloser, cancelFunc context.CancelFunc) *bodyWrapper {
	wrapper := new(bodyWrapper)
	wrapper.httpReader = reader
	wrapper.httpCancelFunc = cancelFunc

	// Safety net: if closeAndCancel is never reached through normal use,
	// the finalizer closes the body.
	runtime.SetFinalizer(wrapper, (*bodyWrapper).Close)

	return wrapper
}
// Read body contents.
//
// Implements io.Reader. Where the data comes from depends on the wrapper state:
//   - body already fully read: data is served from the in-memory copy
//   - rewinds enabled, body not fully read: data is read from the original body
//     and every chunk is also stored in memory
//   - rewinds disabled, body not fully read: data is read from the original
//     body directly, nothing is stored
//
// If rewinds are disabled and the original body is no longer available (for
// example, Close was called before the body was fully read), Read does not
// touch the released reader; it returns the cached read error if there is one,
// and io.EOF otherwise.
func (bw *bodyWrapper) Read(p []byte) (int, error) {
	bw.mu.Lock()
	defer bw.mu.Unlock()

	// Remember that reading has started, so that Rewind stops being a no-op.
	bw.isReadBefore = true

	switch {
	case bw.isFullyRead:
		// Read from memory.
		return bw.memReadNext(p)

	case !bw.isRewindDisabled:
		// Read from original HTTP response + store into memory.
		return bw.httpReadNext(p)

	case bw.httpReader == nil:
		// Rewinds are disabled and the original reader was already released
		// by closeAndCancel. Calling Read on the nil interface would panic,
		// so report end of stream (or the cached error) instead.
		if bw.readErr != nil {
			return 0, bw.readErr
		}
		return 0, io.EOF

	default:
		// Regular read from original HTTP response.
		return bw.httpReader.Read(p)
	}
}
// Close body.
//
// With rewinds enabled and the body not fully read yet, the remainder is first
// pulled into memory, because Rewind or GetBody may still be called afterwards.
// Then the original reader is closed and the memory reader is reset to empty.
//
// The returned error is, in order of precedence: the close error of the
// original reader, the error of the implicit full read, or nil.
func (bw *bodyWrapper) Close() error {
	bw.mu.Lock()
	defer bw.mu.Unlock()

	// Start from the previously cached close error, if any.
	result := bw.closeErr

	needFullRead := !(bw.isRewindDisabled || bw.isFullyRead)
	if needFullRead {
		// Keep the contents available for later Rewind / GetBody calls.
		bw.isReadBefore = true

		if e := bw.httpReadFull(); e != nil {
			result = e
		}
	}

	// Release the original reader.
	if e := bw.closeAndCancel(); e != nil {
		result = e
	}

	// After Close, reading yields nothing until Rewind is called.
	bw.memReader = bytes.NewReader(nil)

	// With rewinds disabled the cache can never be used again, so drop it.
	if bw.isRewindDisabled {
		bw.memBytes = nil
	}

	return result
}
// Rewind reading to the beginning.
//
// Does nothing when rewinds are disabled or when no read operation has happened
// yet. Otherwise makes sure the whole body is in memory and points the memory
// reader at its start.
func (bw *bodyWrapper) Rewind() {
	bw.mu.Lock()
	defer bw.mu.Unlock()

	// No-op if rewinds are disabled, and no-op until the first read operation.
	if bw.isRewindDisabled || !bw.isReadBefore {
		return
	}

	if !bw.isFullyRead {
		// Pull the remainder of the HTTP response into memory now.
		// The error is deliberately dropped here: httpReadFull caches it,
		// and it is reported by the next read operation.
		_ = bw.httpReadFull()
	}

	// Start over from the first cached byte.
	bw.memReader = bytes.NewReader(bw.memBytes)
}
// GetBody creates a new reader to retrieve body contents.
//
// The new reader always starts from the beginning of the body and is not
// affected by Rewind(). Calling GetBody counts as a read operation.
//
// An error is returned if a read error was cached earlier, if rewinds are
// disabled, or if reading the remainder of the body into memory fails.
func (bw *bodyWrapper) GetBody() (io.ReadCloser, error) {
	bw.mu.Lock()
	defer bw.mu.Unlock()

	bw.isReadBefore = true

	switch {
	case bw.readErr != nil:
		// A previous failure of the original reader is reported as is.
		return nil, bw.readErr
	case bw.isRewindDisabled:
		// Without the memory cache there is nothing to build a reader from.
		return nil, errors.New("rewinds are disabled, cannot get body")
	}

	// Make sure the complete HTTP response is in memory.
	if !bw.isFullyRead {
		if fullErr := bw.httpReadFull(); fullErr != nil {
			return nil, fullErr
		}
	}

	// Hand out a fresh reader over the cached bytes.
	fresh := bytes.NewReader(bw.memBytes)
	return io.NopCloser(fresh), nil
}
// DisableRewinds turns off storing body contents in memory and clears the
// cache.
//
// The cache is released right away unless it is currently being read and still
// has unread bytes; in that case memReadNext() releases it when it hits EOF.
func (bw *bodyWrapper) DisableRewinds() {
	bw.mu.Lock()
	defer bw.mu.Unlock()

	// The cache is still needed only when we are reading from memory and there
	// is more to read. memReader is consulted only once the body is fully read.
	cacheInUse := bw.isFullyRead && bw.memReader.Len() != 0

	if !cacheInUse {
		bw.memReader = bytes.NewReader(nil)
		bw.memBytes = nil
	}

	bw.isRewindDisabled = true
}
// memReadNext reads the next chunk from the in-memory copy of the body.
//
// On io.EOF it releases the cache if rewinds were disabled in the meantime,
// and reports the cached read error, if any, in place of io.EOF.
func (bw *bodyWrapper) memReadNext(p []byte) (int, error) {
	count, readErr := bw.memReader.Read(p)
	if readErr != io.EOF {
		return count, readErr
	}

	// Rewinds were disabled while the cache was being read; now that the
	// cache is exhausted, free it.
	if bw.isRewindDisabled {
		bw.memReader = bytes.NewReader(nil)
		bw.memBytes = nil
	}

	// A cached error takes precedence over plain end-of-stream.
	if cached := bw.readErr; cached != nil {
		return count, cached
	}

	return count, readErr
}
// httpReadNext reads the next chunk from the original body and appends the
// bytes it got to the in-memory cache.
//
// Any error from the original reader, io.EOF included, ends the streaming
// phase: the original reader is closed and the wrapper switches to reading
// from memory. Errors other than io.EOF are cached. If the read ended with
// io.EOF but closing failed, the close error is returned instead of io.EOF.
func (bw *bodyWrapper) httpReadNext(p []byte) (int, error) {
	count, err := bw.httpReader.Read(p)

	// Store whatever was delivered, even if an error came along with it.
	if count > 0 {
		bw.memBytes = append(bw.memBytes, p[:count]...)
	}

	if err == nil {
		return count, nil
	}

	reachedEOF := err == io.EOF
	if !reachedEOF {
		bw.readErr = err
	}

	closeErr := bw.closeAndCancel()
	if reachedEOF && closeErr != nil {
		err = closeErr
	}

	// Switch to reading from memory; the current pass has nothing left.
	bw.isFullyRead = true
	bw.memReader = bytes.NewReader(nil)

	return count, err
}
// httpReadFull reads everything that is left in the original body, appends it
// to the in-memory cache, and switches the wrapper to reading from memory.
//
// The memory reader is positioned at the first newly read byte, so an ongoing
// pass over the body continues where the original reader stopped.
//
// The original reader is closed afterwards. A read error is cached and
// returned; if reading succeeded, the result of closing is returned.
func (bw *bodyWrapper) httpReadFull() error {
	rest, readErr := io.ReadAll(bw.httpReader)

	// Bytes before this offset were already delivered to the user.
	offset := len(bw.memBytes)
	bw.memBytes = append(bw.memBytes, rest...)
	bw.memReader = bytes.NewReader(bw.memBytes[offset:])
	bw.isFullyRead = true

	if readErr != nil {
		bw.readErr = readErr
	}

	closeErr := bw.closeAndCancel()

	// The read error wins over the close error.
	if readErr != nil {
		return readErr
	}
	return closeErr
}
// closeAndCancel closes the original reader and invokes the cancellation
// function, whichever of them is still set, clears both, and removes the
// finalizer. It returns the cached close error.
//
// The first close error is cached in closeErr; it is also stored in readErr
// if no read error was cached before. When both the reader and the cancel
// function are already cleared, only the cached close error is returned.
func (bw *bodyWrapper) closeAndCancel() error {
	reader, cancel := bw.httpReader, bw.httpCancelFunc

	// Nothing left to release.
	if reader == nil && cancel == nil {
		return bw.closeErr
	}

	if reader != nil {
		closeResult := reader.Close()
		bw.httpReader = nil

		// Keep only the first error of each kind.
		if bw.readErr == nil {
			bw.readErr = closeResult
		}
		if bw.closeErr == nil {
			bw.closeErr = closeResult
		}
	}

	if cancel != nil {
		cancel()
		bw.httpCancelFunc = nil
	}

	// Finalizer is not needed anymore.
	runtime.SetFinalizer(bw, nil)

	return bw.closeErr
}