Skip to content

Commit 909b1c8

Browse files
committed
🔥 feat: Add StreamResponseBody support for the Client
1 parent 64a7113 commit 909b1c8

File tree

5 files changed

+722
-29
lines changed

5 files changed

+722
-29
lines changed

client/client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,20 @@ func (c *Client) DisableDebug() *Client {
435435
return c
436436
}
437437

438+
// StreamResponseBody returns the current StreamResponseBody setting.
439+
func (c *Client) StreamResponseBody() bool {
440+
return c.fasthttp.StreamResponseBody
441+
}
442+
443+
// SetStreamResponseBody enables or disables response body streaming.
444+
// When enabled, the response body can be read as a stream using BodyStream()
445+
// instead of being fully loaded into memory. This is useful for large responses
446+
// or server-sent events.
447+
func (c *Client) SetStreamResponseBody(enable bool) *Client {
448+
c.fasthttp.StreamResponseBody = enable
449+
return c
450+
}
451+
438452
// SetCookieJar sets the cookie jar for the client.
439453
func (c *Client) SetCookieJar(cookieJar *CookieJar) *Client {
440454
c.cookieJar = cookieJar

client/client_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,3 +1733,120 @@ func Benchmark_Client_Request_Parallel(b *testing.B) {
17331733
require.NoError(b, err)
17341734
})
17351735
}
1736+
1737+
func Test_Client_StreamResponseBody(t *testing.T) {
1738+
t.Parallel()
1739+
client := New()
1740+
require.False(t, client.StreamResponseBody())
1741+
client.SetStreamResponseBody(true)
1742+
require.True(t, client.StreamResponseBody())
1743+
client.SetStreamResponseBody(false)
1744+
require.False(t, client.StreamResponseBody())
1745+
}
1746+
1747+
func Test_Client_StreamResponseBody_ServerSentEvents(t *testing.T) {
1748+
t.Parallel()
1749+
1750+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1751+
app.Get("/sse", func(c fiber.Ctx) error {
1752+
c.Set("Content-Type", "text/event-stream")
1753+
c.Set("Cache-Control", "no-cache")
1754+
c.Set("Connection", "keep-alive")
1755+
1756+
messages := []string{
1757+
"data: message 1\n\n",
1758+
"data: message 2\n\n",
1759+
"data: message 3\n\n",
1760+
}
1761+
1762+
for _, msg := range messages {
1763+
if _, err := c.WriteString(msg); err != nil {
1764+
return err
1765+
}
1766+
}
1767+
1768+
return nil
1769+
})
1770+
})
1771+
defer func() { require.NoError(t, app.Shutdown()) }()
1772+
1773+
client := New().SetStreamResponseBody(true)
1774+
resp, err := client.Get("http://" + addr + "/sse")
1775+
require.NoError(t, err)
1776+
defer resp.Close()
1777+
1778+
bodyStream := resp.BodyStream()
1779+
require.NotNil(t, bodyStream)
1780+
1781+
buffer := make([]byte, 1024)
1782+
n, err := bodyStream.Read(buffer)
1783+
require.NoError(t, err)
1784+
require.Positive(t, n)
1785+
1786+
content := string(buffer[:n])
1787+
require.Contains(t, content, "data: message 1")
1788+
}
1789+
1790+
func Test_Client_StreamResponseBody_LargeResponse(t *testing.T) {
1791+
t.Parallel()
1792+
1793+
largeData := make([]byte, 1024*1024)
1794+
for i := range largeData {
1795+
largeData[i] = byte(i % 256)
1796+
}
1797+
1798+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1799+
app.Get("/large", func(c fiber.Ctx) error {
1800+
return c.Send(largeData)
1801+
})
1802+
})
1803+
defer func() { require.NoError(t, app.Shutdown()) }()
1804+
client := New().SetStreamResponseBody(true)
1805+
resp, err := client.Get("http://" + addr + "/large")
1806+
require.NoError(t, err)
1807+
defer resp.Close()
1808+
bodyStream := resp.BodyStream()
1809+
require.NotNil(t, bodyStream)
1810+
streamedData, err := io.ReadAll(bodyStream)
1811+
require.NoError(t, err)
1812+
require.Equal(t, largeData, streamedData)
1813+
client2 := New()
1814+
resp2, err := client2.Get("http://" + addr + "/large")
1815+
require.NoError(t, err)
1816+
defer resp2.Close()
1817+
body := resp2.Body()
1818+
require.Equal(t, largeData, body)
1819+
}
1820+
1821+
func Test_Client_StreamResponseBody_Disabled_Default(t *testing.T) {
1822+
t.Parallel()
1823+
1824+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1825+
app.Get("/test", func(c fiber.Ctx) error {
1826+
return c.SendString("Hello, World!")
1827+
})
1828+
})
1829+
defer func() { require.NoError(t, app.Shutdown()) }()
1830+
1831+
client := New()
1832+
resp, err := client.Get("http://" + addr + "/test")
1833+
require.NoError(t, err)
1834+
defer resp.Close()
1835+
1836+
body := resp.Body()
1837+
require.Equal(t, "Hello, World!", string(body))
1838+
1839+
bodyStream := resp.BodyStream()
1840+
require.NotNil(t, bodyStream)
1841+
}
1842+
1843+
func Test_Client_StreamResponseBody_ChainableMethods(t *testing.T) {
1844+
t.Parallel()
1845+
1846+
client := New().
1847+
SetStreamResponseBody(true).
1848+
SetTimeout(time.Second * 5).
1849+
SetStreamResponseBody(false)
1850+
1851+
require.False(t, client.StreamResponseBody())
1852+
}

client/request.go

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,30 +44,24 @@ var ErrClientNil = errors.New("client cannot be nil")
4444

4545
// Request contains all data related to an HTTP request.
4646
type Request struct {
47-
ctx context.Context //nolint:containedctx // Context is needed to be stored in the request.
48-
49-
body any
50-
header *Header
51-
params *QueryParam
52-
cookies *Cookie
53-
path *PathParam
54-
55-
client *Client
56-
57-
formData *FormData
58-
59-
RawRequest *fasthttp.Request
60-
url string
61-
method string
62-
userAgent string
63-
boundary string
64-
referer string
65-
files []*File
66-
47+
body any
48+
ctx context.Context //nolint:containedctx // Context is needed to be stored in the request.
49+
client *Client
50+
RawRequest *fasthttp.Request
51+
formData *FormData
52+
path *PathParam
53+
cookies *Cookie
54+
header *Header
55+
params *QueryParam
56+
referer string
57+
boundary string
58+
userAgent string
59+
method string
60+
url string
61+
files []*File
6762
timeout time.Duration
6863
maxRedirects int
69-
70-
bodyType bodyType
64+
bodyType bodyType
7165
}
7266

7367
// Method returns the HTTP method set in the Request.

client/response.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,22 @@ func (r *Response) Body() []byte {
8989
return r.RawResponse.Body()
9090
}
9191

92+
// BodyStream returns the response body as a stream reader.
93+
// Note: When using BodyStream(), the response body is not copied to memory,
94+
// so calling Body() afterwards may return an empty slice.
95+
func (r *Response) BodyStream() io.Reader {
96+
if stream := r.RawResponse.BodyStream(); stream != nil {
97+
return stream
98+
}
99+
// If streaming is not enabled, return a bytes.Reader from the regular body
100+
return bytes.NewReader(r.RawResponse.Body())
101+
}
102+
103+
// IsStreaming returns true if the response body is being streamed.
104+
func (r *Response) IsStreaming() bool {
105+
return r.RawResponse.BodyStream() != nil
106+
}
107+
92108
// String returns the response body as a trimmed string.
93109
func (r *Response) String() string {
94110
return utils.Trim(string(r.Body()), ' ')
@@ -143,14 +159,17 @@ func (r *Response) Save(v any) error {
143159
return nil
144160

145161
case io.Writer:
146-
if _, err := io.Copy(p, bytes.NewReader(r.Body())); err != nil {
147-
return fmt.Errorf("failed to write response body to io.Writer: %w", err)
162+
var err error
163+
if r.IsStreaming() {
164+
_, err = io.CopyBuffer(p, r.BodyStream(), nil)
165+
} else {
166+
_, err = io.Copy(p, bytes.NewReader(r.Body()))
148167
}
149-
defer func() {
150-
if pc, ok := p.(io.WriteCloser); ok {
151-
_ = pc.Close() //nolint:errcheck // not needed
152-
}
153-
}()
168+
169+
if err != nil {
170+
return fmt.Errorf("failed to write response body to writer: %w", err)
171+
}
172+
154173
return nil
155174

156175
default:

0 commit comments

Comments
 (0)