Skip to content

Commit a725b97

Browse files
committed
Add stream response body support
1 parent 64a7113 commit a725b97

File tree

6 files changed

+277
-4
lines changed

6 files changed

+277
-4
lines changed

client/client.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ type Client struct {
5555
userResponseHooks []ResponseHook
5656
builtinResponseHooks []ResponseHook
5757

58-
timeout time.Duration
59-
mu sync.RWMutex
60-
debug bool
58+
timeout time.Duration
59+
mu sync.RWMutex
60+
debug bool
61+
streamResponseBody bool
6162
}
6263

6364
// R creates a new Request associated with the client.
@@ -435,6 +436,20 @@ func (c *Client) DisableDebug() *Client {
435436
return c
436437
}
437438

439+
// StreamResponseBody returns the current StreamResponseBody setting.
440+
func (c *Client) StreamResponseBody() bool {
441+
return c.streamResponseBody
442+
}
443+
444+
// SetStreamResponseBody enables or disables response body streaming.
445+
// When enabled, the response body can be read as a stream using BodyStream()
446+
// instead of being fully loaded into memory. This is useful for large responses
447+
// or server-sent events.
448+
func (c *Client) SetStreamResponseBody(enable bool) *Client {
449+
c.streamResponseBody = enable
450+
return c
451+
}
452+
438453
// SetCookieJar sets the cookie jar for the client.
439454
func (c *Client) SetCookieJar(cookieJar *CookieJar) *Client {
440455
c.cookieJar = cookieJar

client/client_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,3 +1733,151 @@ func Benchmark_Client_Request_Parallel(b *testing.B) {
17331733
require.NoError(b, err)
17341734
})
17351735
}
1736+
1737+
func Test_Client_StreamResponseBody(t *testing.T) {
1738+
t.Parallel()
1739+
client := New()
1740+
require.False(t, client.StreamResponseBody())
1741+
client.SetStreamResponseBody(true)
1742+
require.True(t, client.StreamResponseBody())
1743+
client.SetStreamResponseBody(false)
1744+
require.False(t, client.StreamResponseBody())
1745+
}
1746+
1747+
func Test_Client_StreamResponseBody_ServerSentEvents(t *testing.T) {
1748+
t.Parallel()
1749+
1750+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1751+
app.Get("/sse", func(c fiber.Ctx) error {
1752+
c.Set("Content-Type", "text/event-stream")
1753+
c.Set("Cache-Control", "no-cache")
1754+
c.Set("Connection", "keep-alive")
1755+
1756+
messages := []string{
1757+
"data: message 1\n\n",
1758+
"data: message 2\n\n",
1759+
"data: message 3\n\n",
1760+
}
1761+
1762+
for _, msg := range messages {
1763+
if _, err := c.WriteString(msg); err != nil {
1764+
return err
1765+
}
1766+
}
1767+
1768+
return nil
1769+
})
1770+
})
1771+
defer func() { require.NoError(t, app.Shutdown()) }()
1772+
1773+
client := New().SetStreamResponseBody(true)
1774+
resp, err := client.Get("http://" + addr + "/sse")
1775+
require.NoError(t, err)
1776+
defer resp.Close()
1777+
1778+
bodyStream := resp.BodyStream()
1779+
require.NotNil(t, bodyStream)
1780+
1781+
buffer := make([]byte, 1024)
1782+
n, err := bodyStream.Read(buffer)
1783+
require.NoError(t, err)
1784+
require.Greater(t, n, 0)
1785+
1786+
content := string(buffer[:n])
1787+
require.Contains(t, content, "data: message 1")
1788+
}
1789+
1790+
func Test_Client_StreamResponseBody_LargeResponse(t *testing.T) {
1791+
t.Parallel()
1792+
1793+
largeData := make([]byte, 1024*1024)
1794+
for i := range largeData {
1795+
largeData[i] = byte(i % 256)
1796+
}
1797+
1798+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1799+
app.Get("/large", func(c fiber.Ctx) error {
1800+
return c.Send(largeData)
1801+
})
1802+
})
1803+
defer func() { require.NoError(t, app.Shutdown()) }()
1804+
client := New().SetStreamResponseBody(true)
1805+
resp, err := client.Get("http://" + addr + "/large")
1806+
require.NoError(t, err)
1807+
defer resp.Close()
1808+
bodyStream := resp.BodyStream()
1809+
require.NotNil(t, bodyStream)
1810+
streamedData, err := io.ReadAll(bodyStream)
1811+
require.NoError(t, err)
1812+
require.Equal(t, largeData, streamedData)
1813+
client2 := New()
1814+
resp2, err := client2.Get("http://" + addr + "/large")
1815+
require.NoError(t, err)
1816+
defer resp2.Close()
1817+
body := resp2.Body()
1818+
require.Equal(t, largeData, body)
1819+
}
1820+
1821+
func Test_Client_StreamResponseBody_Disabled_Default(t *testing.T) {
1822+
t.Parallel()
1823+
1824+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1825+
app.Get("/test", func(c fiber.Ctx) error {
1826+
return c.SendString("Hello, World!")
1827+
})
1828+
})
1829+
defer func() { require.NoError(t, app.Shutdown()) }()
1830+
1831+
client := New()
1832+
resp, err := client.Get("http://" + addr + "/test")
1833+
require.NoError(t, err)
1834+
defer resp.Close()
1835+
1836+
body := resp.Body()
1837+
require.Equal(t, "Hello, World!", string(body))
1838+
1839+
bodyStream := resp.BodyStream()
1840+
require.NotNil(t, bodyStream)
1841+
}
1842+
1843+
func Test_Client_StreamResponseBody_ChainableMethods(t *testing.T) {
1844+
t.Parallel()
1845+
1846+
client := New().
1847+
SetStreamResponseBody(true).
1848+
SetTimeout(time.Second * 5).
1849+
SetStreamResponseBody(false)
1850+
1851+
require.False(t, client.StreamResponseBody())
1852+
}
1853+
1854+
func Test_Request_StreamResponseBody(t *testing.T) {
1855+
t.Parallel()
1856+
1857+
app, addr := startTestServerWithPort(t, func(app *fiber.App) {
1858+
app.Get("/test", func(c fiber.Ctx) error {
1859+
return c.SendString("Hello, World!")
1860+
})
1861+
})
1862+
defer func() { require.NoError(t, app.Shutdown()) }()
1863+
1864+
client := New().SetStreamResponseBody(false) // client has streaming disabled
1865+
req := client.R().SetStreamResponseBody(true)
1866+
require.True(t, req.StreamResponseBody())
1867+
1868+
resp, err := req.Get("http://" + addr + "/test")
1869+
require.NoError(t, err)
1870+
defer resp.Close()
1871+
bodyStream := resp.BodyStream()
1872+
require.NotNil(t, bodyStream)
1873+
req2 := client.R().SetStreamResponseBody(false)
1874+
require.False(t, req2.StreamResponseBody())
1875+
clientWithStreaming := New().SetStreamResponseBody(true)
1876+
req3 := clientWithStreaming.R()
1877+
require.True(t, req3.StreamResponseBody()) // Should inherit from client
1878+
req4 := client.R().
1879+
SetStreamResponseBody(true).
1880+
SetTimeout(time.Second * 5).
1881+
SetStreamResponseBody(false)
1882+
require.False(t, req4.StreamResponseBody())
1883+
}

client/core.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,16 @@ func (c *core) execFunc() (*Response, error) {
8181
c.req.RawRequest.CopyTo(reqv)
8282
cfg := c.getRetryConfig()
8383

84+
// Set StreamResponseBody option from request setting (falls back to client setting)
85+
originalStreamResponseBody := c.client.fasthttp.StreamResponseBody
86+
c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody()
87+
8488
var err error
8589
go func() {
8690
respv := fasthttp.AcquireResponse()
8791
defer func() {
92+
// Restore original StreamResponseBody setting
93+
c.client.fasthttp.StreamResponseBody = originalStreamResponseBody
8894
fasthttp.ReleaseRequest(reqv)
8995
fasthttp.ReleaseResponse(respv)
9096
}()

client/request.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ type Request struct {
6767
timeout time.Duration
6868
maxRedirects int
6969

70-
bodyType bodyType
70+
bodyType bodyType
71+
streamResponseBody *bool // nil means use client setting
7172
}
7273

7374
// Method returns the HTTP method set in the Request.
@@ -590,6 +591,25 @@ func (r *Request) SetMaxRedirects(count int) *Request {
590591
return r
591592
}
592593

594+
// StreamResponseBody returns the StreamResponseBody setting for this request.
595+
// Returns the client's setting if not explicitly set at the request level.
596+
func (r *Request) StreamResponseBody() bool {
597+
if r.streamResponseBody != nil {
598+
return *r.streamResponseBody
599+
}
600+
if r.client != nil {
601+
return r.client.streamResponseBody
602+
}
603+
return false
604+
}
605+
606+
// SetStreamResponseBody sets the StreamResponseBody option for this specific request,
607+
// overriding the client-level setting.
608+
func (r *Request) SetStreamResponseBody(enable bool) *Request {
609+
r.streamResponseBody = &enable
610+
return r
611+
}
612+
593613
// checkClient ensures that a Client is set. If none is set, it defaults to the global defaultClient.
594614
func (r *Request) checkClient() {
595615
if r.client == nil {
@@ -656,6 +676,7 @@ func (r *Request) Reset() {
656676
r.maxRedirects = 0
657677
r.bodyType = noBody
658678
r.boundary = boundary
679+
r.streamResponseBody = nil
659680

660681
for len(r.files) != 0 {
661682
t := r.files[0]

client/response.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,17 @@ func (r *Response) Body() []byte {
8989
return r.RawResponse.Body()
9090
}
9191

92+
// BodyStream returns the response body as a stream reader.
93+
// Note: When using BodyStream(), the response body is not copied to memory,
94+
// so calling Body() afterwards may return an empty slice.
95+
func (r *Response) BodyStream() io.Reader {
96+
if stream := r.RawResponse.BodyStream(); stream != nil {
97+
return stream
98+
}
99+
// If streaming is not enabled, return a bytes.Reader from the regular body
100+
return bytes.NewReader(r.RawResponse.Body())
101+
}
102+
92103
// String returns the response body as a trimmed string.
93104
func (r *Response) String() string {
94105
return utils.Trim(string(r.Body()), ' ')

client/response_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,3 +538,75 @@ func Test_Response_Save(t *testing.T) {
538538
require.Error(t, err)
539539
})
540540
}
541+
542+
func Test_Response_BodyStream(t *testing.T) {
543+
t.Parallel()
544+
545+
server := startTestServer(t, func(app *fiber.App) {
546+
app.Get("/stream", func(c fiber.Ctx) error {
547+
return c.SendString("streaming data")
548+
})
549+
app.Get("/large", func(c fiber.Ctx) error {
550+
data := make([]byte, 1024)
551+
for i := range data {
552+
data[i] = byte('A' + i%26)
553+
}
554+
return c.Send(data)
555+
})
556+
})
557+
defer server.stop()
558+
559+
t.Run("basic streaming", func(t *testing.T) {
560+
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
561+
562+
resp, err := client.Get("http://example.com/stream")
563+
require.NoError(t, err)
564+
defer resp.Close()
565+
bodyStream := resp.BodyStream()
566+
require.NotNil(t, bodyStream)
567+
data, err := io.ReadAll(bodyStream)
568+
require.NoError(t, err)
569+
require.Equal(t, "streaming data", string(data))
570+
})
571+
572+
t.Run("large response streaming", func(t *testing.T) {
573+
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
574+
resp, err := client.Get("http://example.com/large")
575+
require.NoError(t, err)
576+
defer resp.Close()
577+
bodyStream := resp.BodyStream()
578+
require.NotNil(t, bodyStream)
579+
buffer := make([]byte, 256)
580+
var totalRead []byte
581+
for {
582+
n, err := bodyStream.Read(buffer)
583+
if n > 0 {
584+
totalRead = append(totalRead, buffer[:n]...)
585+
}
586+
if err == io.EOF {
587+
break
588+
}
589+
require.NoError(t, err)
590+
}
591+
require.Equal(t, 1024, len(totalRead))
592+
for i := 0; i < 1024; i++ {
593+
expected := byte('A' + i%26)
594+
require.Equal(t, expected, totalRead[i])
595+
}
596+
})
597+
598+
t.Run("compare with regular body", func(t *testing.T) {
599+
client1 := New().SetDial(server.dial())
600+
resp1, err := client1.Get("http://example.com/stream")
601+
require.NoError(t, err)
602+
defer resp1.Close()
603+
normalBody := resp1.Body()
604+
client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
605+
resp2, err := client2.Get("http://example.com/stream")
606+
require.NoError(t, err)
607+
defer resp2.Close()
608+
streamedBody, err := io.ReadAll(resp2.BodyStream())
609+
require.NoError(t, err)
610+
require.Equal(t, normalBody, streamedBody)
611+
})
612+
}

0 commit comments

Comments
 (0)