Skip to content

Conversation

Abhirup-99
Copy link

@Abhirup-99 Abhirup-99 commented Aug 25, 2025

This pull request introduces support for streaming HTTP response bodies in the client, allowing responses to be read as streams rather than being fully loaded into memory. This is particularly useful for handling large responses or server-sent events. The changes include new configuration options at both the client and request levels, implementation of the streaming logic, and comprehensive tests to ensure correct behavior.

Streaming response body support:

  • Added a streamResponseBody field to the Client struct, along with SetStreamResponseBody and StreamResponseBody methods to enable or disable response body streaming at the client level (client/client.go). [1] [2]
  • Added a streamResponseBody field to the Request struct, with corresponding SetStreamResponseBody and StreamResponseBody methods to allow per-request configuration that overrides the client setting (client/request.go). [1] [2]
  • Updated the request execution logic to set and restore the underlying HTTP client's streaming option based on the request or client configuration (client/core.go).
  • Implemented the BodyStream method on the Response struct to provide an io.Reader for streaming the response body, falling back to an in-memory reader if streaming is not enabled (client/response.go).

Testing and validation:

  • Added extensive tests to verify streaming behavior, including tests for server-sent events, large responses, default and overridden settings, and chainable configuration methods (client/client_test.go, client/response_test.go). [1] [2]
  • Ensured that request resetting clears the streamResponseBody override (client/request.go).# Description

Please provide a clear and concise description of the changes you've made and the problem they address. Include the purpose of the change, any relevant issues it solves, and the benefits it brings to the project. If this change introduces new features or adjustments, highlight them here.

Related #3425

@Copilot Copilot AI review requested due to automatic review settings August 25, 2025 13:03
@Abhirup-99 Abhirup-99 requested a review from a team as a code owner August 25, 2025 13:03
Copy link

welcome bot commented Aug 25, 2025

Thanks for opening this pull request! 🎉 Please check out our contributing guidelines. If you need help or want to chat with us, join us on Discord https://gofiber.io/discord

Copy link
Contributor

coderabbitai bot commented Aug 25, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds client-level controls for fasthttp response streaming, Response streaming accessors and Save() streaming behavior, extensive streaming tests, reorders Request struct fields only, and duplicates documentation blocks for new methods.

Changes

Cohort / File(s) Summary of Changes
Client streaming controls
client/client.go
Adds StreamResponseBody() bool and SetStreamResponseBody(enable bool) *Client to get/set fasthttp.StreamResponseBody.
Response streaming API and Save logic
client/response.go
Adds BodyStream() io.Reader and IsStreaming() bool. Save() now copies from BodyStream() when streaming; writer handling unified and error message updated.
Client tests (streaming-related)
client/client_test.go
Adds tests for toggling streaming, SSE streaming, large-response streaming, default (disabled) behavior, and chainable configuration.
Response tests (streaming scenarios)
client/response_test.go
Adds comprehensive tests for BodyStream: basic stream, chunked/delayed reads, SSE, progressive JSON streaming, interrupted connections, large payloads, Save() writer-close behavior, and non-streaming fallbacks.
Request struct reordering
client/request.go
Reorders fields within type Request only; no fields added/removed and no behavior changes.
Docs update (duplicated blocks)
docs/client/rest.md
Inserts documentation blocks for StreamResponseBody, SetStreamResponseBody, RetryConfig, and SetRetryConfig (appears duplicated in two locations).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant Client
  participant fasthttp
  participant Server
  participant Response
  participant Writer as io.Writer

  rect rgb(242,248,255)
    note left of Client: configure streaming flag
    User->>Client: SetStreamResponseBody(true/false)
    Client->>fasthttp: set StreamResponseBody flag
  end

  User->>Client: Do request
  Client->>fasthttp: execute request
  fasthttp->>Server: GET /resource
  Server-->>fasthttp: HTTP response (maybe chunked)
  fasthttp-->>Client: RawResponse
  Client-->>User: Response

  alt Streaming enabled
    User->>Response: BodyStream()
    Response-->>User: io.Reader (stream)
    User->>Response: Save(Writer)
    Response->>Writer: Copy from BodyStream()
  else Streaming disabled
    User->>Response: Body()
    Response-->>User: []byte (full body)
    User->>Response: Save(Writer)
    Response->>Writer: Copy from memory buffer
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues

Suggested labels

📒 Documentation

Suggested reviewers

  • efectn
  • gaby

Poem

A rabbit taps the network stream,
Sip by sip, a bytey dream.
Toggle on, the rivers flow—
SSE whispers, chunks in tow.
I nibble bytes and softly beam. 🐇

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @Abhirup-99, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant enhancement to the client by adding support for streaming HTTP response bodies. This allows consumers to process large responses or server-sent events incrementally, preventing excessive memory consumption. The changes include new configuration options at both the client and individual request levels, along with the necessary logic to facilitate this streaming behavior and comprehensive test coverage.

Highlights

  • Client-level Streaming Configuration: Introduced a streamResponseBody field and corresponding SetStreamResponseBody and StreamResponseBody methods to the Client struct, enabling global control over response body streaming.
  • Request-level Streaming Override: Added a streamResponseBody field and methods to the Request struct, allowing per-request configuration that can override the client's default streaming setting.
  • Dynamic Streaming Execution: Modified the core request execution logic to dynamically set and restore the underlying HTTP client's streaming option based on the active request or client configuration.
  • Streamed Body Access: Implemented a BodyStream() method on the Response struct, providing an io.Reader interface for efficient, stream-based consumption of response bodies.
  • Comprehensive Test Coverage: Added extensive unit tests to validate the new streaming functionality, covering scenarios like server-sent events, large responses, default settings, overrides, and method chaining.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@Abhirup-99
Copy link
Author

I will update the documentation if the approach seems sound enough.

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request adds support for streaming HTTP response bodies in the client library, enabling responses to be read as streams instead of being fully loaded into memory. This is particularly useful for handling large responses or server-sent events.

  • Added streaming configuration at both client and request levels with override capability
  • Implemented BodyStream() method on Response struct to provide streaming access
  • Added comprehensive test coverage for various streaming scenarios

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
client/client.go Adds streamResponseBody field and configuration methods to Client struct
client/request.go Adds per-request streaming configuration with client-level fallback
client/core.go Updates request execution to handle streaming option setting and restoration
client/response.go Implements BodyStream method for streaming response body access
client/client_test.go Comprehensive tests for client-level streaming configuration and behavior
client/response_test.go Tests for response streaming functionality including large data handling

client/core.go Outdated
Comment on lines 84 to 93
// Set StreamResponseBody option from request setting (falls back to client setting)
originalStreamResponseBody := c.client.fasthttp.StreamResponseBody
c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody()

var err error
go func() {
respv := fasthttp.AcquireResponse()
defer func() {
// Restore original StreamResponseBody setting
c.client.fasthttp.StreamResponseBody = originalStreamResponseBody
Copy link
Preview

Copilot AI Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The StreamResponseBody setting is being modified on the shared client instance without proper synchronization. In a multi-threaded environment, concurrent requests could interfere with each other's streaming configuration. Consider using per-request client instances or adding proper locking mechanisms.

Suggested change
// Set StreamResponseBody option from request setting (falls back to client setting)
originalStreamResponseBody := c.client.fasthttp.StreamResponseBody
c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody()
var err error
go func() {
respv := fasthttp.AcquireResponse()
defer func() {
// Restore original StreamResponseBody setting
c.client.fasthttp.StreamResponseBody = originalStreamResponseBody
// Create a shallow copy of the fasthttp.Client to avoid data races on StreamResponseBody
clientCopy := *c.client.fasthttp
clientCopy.StreamResponseBody = c.req.StreamResponseBody()
var err error
go func() {
respv := fasthttp.AcquireResponse()
defer func() {

Copilot uses AI. Check for mistakes.

client/core.go Outdated
Comment on lines 84 to 93
// Set StreamResponseBody option from request setting (falls back to client setting)
originalStreamResponseBody := c.client.fasthttp.StreamResponseBody
c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody()

var err error
go func() {
respv := fasthttp.AcquireResponse()
defer func() {
// Restore original StreamResponseBody setting
c.client.fasthttp.StreamResponseBody = originalStreamResponseBody
Copy link
Preview

Copilot AI Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The restoration of the original StreamResponseBody setting in the defer function may not execute in case of panics or early returns, potentially leaving the client in an inconsistent state. Additionally, this restoration happens in a goroutine which could create race conditions with other concurrent requests.

Suggested change
// Set StreamResponseBody option from request setting (falls back to client setting)
originalStreamResponseBody := c.client.fasthttp.StreamResponseBody
c.client.fasthttp.StreamResponseBody = c.req.StreamResponseBody()
var err error
go func() {
respv := fasthttp.AcquireResponse()
defer func() {
// Restore original StreamResponseBody setting
c.client.fasthttp.StreamResponseBody = originalStreamResponseBody
// Use a per-request copy of the fasthttp.Client to avoid data races on StreamResponseBody
clientCopy := *c.client.fasthttp
clientCopy.StreamResponseBody = c.req.StreamResponseBody()
var err error
go func() {
respv := fasthttp.AcquireResponse()
defer func() {

Copilot uses AI. Check for mistakes.

Copy link
Contributor

Warning

Gemini encountered an error creating the review. You can try again by commenting /gemini review.

@gaby gaby added this to v3 Aug 25, 2025
@gaby gaby added this to the v3 milestone Aug 25, 2025
@gaby gaby changed the title Add stream response body support 🔥 feat: Add StreamResponseBody support for the Client Aug 25, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (4)
client/request.go (1)

594-605: Effective value resolution is correct and robust

Falls back to the client setting when unset; default false if no client is attached yet. Consider documenting this precedence in the method comment for discoverability.

client/client_test.go (3)

1750-1767: Optional: make SSE handler actually stream (flush between events) for stronger signal

Right now the handler writes three messages and returns. This can still pass even if the server buffers and sends a single chunk. Consider using SetBodyStreamWriter and flushing between writes to exercise true streaming behavior. Example:

app.Get("/sse", func(c fiber.Ctx) error {
	c.Set("Content-Type", "text/event-stream")
	c.Set("Cache-Control", "no-cache")
	c.Set("Connection", "keep-alive")

	c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
		msgs := []string{"data: message 1\n\n", "data: message 2\n\n", "data: message 3\n\n"}
		for _, m := range msgs {
			_, _ = w.WriteString(m)
			_ = w.Flush()
			time.Sleep(10 * time.Millisecond)
		}
	})
	return nil
})

This reduces false positives where streaming isn’t actually occurring.


1836-1841: Strengthen fallback contract when streaming is disabled by reading BodyStream

You assert BodyStream() is non-nil, but don’t verify it yields the same content as Body() in the fallback path. Read it and compare.

Apply this diff:

 	body := resp.Body()
 	require.Equal(t, "Hello, World!", string(body))

 	bodyStream := resp.BodyStream()
 	require.NotNil(t, bodyStream)
+	streamed, err := io.ReadAll(bodyStream)
+	require.NoError(t, err)
+	require.Equal(t, body, streamed)

1854-1883: Please add a follow-up request and Reset() behavior test to guard against per-request overrides leaking

You’ve already verified the getter/inheritance logic for StreamResponseBody, but we’re still missing two important checks in client/client_test.go around lines 1854–1883:

  • A second request on the same client after an override, to ensure the per-request override doesn’t persist.
  • Exercising Request.Reset() to confirm it clears any per-request streaming flag and falls back to the client default.

Suggested diff to insert after the existing assertions:

 	req2 := client.R().SetStreamResponseBody(false)
 	require.False(t, req2.StreamResponseBody())

+	// Follow-up call: ensure the override didn’t leak into actual behavior.
+	resp2b, err := req2.Get("http://" + addr + "/test")
+	require.NoError(t, err)
+	defer resp2b.Close()
+	require.Equal(t, "Hello, World!", string(resp2b.Body()))

 	clientWithStreaming := New().SetStreamResponseBody(true)
 	req3 := clientWithStreaming.R()
 	require.True(t, req3.StreamResponseBody()) // Should inherit from client
 	req4 := client.R().
 		SetStreamResponseBody(true).
 		SetTimeout(time.Second * 5).
 		SetStreamResponseBody(false)
 	require.False(t, req4.StreamResponseBody())
+
+	// After Reset, per-request override should be cleared and revert to client setting (false).
+	reqReset := client.R().SetStreamResponseBody(true)
+	reqReset.Reset()
+	require.False(t, reqReset.StreamResponseBody())

This will catch regressions where the underlying fasthttp flag isn’t properly restored and confirm Reset() clears the override.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 64a7113 and a725b97.

📒 Files selected for processing (6)
  • client/client.go (2 hunks)
  • client/client_test.go (1 hunks)
  • client/core.go (1 hunks)
  • client/request.go (3 hunks)
  • client/response.go (1 hunks)
  • client/response_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
client/response_test.go (1)
client/client.go (2)
  • Get (706-708)
  • New (647-652)
client/client_test.go (1)
client/client.go (2)
  • New (647-652)
  • Get (706-708)
🔇 Additional comments (6)
client/request.go (3)

70-72: Good choice: tri-state per-request override

Using a pointer to bool for streamResponseBody cleanly distinguishes unset from explicit true/false. LGTM.


606-612: Chainable setter is fine

No issues.


679-679: Reset clears the override as expected

Resetting streamResponseBody to nil ensures subsequent requests inherit the client default. LGTM.

client/client_test.go (3)

1737-1745: LGTM: basic client-level toggle coverage is solid

Covers default false, enable, disable. Clear, fast, and deterministic.


1793-1819: LGTM: large-response streaming vs non-streaming parity

Good end-to-end validation that BodyStream() delivers identical data to Body() for large payloads, with streaming enabled/disabled. Using 1 MiB is a reasonable balance for CI.


1846-1852: LGTM: chainable method precedence

Confirms last-set-wins across chained setters with unrelated setters in between. Clear and sufficient.

Comment on lines 1781 to 1788
buffer := make([]byte, 1024)
n, err := bodyStream.Read(buffer)
require.NoError(t, err)
require.Greater(t, n, 0)

content := string(buffer[:n])
require.Contains(t, content, "data: message 1")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Prevent flaky read assertion: allow io.EOF when n > 0

io.Reader is allowed to return (n > 0, err == io.EOF). Requiring NoError can intermittently fail. Accept EOF if bytes were read.

Apply this diff:

-	buffer := make([]byte, 1024)
-	n, err := bodyStream.Read(buffer)
-	require.NoError(t, err)
-	require.Greater(t, n, 0)
+	buffer := make([]byte, 1024)
+	n, err := bodyStream.Read(buffer)
+	require.Greater(t, n, 0)
+	if err != nil {
+		require.ErrorIs(t, err, io.EOF)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
buffer := make([]byte, 1024)
n, err := bodyStream.Read(buffer)
require.NoError(t, err)
require.Greater(t, n, 0)
content := string(buffer[:n])
require.Contains(t, content, "data: message 1")
}
buffer := make([]byte, 1024)
n, err := bodyStream.Read(buffer)
require.Greater(t, n, 0)
if err != nil {
require.ErrorIs(t, err, io.EOF)
}
content := string(buffer[:n])
require.Contains(t, content, "data: message 1")
}
🤖 Prompt for AI Agents
In client/client_test.go around lines 1781-1788, the test currently requires
NoError on bodyStream.Read which is flaky because io.Reader may return (n>0,
err==io.EOF); change the assertion to allow io.EOF when bytes were read: after
reading and asserting n>0, assert either err==nil or errors.Is(err, io.EOF) (or
use require.NoError when err==nil else require.True(errors.Is(err, io.EOF))),
and add imports for "io" and/or "errors" if not already present.

Comment on lines 542 to 1019
func Test_Response_BodyStream(t *testing.T) {
t.Parallel()

server := startTestServer(t, func(app *fiber.App) {
app.Get("/stream", func(c fiber.Ctx) error {
return c.SendString("streaming data")
})
app.Get("/large", func(c fiber.Ctx) error {
data := make([]byte, 1024)
for i := range data {
data[i] = byte('A' + i%26)
}
return c.Send(data)
})
})
defer server.stop()

t.Run("basic streaming", func(t *testing.T) {
client := New().SetDial(server.dial()).SetStreamResponseBody(true)

resp, err := client.Get("http://example.com/stream")
require.NoError(t, err)
defer resp.Close()
bodyStream := resp.BodyStream()
require.NotNil(t, bodyStream)
data, err := io.ReadAll(bodyStream)
require.NoError(t, err)
require.Equal(t, "streaming data", string(data))
})

t.Run("large response streaming", func(t *testing.T) {
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp, err := client.Get("http://example.com/large")
require.NoError(t, err)
defer resp.Close()
bodyStream := resp.BodyStream()
require.NotNil(t, bodyStream)
buffer := make([]byte, 256)
var totalRead []byte
for {
n, err := bodyStream.Read(buffer)
if n > 0 {
totalRead = append(totalRead, buffer[:n]...)
}
if err == io.EOF {
break
}
require.NoError(t, err)
}
require.Equal(t, 1024, len(totalRead))
for i := 0; i < 1024; i++ {
expected := byte('A' + i%26)
require.Equal(t, expected, totalRead[i])
}
})

t.Run("compare with regular body", func(t *testing.T) {
client1 := New().SetDial(server.dial())
resp1, err := client1.Get("http://example.com/stream")
require.NoError(t, err)
defer resp1.Close()
normalBody := resp1.Body()
client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp2, err := client2.Get("http://example.com/stream")
require.NoError(t, err)
defer resp2.Close()
streamedBody, err := io.ReadAll(resp2.BodyStream())
require.NoError(t, err)
require.Equal(t, normalBody, streamedBody)
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix flakiness: don’t retain Body() buffer after closing the Response

In “compare with regular body”, normalBody references the internal buffer of resp1. After resp1.Close(), that memory can be reused by the pool, leading to flaky comparisons. Copy the bytes before closing.

 	t.Run("compare with regular body", func(t *testing.T) {
 		client1 := New().SetDial(server.dial())
 		resp1, err := client1.Get("http://example.com/stream")
 		require.NoError(t, err)
 		defer resp1.Close()
-		normalBody := resp1.Body()
+		normalBody := append([]byte(nil), resp1.Body()...)
 		client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
 		resp2, err := client2.Get("http://example.com/stream")
 		require.NoError(t, err)
 		defer resp2.Close()
 		streamedBody, err := io.ReadAll(resp2.BodyStream())
 		require.NoError(t, err)
 		require.Equal(t, normalBody, streamedBody)
 	})

Optional coverage: add an assertion that len(resp.Body()) == 0 when streaming is enabled to codify the documented behavior.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func Test_Response_BodyStream(t *testing.T) {
t.Parallel()
server := startTestServer(t, func(app *fiber.App) {
app.Get("/stream", func(c fiber.Ctx) error {
return c.SendString("streaming data")
})
app.Get("/large", func(c fiber.Ctx) error {
data := make([]byte, 1024)
for i := range data {
data[i] = byte('A' + i%26)
}
return c.Send(data)
})
})
defer server.stop()
t.Run("basic streaming", func(t *testing.T) {
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp, err := client.Get("http://example.com/stream")
require.NoError(t, err)
defer resp.Close()
bodyStream := resp.BodyStream()
require.NotNil(t, bodyStream)
data, err := io.ReadAll(bodyStream)
require.NoError(t, err)
require.Equal(t, "streaming data", string(data))
})
t.Run("large response streaming", func(t *testing.T) {
client := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp, err := client.Get("http://example.com/large")
require.NoError(t, err)
defer resp.Close()
bodyStream := resp.BodyStream()
require.NotNil(t, bodyStream)
buffer := make([]byte, 256)
var totalRead []byte
for {
n, err := bodyStream.Read(buffer)
if n > 0 {
totalRead = append(totalRead, buffer[:n]...)
}
if err == io.EOF {
break
}
require.NoError(t, err)
}
require.Equal(t, 1024, len(totalRead))
for i := 0; i < 1024; i++ {
expected := byte('A' + i%26)
require.Equal(t, expected, totalRead[i])
}
})
t.Run("compare with regular body", func(t *testing.T) {
client1 := New().SetDial(server.dial())
resp1, err := client1.Get("http://example.com/stream")
require.NoError(t, err)
defer resp1.Close()
normalBody := resp1.Body()
client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp2, err := client2.Get("http://example.com/stream")
require.NoError(t, err)
defer resp2.Close()
streamedBody, err := io.ReadAll(resp2.BodyStream())
require.NoError(t, err)
require.Equal(t, normalBody, streamedBody)
})
}
t.Run("compare with regular body", func(t *testing.T) {
client1 := New().SetDial(server.dial())
resp1, err := client1.Get("http://example.com/stream")
require.NoError(t, err)
defer resp1.Close()
normalBody := append([]byte(nil), resp1.Body()...)
client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
resp2, err := client2.Get("http://example.com/stream")
require.NoError(t, err)
defer resp2.Close()
streamedBody, err := io.ReadAll(resp2.BodyStream())
require.NoError(t, err)
require.Equal(t, normalBody, streamedBody)
})
🤖 Prompt for AI Agents
In client/response_test.go around lines 542 to 612, the "compare with regular
body" subtest keeps normalBody as a reference to resp1's internal buffer and
then closes resp1, which can make the comparison flaky because the buffer may be
reused; fix by making a copy of resp1.Body() (e.g., allocate and copy the bytes)
before calling resp1.Close(), then use that copied slice for the equality check;
optionally add an assertion that when streaming is enabled the non-streaming
response's Body() is emptied (len(resp.Body()) == 0) to codify documented
behavior.

@gaby
Copy link
Member

gaby commented Aug 25, 2025

@Abhirup-99 The tests are failing. You can run them locally by running make tests

@Abhirup-99
Copy link
Author

@gaby fixed the failing tests

Copy link

codecov bot commented Aug 25, 2025

Codecov Report

❌ Patch coverage is 55.00000% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 91.99%. Comparing base (64a7113) to head (909b1c8).
⚠️ Report is 21 commits behind head on main.

Files with missing lines Patch % Lines
client/response.go 40.00% 5 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3711      +/-   ##
==========================================
+ Coverage   91.79%   91.99%   +0.19%     
==========================================
  Files         114      115       +1     
  Lines       11539    11599      +60     
==========================================
+ Hits        10592    10670      +78     
+ Misses        685      673      -12     
+ Partials      262      256       -6     
Flag Coverage Δ
unittests 91.99% <55.00%> (+0.19%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
client/client.go (2)

538-557: Reset should also clear streamResponseBody to default false

Client.Reset clears most flags but leaves streamResponseBody unchanged, leading to surprising behavior post-reset.

 func (c *Client) Reset() {
 	c.fasthttp = &fasthttp.Client{}
 	c.baseURL = ""
 	c.timeout = 0
 	c.userAgent = ""
 	c.referer = ""
 	c.retryConfig = nil
 	c.debug = false
+	c.streamResponseBody = false

58-61: Guard temporary per-request flips of fasthttp.Client.StreamResponseBody with a dedicated mutex on Client

If core executes by toggling the shared fasthttp.Client.StreamResponseBody between requests, concurrent requests can race. Add a small private mutex dedicated to this toggle. This keeps the public API unchanged and avoids overloading c.mu, which guards other state.

Apply:

 type Client struct {
  	logger   log.CommonLogger
  	fasthttp *fasthttp.Client
@@
-	timeout            time.Duration
-	mu                 sync.RWMutex
-	debug              bool
-	streamResponseBody bool
+	timeout            time.Duration
+	mu                 sync.RWMutex
+	debug              bool
+	streamResponseBody bool
+	// protects temporary flips of fasthttp.Client.StreamResponseBody during per-request execution
+	streamRespMu       sync.Mutex
 }

To confirm whether the toggle approach is used (vs. cloning a temp fasthttp.Client), run:

#!/bin/bash
# Inspect how StreamResponseBody is applied during execution.
rg -n -C2 --type=go 'StreamResponseBody' client/core.go client | sed -n '1,200p'
client/response_test.go (1)

598-611: Fix flakiness: copy Body() before closing resp1

normalBody aliases resp1’s internal buffer; after Close(), pooled buffers may be reused, causing nondeterministic failures. Copy the bytes before closing.

-		defer resp1.Close()
-		normalBody := resp1.Body()
+		defer resp1.Close()
+		normalBody := append([]byte(nil), resp1.Body()...)
🧹 Nitpick comments (2)
client/client.go (2)

439-451: API shape LGTM; document precedence explicitly in the comment

The getter/setter are clear and chainable. Add one line noting that a request-level setting overrides the client-level default for discoverability.

 // SetStreamResponseBody enables or disables response body streaming.
 // When enabled, the response body can be read as a stream using BodyStream()
 // instead of being fully loaded into memory. This is useful for large responses
-// or server-sent events.
+// or server-sent events. Note: a per-request setting (Request.SetStreamResponseBody)
+// overrides this client-level default.
 func (c *Client) SetStreamResponseBody(enable bool) *Client {
 	c.streamResponseBody = enable
 	return c
 }

563-575: Optional: plumb StreamResponseBody through Config for convenience

Many helpers accept Config; adding an optional pointer makes it easy to set streaming per call without constructing a Request manually.

 type Config struct {
 	Ctx          context.Context //nolint:containedctx // It's needed to be stored in the config.
 	Body         any
 	Header       map[string]string
 	Param        map[string]string
 	Cookie       map[string]string
 	PathParam    map[string]string
 	FormData     map[string]string
 	UserAgent    string
 	Referer      string
 	File         []*File
 	Timeout      time.Duration
 	MaxRedirects int
+	// Optional per-request override; when set, overrides the client default.
+	StreamResponseBody *bool
 }
@@
 	if cfg.MaxRedirects != 0 {
 		req.SetMaxRedirects(cfg.MaxRedirects)
 	}
 
+	if cfg.StreamResponseBody != nil {
+		req.SetStreamResponseBody(*cfg.StreamResponseBody)
+	}
+
 	if cfg.Body != nil {
 		req.SetJSON(cfg.Body)
 		return
 	}

Also applies to: 577-635

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a725b97 and 9c12637.

📒 Files selected for processing (6)
  • client/client.go (2 hunks)
  • client/client_test.go (1 hunks)
  • client/core.go (2 hunks)
  • client/request.go (3 hunks)
  • client/response.go (1 hunks)
  • client/response_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • client/response.go
  • client/client_test.go
  • client/core.go
🔇 Additional comments (5)
client/response_test.go (1)

542-596: Solid coverage for basic and large streaming paths

The test server and the two subtests validate BodyStream() on small and larger responses. Clear and deterministic.

client/request.go (4)

70-72: *Good choice using bool to represent tri-state (unset vs true/false)

This enables request-level override while preserving client default. No action needed.


594-605: Precedence and fallback logic reads correctly

Returns request override when set; otherwise falls back to client-level; defaults to false if no client. Matches the intended design.


606-612: Setter is fine; pointer escape is safe

Using &enable is idiomatic (escapes to heap). Chainable API is consistent with others.


679-679: Reset correctly clears the per-request override

Resetting streamResponseBody to nil ensures the request returns to default behavior. Good.

@gaby gaby moved this to In Progress in v3 Aug 25, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
client/core.go (1)

132-141: Minor duplication: unify Do vs DoRedirects callsites

The four callsites using fastHTTPClient.Do/DoRedirects under retry and non-retry paths duplicate logic. A tiny helper makes it harder to regress:

- if cfg != nil {
-   err = retry.NewExponentialBackoff(*cfg).Retry(func() error {
-     if shouldRedirect {
-       return fastHTTPClient.DoRedirects(reqv, respv, c.req.maxRedirects)
-     }
-     return fastHTTPClient.Do(reqv, respv)
-   })
- } else {
-   if shouldRedirect {
-     err = fastHTTPClient.DoRedirects(reqv, respv, c.req.maxRedirects)
-   } else {
-     err = fastHTTPClient.Do(reqv, respv)
-   }
- }
+call := func() error {
+  if shouldRedirect {
+    return fastHTTPClient.DoRedirects(reqv, respv, c.req.maxRedirects)
+  }
+  return fastHTTPClient.Do(reqv, respv)
+}
+if cfg != nil {
+  err = retry.NewExponentialBackoff(*cfg).Retry(call)
+} else {
+  err = call()
+}

Low impact, but improves readability.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 9c12637 and 4d32c17.

📒 Files selected for processing (6)
  • client/client.go (2 hunks)
  • client/client_test.go (1 hunks)
  • client/core.go (2 hunks)
  • client/request.go (3 hunks)
  • client/response.go (1 hunks)
  • client/response_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • client/response.go
  • client/client.go
  • client/request.go
  • client/response_test.go
  • client/client_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
client/core.go (1)
client/client.go (1)
  • Client (32-62)
🔇 Additional comments (3)
client/core.go (3)

84-118: Kudos: avoids flipping shared fasthttp.Client.StreamResponseBody

Not mutating the shared client’s flag sidesteps the prior data race concerns. Good move. If you adopt the dual-client approach suggested above, you’ll keep this benefit while restoring pooling.


149-151: No action needed: CopyTo + ReleaseResponse ordering is safe

Verified that fasthttp.Response.CopyTo performs a synchronous copy of the body into resp.RawResponse (rather than merely reassigning a stream pointer), and once CopyTo returns, you can immediately call ReleaseResponse(respv) without affecting the destination’s BodyStream. The existing code at client/core.go (lines 149–151) is correct and requires no changes.


88-118: Verify comprehensive cloning of fasthttp.Client fields
I wasn’t able to locate client/core.go in the repository, but manually copying each exported field from fasthttp.Client is brittle and prone to omissions (e.g., new fields like MaxConnWaitTimeout, RetryIf, etc., won’t be copied and can silently change behavior). Please verify that you:

  • Mirror all exported fields when creating a temporary client for per-request overrides.
  • Centralize this logic in a helper (e.g., reflection-based) to automatically pick up new fields.
  • Add a unit test that fails if any exported field in fasthttp.Client isn’t present in the clone.
  • Or consider toggling StreamResponseBody on the shared client under a lock instead of cloning.

Let me know if you’d like a PR-ready helper and reflection-based test snippet for this.

client/core.go Outdated
Comment on lines 84 to 118
// Determine which client to use - create a new one if StreamResponseBody differs
var fastHTTPClient *fasthttp.Client
requestStreamResponseBody := c.req.StreamResponseBody()

if requestStreamResponseBody != c.client.streamResponseBody {
// Request setting differs from client setting, create a temporary client
c.client.mu.RLock()
original := c.client.fasthttp
fastHTTPClient = &fasthttp.Client{
Dial: original.Dial,
DialDualStack: original.DialDualStack,
TLSConfig: original.TLSConfig,
MaxConnsPerHost: original.MaxConnsPerHost,
MaxIdleConnDuration: original.MaxIdleConnDuration,
MaxConnDuration: original.MaxConnDuration,
ReadTimeout: original.ReadTimeout,
WriteTimeout: original.WriteTimeout,
ReadBufferSize: original.ReadBufferSize,
WriteBufferSize: original.WriteBufferSize,
MaxResponseBodySize: original.MaxResponseBodySize,
NoDefaultUserAgentHeader: original.NoDefaultUserAgentHeader,
DisableHeaderNamesNormalizing: original.DisableHeaderNamesNormalizing,
DisablePathNormalizing: original.DisablePathNormalizing,
MaxIdemponentCallAttempts: original.MaxIdemponentCallAttempts,
Name: original.Name,
ConfigureClient: original.ConfigureClient,

// Request-specific override
StreamResponseBody: requestStreamResponseBody,
}
c.client.mu.RUnlock()
} else {
// Use the client's fasthttp client directly
fastHTTPClient = c.client.fasthttp
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Per-request allocating a new fasthttp.Client loses connection pooling and adds substantial latency/TLS overhead

Constructing a fresh *fasthttp.Client whenever the per-request streaming flag differs discards the shared connection pool and TLS session cache, causing extra TCP/TLS handshakes and reduced throughput under mixed workloads. This will regress performance for SSE/large downloads that toggle streaming on/off across requests.

Prefer one of these approaches:

  • Maintain two long-lived internal clients (streaming vs non-streaming) on Client and route each request accordingly (preferred).
  • Or, serialize only the underlying Do/DoRedirects around a temporary flip of the shared client's StreamResponseBody via a dedicated mutex (simpler, but reduces parallelism across mixed-mode requests).

Proposed change within this hunk (assuming we add a helper on Client as shown below) to avoid per-request allocation:

-	// Determine which client to use - create a new one if StreamResponseBody differs
-	var fastHTTPClient *fasthttp.Client
-	requestStreamResponseBody := c.req.StreamResponseBody()
-
-	if requestStreamResponseBody != c.client.streamResponseBody {
-		// Request setting differs from client setting, create a temporary client
-		c.client.mu.RLock()
-		original := c.client.fasthttp
-		fastHTTPClient = &fasthttp.Client{
-			Dial:                          original.Dial,
-			DialDualStack:                 original.DialDualStack,
-			TLSConfig:                     original.TLSConfig,
-			MaxConnsPerHost:               original.MaxConnsPerHost,
-			MaxIdleConnDuration:           original.MaxIdleConnDuration,
-			MaxConnDuration:               original.MaxConnDuration,
-			ReadTimeout:                   original.ReadTimeout,
-			WriteTimeout:                  original.WriteTimeout,
-			ReadBufferSize:                original.ReadBufferSize,
-			WriteBufferSize:               original.WriteBufferSize,
-			MaxResponseBodySize:           original.MaxResponseBodySize,
-			NoDefaultUserAgentHeader:      original.NoDefaultUserAgentHeader,
-			DisableHeaderNamesNormalizing: original.DisableHeaderNamesNormalizing,
-			DisablePathNormalizing:        original.DisablePathNormalizing,
-			MaxIdemponentCallAttempts:     original.MaxIdemponentCallAttempts,
-			Name:                          original.Name,
-			ConfigureClient:               original.ConfigureClient,
-
-			// Request-specific override
-			StreamResponseBody: requestStreamResponseBody,
-		}
-		c.client.mu.RUnlock()
-	} else {
-		// Use the client's fasthttp client directly
-		fastHTTPClient = c.client.fasthttp
-	}
+	// Determine which client to use without sacrificing connection pooling
+	requestStreamResponseBody := c.req.StreamResponseBody()
+	fastHTTPClient := c.client.pickFastHTTPClient(requestStreamResponseBody)

Add the following (outside this file/hunk, shown here for context) to client/client.go:

// In Client struct:
streamingFHC *fasthttp.Client // lazily initialized clone with StreamResponseBody=true

// pickFastHTTPClient returns a pooled client configured for the requested streaming mode.
func (c *Client) pickFastHTTPClient(stream bool) *fasthttp.Client {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if stream == c.streamResponseBody {
		return c.fasthttp
	}
	// Lazily build the streaming variant once and reuse it.
	if stream && c.streamingFHC != nil {
		return c.streamingFHC
	}
	return c.initStreamingClientLocked(stream)
}

// initStreamingClientLocked clones exported config into a new fasthttp.Client.
// c.mu must be held by the caller.
func (c *Client) initStreamingClientLocked(stream bool) *fasthttp.Client {
	o := c.fasthttp
	clone := &fasthttp.Client{
		Dial:                          o.Dial,
		DialDualStack:                 o.DialDualStack,
		TLSConfig:                     o.TLSConfig,
		MaxConnsPerHost:               o.MaxConnsPerHost,
		MaxIdleConnDuration:           o.MaxIdleConnDuration,
		MaxConnDuration:               o.MaxConnDuration,
		ReadTimeout:                   o.ReadTimeout,
		WriteTimeout:                  o.WriteTimeout,
		ReadBufferSize:                o.ReadBufferSize,
		WriteBufferSize:               o.WriteBufferSize,
		MaxResponseBodySize:           o.MaxResponseBodySize,
		NoDefaultUserAgentHeader:      o.NoDefaultUserAgentHeader,
		DisableHeaderNamesNormalizing: o.DisableHeaderNamesNormalizing,
		DisablePathNormalizing:        o.DisablePathNormalizing,
		MaxIdemponentCallAttempts:     o.MaxIdemponentCallAttempts,
		Name:                          o.Name,
		ConfigureClient:               o.ConfigureClient,
		StreamResponseBody:            stream,
		// TODO: include any additional exported fields (see next comment)
	}
	if stream {
		c.streamingFHC = clone
	}
	return clone
}

This keeps pooling intact and avoids repeated allocation while maintaining thread safety. If you prefer the “flip with lock” model, I can provide a minimal diff as well.

🤖 Prompt for AI Agents
client/core.go lines 84-118: currently you allocate a new *fasthttp.Client
per-request when StreamResponseBody differs, which breaks connection/TLS pooling
and adds latency; instead, add a long-lived streaming variant on Client (e.g.
streamingFHC *fasthttp.Client), implement a pickFastHTTPClient(stream bool) that
returns the existing pooled client or lazily initializes and caches a clone
configured with StreamResponseBody=true under the client's mutex, and replace
this per-request allocation with a call to
pickFastHTTPClient(c.req.StreamResponseBody()) so requests reuse pooled
connections and preserve thread-safety.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
client/response.go (2)

92-101: Tighten BodyStream docs: lifetime, connection reuse, and Body() emptiness when streaming

The code is fine. Please clarify the contract and call order to prevent misuse with pooled responses and fasthttp’s connection reuse.

Apply this doc tweak:

-// BodyStream returns the response body as a stream reader.
-// Note: When using BodyStream(), the response body is not copied to memory,
-// so calling Body() afterwards may return an empty slice.
+// BodyStream returns an io.Reader for the response body.
+// If streaming was enabled (via Request.SetStreamResponseBody, Client.SetStreamResponseBody,
+// or the underlying fasthttp.Client.StreamResponseBody), the reader yields bytes directly from
+// the connection without buffering in memory.
+//
+// Important:
+// - Consume the reader before calling Response.Close().
+// - Read the stream to EOF to allow connection reuse by the underlying client.
+// - When streaming is enabled, Body() (and helpers like String/JSON/XML/CBOR/Save) may be empty;
+//   prefer io.Copy with BodyStream() for large responses.
+// If streaming is not enabled, BodyStream returns a bytes.Reader over Body().
 func (r *Response) BodyStream() io.Reader {

150-165: Save() ignores streaming and closes caller’s writer — stream from BodyStream() and don’t close external writers

  • Using Body() makes Save() write an empty file/output when streaming is enabled.
  • Closing an io.Writer that the function didn’t open is surprising and can break callers.

Apply this minimal change:

-    if _, err = io.Copy(outFile, bytes.NewReader(r.Body())); err != nil {
+    if _, err = io.Copy(outFile, r.BodyStream()); err != nil {
       return fmt.Errorf("failed to write response body to file: %w", err)
     }
@@
-  case io.Writer:
-    if _, err := io.Copy(p, bytes.NewReader(r.Body())); err != nil {
+  case io.Writer:
+    if _, err := io.Copy(p, r.BodyStream()); err != nil {
       return fmt.Errorf("failed to write response body to io.Writer: %w", err)
     }
-    defer func() {
-      if pc, ok := p.(io.WriteCloser); ok {
-        _ = pc.Close() //nolint:errcheck // not needed
-      }
-    }()
     return nil

Follow-up (optional): consider adding a short note in Save()’s doc that it respects streaming and doesn’t close external writers.

client/response_test.go (1)

606-629: Avoid retaining Body()’s internal buffer across Close; also assert Body() is empty when streaming

To prevent future flakiness with pooled buffers and to codify the streaming contract, copy the non-streamed body and add an assertion that Body() is empty when streaming is enabled.

Apply:

-    defer resp1.Close()
-    normalBody := resp1.Body()
+    defer resp1.Close()
+    normalBody := append([]byte(nil), resp1.Body()...) // copy before pool reuse
@@
-    streamedBody, err := io.ReadAll(resp2.BodyStream())
+    streamedBody, err := io.ReadAll(resp2.BodyStream())
     require.NoError(t, err)
     require.Equal(t, normalBody, streamedBody)
+    // streaming mode shouldn't buffer Body()
+    require.Len(t, resp2.Body(), 0)
🧹 Nitpick comments (1)
client/response_test.go (1)

542-629: Add a streaming Save() regression test (uses BodyStream and does not close external writer)

Given the Save() fix, add coverage to ensure we don’t regress: the writer must not be closed by Save(), and content must be streamed correctly when streaming is enabled.

Insert this new subtest at the end of Test_Response_BodyStream (before the final closing brace):

+   t.Run("Save streams and does not close external writer", func(t *testing.T) {
+     t.Parallel()
+
+     server := startTestServer(t, func(app *fiber.App) {
+       app.Get("/stream", func(c fiber.Ctx) error {
+         return c.SendString("streaming data")
+       })
+     })
+     defer server.stop()
+
+     client := New().SetDial(server.dial()).SetStreamResponseBody(true)
+     resp, err := client.Get("http://example.com/stream")
+     require.NoError(t, err)
+     defer resp.Close()
+
+     type trackingBuf struct {
+       bytes.Buffer
+       closed bool
+     }
+     // implements io.WriteCloser to verify Save() does not close it
+     var w = &trackingBuf{}
+     // wrap with NopCloser-like shim
+     writer := struct {
+       io.Writer
+       io.Closer
+     }{
+       Writer: w,
+       Closer: io.CloserFunc(func() error {
+         w.closed = true
+         return nil
+       }),
+     }
+
+     // Save to writer; it must not close it
+     err = resp.Save(writer)
+     require.NoError(t, err)
+     require.Equal(t, "streaming data", w.String())
+     require.False(t, w.closed, "Save must not close external writers")
+   })

If you prefer, we can place a similar subtest under Test_Response_Save.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 4d32c17 and 28342fa.

📒 Files selected for processing (6)
  • client/client.go (2 hunks)
  • client/client_test.go (1 hunks)
  • client/core.go (2 hunks)
  • client/request.go (3 hunks)
  • client/response.go (1 hunks)
  • client/response_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • client/client.go
  • client/request.go
  • client/core.go
  • client/client_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
client/response_test.go (1)
client/client.go (2)
  • Get (706-708)
  • New (647-652)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
client/response.go (2)

92-101: Tighten BodyStream contract; clarify lifetime and fallback behavior

Doc is still a bit vague about ordering/lifetime and the fallback to an in-memory reader. Propose expanding the comment to state the contract explicitly and reduce surprises.

-// BodyStream returns the response body as a stream reader.
-// Note: When using BodyStream(), the response body is not copied to memory,
-// so calling Body() afterwards may return an empty slice.
+// BodyStream returns an io.Reader for the response body.
+//
+// If streaming was enabled (via Request.SetStreamResponseBody, Client.SetStreamResponseBody,
+// or the underlying fasthttp.Client.StreamResponseBody), the reader yields data directly from
+// the connection without buffering. Otherwise, it returns a bytes.Reader over Body().
+//
+// Important:
+// - Consume the reader before calling Response.Close().
+// - When streaming is enabled, Body() (and helpers like String/JSON/XML/CBOR/Save) may observe
+//   an empty body. Prefer io.Copy with BodyStream() for large responses.
 func (r *Response) BodyStream() io.Reader {
   if stream := r.RawResponse.BodyStream(); stream != nil {
     return stream
   }
   // If streaming is not enabled, return a bytes.Reader from the regular body
   return bytes.NewReader(r.RawResponse.Body())
 }

161-172: Make Save() always stream via BodyStream(); also stream when saving to a file

The writer branch partially streams, but the file path branch still buffers via Body(). Since BodyStream() already falls back to a bytes.Reader when streaming isn’t enabled, you can unconditionally copy from BodyStream() in both branches. This avoids large in-memory buffers and simplifies the code.

@@
-    if _, err = io.Copy(outFile, bytes.NewReader(r.Body())); err != nil {
+    if _, err = io.Copy(outFile, r.BodyStream()); err != nil {
       return fmt.Errorf("failed to write response body to file: %w", err)
     }
@@
-  case io.Writer:
-    var err error
-    if r.IsStreaming() {
-      _, err = io.Copy(p, r.BodyStream())
-    } else {
-      _, err = io.Copy(p, bytes.NewReader(r.Body()))
-    }
-
-    if err != nil {
-      return fmt.Errorf("failed to write response body to writer: %w", err)
-    }
-
-    return nil
+  case io.Writer:
+    if _, err := io.Copy(p, r.BodyStream()); err != nil {
+      return fmt.Errorf("failed to write response body to writer: %w", err)
+    }
+    return nil

Follow-up: consider adding/adjusting tests to cover Save(filePath) with streaming enabled (e.g., large response) to ensure we don’t regress. I can draft that if helpful.

client/core.go (1)

84-120: Per-request allocating a new fasthttp.Client loses connection pooling and adds substantial latency/TLS overhead

Constructing a fresh *fasthttp.Client whenever the per-request streaming flag differs discards the shared connection pool and TLS session cache, causing extra TCP/TLS handshakes and reduced throughput under mixed workloads. This will regress performance for SSE/large downloads that toggle streaming on/off across requests.

Consider maintaining two long-lived internal clients (streaming vs non-streaming) on Client and routing each request accordingly. Here's a suggested implementation:

Add to client/client.go:

+// In Client struct:
+streamingFHC *fasthttp.Client // lazily initialized clone with StreamResponseBody=true
+
+// pickFastHTTPClient returns a pooled client configured for the requested streaming mode.
+func (c *Client) pickFastHTTPClient(stream bool) *fasthttp.Client {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	if stream == c.streamResponseBody {
+		return c.fasthttp
+	}
+	// Lazily build the streaming variant once and reuse it.
+	if stream && c.streamingFHC != nil {
+		return c.streamingFHC
+	}
+	return c.initStreamingClientLocked(stream)
+}
+
+// initStreamingClientLocked clones exported config into a new fasthttp.Client.
+// c.mu must be held by the caller.
+func (c *Client) initStreamingClientLocked(stream bool) *fasthttp.Client {
+	o := c.fasthttp
+	clone := &fasthttp.Client{
+		Dial:                          o.Dial,
+		DialDualStack:                 o.DialDualStack,
+		TLSConfig:                     o.TLSConfig,
+		MaxConnsPerHost:               o.MaxConnsPerHost,
+		MaxIdleConnDuration:           o.MaxIdleConnDuration,
+		MaxConnDuration:               o.MaxConnDuration,
+		ReadTimeout:                   o.ReadTimeout,
+		WriteTimeout:                  o.WriteTimeout,
+		ReadBufferSize:                o.ReadBufferSize,
+		WriteBufferSize:               o.WriteBufferSize,
+		MaxResponseBodySize:           o.MaxResponseBodySize,
+		NoDefaultUserAgentHeader:      o.NoDefaultUserAgentHeader,
+		DisableHeaderNamesNormalizing: o.DisableHeaderNamesNormalizing,
+		DisablePathNormalizing:        o.DisablePathNormalizing,
+		MaxIdemponentCallAttempts:     o.MaxIdemponentCallAttempts,
+		Name:                          o.Name,
+		ConfigureClient:               o.ConfigureClient,
+		StreamResponseBody:            stream,
+	}
+	if stream {
+		c.streamingFHC = clone
+	}
+	return clone
+}

Then simplify this code block:

-	// Determine which client to use - create a new one if StreamResponseBody differs
-	var fastHTTPClient *fasthttp.Client
-	requestStreamResponseBody := c.req.StreamResponseBody()
-	c.client.mu.RLock()
-	clientStream := c.client.streamResponseBody
-	original := c.client.fasthttp
-
-	if requestStreamResponseBody != clientStream {
-		// Request setting differs from client setting, create a temporary client
-
-		fastHTTPClient = &fasthttp.Client{
-			Dial:                          original.Dial,
-			DialDualStack:                 original.DialDualStack,
-			TLSConfig:                     original.TLSConfig,
-			MaxConnsPerHost:               original.MaxConnsPerHost,
-			MaxIdleConnDuration:           original.MaxIdleConnDuration,
-			MaxConnDuration:               original.MaxConnDuration,
-			ReadTimeout:                   original.ReadTimeout,
-			WriteTimeout:                  original.WriteTimeout,
-			ReadBufferSize:                original.ReadBufferSize,
-			WriteBufferSize:               original.WriteBufferSize,
-			MaxResponseBodySize:           original.MaxResponseBodySize,
-			NoDefaultUserAgentHeader:      original.NoDefaultUserAgentHeader,
-			DisableHeaderNamesNormalizing: original.DisableHeaderNamesNormalizing,
-			DisablePathNormalizing:        original.DisablePathNormalizing,
-			MaxIdemponentCallAttempts:     original.MaxIdemponentCallAttempts,
-			Name:                          original.Name,
-			ConfigureClient:               original.ConfigureClient,
-
-			// Request-specific override
-			StreamResponseBody: requestStreamResponseBody,
-		}
-	} else {
-		fastHTTPClient = original
-	}
-	c.client.mu.RUnlock()
+	// Determine which client to use without sacrificing connection pooling
+	requestStreamResponseBody := c.req.StreamResponseBody()
+	fastHTTPClient := c.client.pickFastHTTPClient(requestStreamResponseBody)

This keeps pooling intact and avoids repeated allocation while maintaining thread safety.

🧹 Nitpick comments (1)
client/response.go (1)

103-106: IsStreaming semantics are OK; consider documenting that it reflects fasthttp.BodyStream presence

Minor: IsStreaming simply mirrors RawResponse.BodyStream() != nil. A short note in the comment that this reports whether the underlying fasthttp response is streaming (not whether data remains unread) would avoid confusion.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 28342fa and ddbb831.

📒 Files selected for processing (6)
  • client/client.go (2 hunks)
  • client/client_test.go (1 hunks)
  • client/core.go (2 hunks)
  • client/request.go (3 hunks)
  • client/response.go (2 hunks)
  • client/response_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • client/response_test.go
  • client/client_test.go
  • client/client.go
  • client/request.go
🧰 Additional context used
🧬 Code graph analysis (1)
client/core.go (1)
client/client.go (1)
  • Client (32-62)
🔇 Additional comments (1)
client/core.go (1)

133-142: LGTM!

The routing to use fastHTTPClient instead of the hardcoded client is correct and properly handles both retry and non-retry paths with redirects.

@gaby
Copy link
Member

gaby commented Aug 25, 2025

@Abhirup-99 To fix the alignment you can run make betteralign it will fix it for you

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
client/client_test.go (1)

1781-1788: Prevent flaky read assertion: allow io.EOF and avoid require.Positive

io.Reader may return (n > 0, err == io.EOF). Requiring NoError can flake under fast streams. Also prefer require.Greater over require.Positive for broader testify compatibility.

Apply this diff:

-	buffer := make([]byte, 1024)
-	n, err := bodyStream.Read(buffer)
-	require.NoError(t, err)
-	require.Positive(t, n)
+	buffer := make([]byte, 1024)
+	n, err := bodyStream.Read(buffer)
+	require.Greater(t, n, 0)
+	if err != nil {
+		require.ErrorIs(t, err, io.EOF)
+	}
🧹 Nitpick comments (3)
client/client_test.go (3)

1836-1841: Also read from BodyStream when streaming is disabled (fallback path)

This ensures BodyStream returns an in-memory reader consistent with Body() when streaming is off.

Apply this diff:

 	body := resp.Body()
 	require.Equal(t, "Hello, World!", string(body))
 
 	bodyStream := resp.BodyStream()
 	require.NotNil(t, bodyStream)
+	bs, err := io.ReadAll(bodyStream)
+	require.NoError(t, err)
+	require.Equal(t, "Hello, World!", string(bs))
+	// Optionally, assert the runtime flag too:
+	// require.False(t, resp.IsStreaming())

1774-1780: Assert streaming state in SSE client test

We’ve confirmed that Response.IsStreaming() is implemented (see client/response.go:104), so it’s safe to assert the streaming state in the test. This is an optional refactor to make the test more self-verifying.

• File: client/client_test.go (around lines 1774–1780)
Add the following assertion after require.NotNil(t, bodyStream):

 	resp, err := client.Get("http://" + addr + "/sse")
 	require.NoError(t, err)
 	defer resp.Close()

 	bodyStream := resp.BodyStream()
 	require.NotNil(t, bodyStream)
+	require.True(t, resp.IsStreaming())

This ensures the test not only checks for a non-nil stream but also that the response is indeed flagged as streaming.


1798-1819: Strengthen streaming assertions in client tests

We can make the test more robust by explicitly asserting the IsStreaming() flag on each response. This verifies that SetStreamResponseBody(true) indeed enables streaming and that the default client does not stream.

Locations to update:

  • client/client_test.go (around lines 1800–1810)

Proposed diff:

  	resp, err := client.Get("http://" + addr + "/large")
  	require.NoError(t, err)
  	defer resp.Close()
+	require.True(t, resp.IsStreaming(), "streaming client should report IsStreaming()==true")
  	bodyStream := resp.BodyStream()
  	require.NotNil(t, bodyStream)
  	streamedData, err := io.ReadAll(bodyStream)
  	require.NoError(t, err)
  	require.Equal(t, largeData, streamedData)
  	
  	client2 := New()
  	resp2, err := client2.Get("http://" + addr + "/large")
  	require.NoError(t, err)
  	defer resp2.Close()
+	require.False(t, resp2.IsStreaming(), "default client should report IsStreaming()==false")
  	body := resp2.Body()
  	require.Equal(t, largeData, body)

This optional refactor guards against future regressions in the streaming logic.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ddbb831 and f821027.

📒 Files selected for processing (6)
  • client/client.go (2 hunks)
  • client/client_test.go (1 hunks)
  • client/core.go (2 hunks)
  • client/request.go (3 hunks)
  • client/response.go (2 hunks)
  • client/response_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • client/response_test.go
  • client/client.go
  • client/core.go
  • client/response.go
  • client/request.go
🧰 Additional context used
🧬 Code graph analysis (1)
client/client_test.go (1)
client/client.go (2)
  • New (647-652)
  • Get (706-708)
🔇 Additional comments (3)
client/client_test.go (3)

1737-1745: LGTM: basic toggle behavior is covered

Good sanity test for the client-level StreamResponseBody toggle.


1846-1852: LGTM: chainable methods resolve to final state

Final false is correctly asserted after toggling and mixing with SetTimeout.


1864-1883: Remove or update the Reset verification suggestion

The Request.Reset() method does not exist in the codebase, so the part of the review that asks to verify its behavior is no longer applicable. You can either implement a Reset method to clear per-request overrides or remove the related test snippet.

• Location to adjust:

  • client/client_test.go (around lines 1875–1883)
  • client/request.go (no existing Reset method)

Revised review comment:

Cover per-request override end-to-end behavior

  • Assert IsStreaming() true/false on responses to prove the override took effect and does not leak.
  • (Optional) If you intend to support clearing per-request flags, consider adding a Request.Reset() method that clears overrides.

Likely an incorrect or invalid review comment.

@JIeJaitt
Copy link
Contributor

@gaby @Abhirup-99 hi, I have some questions.

I don't think the current tests cover streaming response scenarios. For example, Test_Client_StreamResponseBody_ServerSentEvents only reads from bodyStream once and asserts it contains the first message — it doesn't verify that subsequent messages arrive incrementally, nor can it detect mid-stream disconnects or cases where the server writes and returns data progressively. In Test_Request_StreamResponseBody the server immediately returns the full string (c.SendString("Hello, World!")) with no chunked or delayed writes, so it can't validate chunked/streaming delivery or on-demand reads — the client, even in streaming mode, would read the whole response at once.

Copy link
Member

@gaby gaby left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reset() method is missing resetting the value for StreamResponseBody


server := startTestServer(t, func(app *fiber.App) {
app.Get("/stream", func(c fiber.Ctx) error {
return c.SendString("streaming data")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not streaming the server response. Ths fiber server needs to run with the streaming option enabled

@gaby
Copy link
Member

gaby commented Aug 26, 2025

@JIeJaitt Correct, the tests are not testing streaming, the server needs to stream the resp

@gaby
Copy link
Member

gaby commented Aug 27, 2025

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex Review: Here are some suggestions.

Reply with @codex fix comments to fix any unresolved comments.

About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review". If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex fix this CI failure" or "@codex address that feedback".

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
client/client_test.go (1)

1781-1785: Allow (n > 0, err == io.EOF) on Reader.Read to avoid flakes

Same as prior feedback; accept EOF when bytes were read.

- n, err := bodyStream.Read(buffer)
- require.NoError(t, err)
- require.Positive(t, n)
+ n, err := bodyStream.Read(buffer)
+ require.Greater(t, n, 0)
+ if err != nil {
+   require.ErrorIs(t, err, io.EOF)
+ }
🧹 Nitpick comments (3)
client/request.go (1)

47-65: Field reorder: run betteralign and confirm no reflection/ABI assumptions

Reordering private fields is fine, but please run make betteralign to lock optimal layout, and confirm no reflection-based code depends on order. No functional diffs spotted here.

client/client_test.go (2)

1750-1769: SSE test isn’t truly streaming; write and flush incrementally

Emit chunks with delays and flushes so BodyStream reads incrementally.

 app.Get("/sse", func(c fiber.Ctx) error {
-  c.Set("Content-Type", "text/event-stream")
-  c.Set("Cache-Control", "no-cache")
-  c.Set("Connection", "keep-alive")
-
-  messages := []string{
-    "data: message 1\n\n",
-    "data: message 2\n\n",
-    "data: message 3\n\n",
-  }
-
-  for _, msg := range messages {
-    if _, err := c.WriteString(msg); err != nil {
-      return err
-    }
-  }
-  return nil
+  c.Set("Content-Type", "text/event-stream")
+  c.Set("Cache-Control", "no-cache")
+  c.Set("Connection", "keep-alive")
+  c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
+    for i := 1; i <= 3; i++ {
+      _, _ = w.WriteString(fmt.Sprintf("data: message %d\n\n", i))
+      _ = w.Flush()
+      time.Sleep(50 * time.Millisecond)
+    }
+  })
+  return nil
 })

Add imports (outside this hunk):

  • "bufio"
  • "fmt"
  • "time"

1836-1841: Also assert non-streaming state

Confirm API reports non-streaming when disabled.

 body := resp.Body()
 require.Equal(t, "Hello, World!", string(body))

 bodyStream := resp.BodyStream()
 require.NotNil(t, bodyStream)
+require.False(t, resp.IsStreaming())
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f821027 and 5cbdb70.

📒 Files selected for processing (5)
  • client/client.go (1 hunks)
  • client/client_test.go (1 hunks)
  • client/request.go (1 hunks)
  • client/response.go (2 hunks)
  • client/response_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • client/client.go
  • client/response.go
  • client/response_test.go
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Format Go code using gofumpt (enforced via make format)
Ensure code passes golangci-lint checks (enforced via make lint)
Optimize struct field alignment using betteralign (enforced via make betteralign)
Modernize Go code using gopls modernize (enforced via make modernize)

Files:

  • client/client_test.go
  • client/request.go
🧬 Code graph analysis (2)
client/client_test.go (1)
client/client.go (2)
  • New (646-651)
  • Get (705-707)
client/request.go (1)
client/client.go (1)
  • Client (32-61)
🔇 Additional comments (3)
client/client_test.go (3)

1737-1745: Toggle test LGTM

Covers default state and idempotent toggling.


1793-1819: Large response stream test LGTM

Validates streamed vs non-streamed paths; ok for 1 MiB payload.


1846-1852: Chainable methods assertion LGTM

Final state assertion is clear.

@Abhirup-99
Copy link
Author

@gaby have updated the code.

Copy link
Member

@ReneWerner87 ReneWerner87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LG
but documentation is missing

@ReneWerner87
Copy link
Member

@Abhirup-99

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
client/response_test.go (1)

650-672: Fix flakiness: don’t retain Body() buffer after closing the Response.

Copy the bytes before resp1.Close(); otherwise the pool may reuse the memory, making equality flaky. Also assert that streaming keeps Body() empty.

-	defer resp1.Close()
-	normalBody := resp1.Body()
+	normalBody := append([]byte(nil), resp1.Body()...)
+	defer resp1.Close()
@@
 	streamedBody, err := io.ReadAll(resp2.BodyStream())
 	require.NoError(t, err)
 	require.Equal(t, normalBody, streamedBody)
+	require.Len(t, resp2.Body(), 0)
🧹 Nitpick comments (9)
client/response_test.go (9)

6-13: Imports look correct and used.

No issues. If you adopt the SSE incremental-reading refactor below, add bufio to imports.

Apply if needed:

 import (
+	"bufio"
 	"bytes"
 	"crypto/tls"
 	"encoding/json"
 	"encoding/xml"
 	"io"
 	"net"
 	"os"
 	"strings"
 	"testing"
 	"time"
 )

544-571: Good test: Save() must not close provided writer. Add streaming invariants.

Also assert IsStreaming() and that Body() is empty to codify behavior when streaming is enabled.

 	resp, err := client.Get("http://example.com/stream")
 	require.NoError(t, err)
 	defer resp.Close()

+	require.True(t, resp.IsStreaming())
+	require.Len(t, resp.Body(), 0)

589-609: Assert streaming invariants.

Strengthen the test by asserting IsStreaming() and that Body() is empty.

 	bodyStream := resp.BodyStream()
 	require.NotNil(t, bodyStream)
 	data, err := io.ReadAll(bodyStream)
 	require.NoError(t, err)
 	require.Equal(t, "streaming data", string(data))
+	require.True(t, resp.IsStreaming())
+	require.Len(t, resp.Body(), 0)

611-648: Large streaming test looks good. Add Body() emptiness assertion.

Validates chunked reads; add one more invariant for clarity.

 	require.Len(t, totalRead, 1024)
 	for i := 0; i < 1024; i++ {
 		expected := byte('A' + i%26)
 		require.Equal(t, expected, totalRead[i])
 	}
+	require.True(t, resp.IsStreaming())
+	require.Len(t, resp.Body(), 0)

674-723: Don’t set Transfer-Encoding header manually; prefer true streaming writes.

Manually forcing “Transfer-Encoding: chunked” is unnecessary and can be brittle. Use progressive writes (with ImmediateHeaderFlush) or SetBodyStreamWriter for clearer intent.

-				c.Set("Transfer-Encoding", "chunked")
 				chunks := []string{"chunk1", "chunk2", "chunk3"}

Optional (clearer streaming semantics via writer):

-			app.Get("/chunked", func(c fiber.Ctx) error {
-				c.Set("Content-Type", "text/plain")
-				chunks := []string{"chunk1", "chunk2", "chunk3"}
-				for i, chunk := range chunks {
-					if _, err := c.WriteString(chunk); err != nil {
-						return err
-					}
-					c.Response().ImmediateHeaderFlush = true
-					if i < len(chunks)-1 {
-						time.Sleep(10 * time.Millisecond)
-					}
-				}
-				return nil
-			})
+			app.Get("/chunked", func(c fiber.Ctx) error {
+				c.Set("Content-Type", "text/plain")
+				chunks := []string{"chunk1", "chunk2", "chunk3"}
+				for i, chunk := range chunks {
+					if _, err := c.WriteString(chunk); err != nil {
+						return err
+					}
+					c.Response().ImmediateHeaderFlush = true
+					if i < len(chunks)-1 {
+						time.Sleep(10 * time.Millisecond)
+					}
+				}
+				return nil
+			})

725-774: Test name says “incremental reads” but uses io.ReadAll. Either rename or actually read incrementally.

To validate streaming, read event-by-event and assert multiple reads occur with small time gaps.

-	t.Run("server sent events with incremental reads", func(t *testing.T) {
+	t.Run("server sent events", func(t *testing.T) {
@@
-	data, err := io.ReadAll(bodyStream)
-	require.NoError(t, err)
-
-	content := string(data)
-	require.Contains(t, content, "event 1")
-	require.Contains(t, content, "event 2")
-	require.Contains(t, content, "event 3")
-	require.Contains(t, content, "event 4")
-	require.Contains(t, content, "data: event")
-	require.Contains(t, content, "\n\n")
+	reader := bufio.NewReader(bodyStream)
+	var got []string
+	start := time.Now()
+	for len(got) < 4 {
+		line, err := reader.ReadString('\n')
+		require.NoError(t, err)
+		if strings.HasPrefix(line, "data: ") {
+			got = append(got, strings.TrimSpace(strings.TrimPrefix(line, "data: ")))
+		}
+	}
+	require.GreaterOrEqual(t, len(got), 4)
+	// Rough check that we didn't receive all data in a single read.
+	require.Greater(t, time.Since(start), 5*time.Millisecond)

824-863: Rename to reflect behavior; current test simulates client-side close, not server interruption.

Clarify the intent to avoid confusion.

-	t.Run("connection interruption handling", func(t *testing.T) {
+	t.Run("client-side close stops subsequent reads", func(t *testing.T) {

1018-1040: Assert IsStreaming() for completeness.

This subtest should also prove the state flag.

 	data, err := io.ReadAll(bodyStream)
 	require.NoError(t, err)
 	require.Len(t, data, 64*1024)
+	require.True(t, resp.IsStreaming())

1058-1087: Add invariants to contrast streaming vs non-streaming.

Explicitly assert Body() emptiness for streaming and presence for non-streaming to lock in contract.

 	resp1, err := client1.Get("http://example.com/test")
 	require.NoError(t, err)
 	defer resp1.Close()
+	require.True(t, resp1.IsStreaming())
 	bodyStream1 := resp1.BodyStream()
 	require.NotNil(t, bodyStream1)
 	data1, err := io.ReadAll(bodyStream1)
 	require.NoError(t, err)
 	require.Equal(t, "test content", string(data1))
+	require.Len(t, resp1.Body(), 0)
@@
 	require.NoError(t, err)
 	require.Equal(t, "test content", string(data2))
 	require.Equal(t, string(data1), string(data2))
+	require.Greater(t, len(resp2.Body()), 0)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5cbdb70 and 4bfeac4.

📒 Files selected for processing (5)
  • client/client.go (1 hunks)
  • client/client_test.go (1 hunks)
  • client/request.go (1 hunks)
  • client/response.go (2 hunks)
  • client/response_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • client/client_test.go
  • client/response.go
  • client/client.go
  • client/request.go
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Format Go code using gofumpt (enforced via make format)
Ensure code passes golangci-lint checks (enforced via make lint)
Optimize struct field alignment using betteralign (enforced via make betteralign)
Modernize Go code using gopls modernize (enforced via make modernize)

Files:

  • client/response_test.go
🧬 Code graph analysis (1)
client/response_test.go (2)
client/client.go (1)
  • New (646-651)
client/response.go (1)
  • Response (19-25)
🔇 Additional comments (7)
client/response_test.go (7)

574-585: Helper is fine.

Closable writer stub is minimal and effective.


776-822: Progressive JSON streaming: solid coverage.

Confirms assembly and JSON validity; good.


865-927: Large streaming validation looks good.

Asserts multiple reads and data shape; nice.


930-953: Fallback to bytes.Reader is validated well.

Also checks Body() remains accessible; good.


954-974: Empty response fallback: OK.

Covers empty-body path succinctly.


975-1012: Large non-streaming fallback: OK.

Good chunked reading of bytes.Reader.


1042-1056: Non-streaming case: OK.

Validates IsStreaming() false path.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
client/response_test.go (1)

651-673: Fix flakiness: copy Body() before closing resp1; also assert Body() is empty under streaming

Holding a reference to resp1.Body() after closing can cause flaky comparisons due to buffer reuse. Also, add an assertion that Body() is empty when streaming is used.

   resp1, err := client1.Get("http://example.com/stream")
   require.NoError(t, err)
   defer resp1.Close()
-  normalBody := resp1.Body()
+  // Copy to avoid aliasing pooled buffer after resp1.Close()
+  normalBody := append([]byte(nil), resp1.Body()...)

   client2 := New().SetDial(server.dial()).SetStreamResponseBody(true)
   resp2, err := client2.Get("http://example.com/stream")
   require.NoError(t, err)
   defer resp2.Close()
+  // Documented behavior: Body() should be empty in streaming mode
+  require.Len(t, resp2.Body(), 0, "Body() should be empty when streaming is enabled")

   streamedBody, err := io.ReadAll(resp2.BodyStream())

Run the subtest repeatedly to ensure no flakes remain:

#!/bin/bash
# Repeat the flaky subtest to validate the fix
go test ./client -run 'Test_Response_BodyStream/compare with regular body' -count=100
client/client_test.go (1)

1781-1784: Prevent flaky read assertion: allow io.EOF when n > 0

If you keep the single-Read approach, io.Reader may legally return (n > 0, err == io.EOF). Don’t fail the test in that case.

-	n, err := bodyStream.Read(buffer)
-	require.NoError(t, err)
-	require.Positive(t, n)
+	n, err := bodyStream.Read(buffer)
+	require.Greater(t, n, 0)
+	if err != nil {
+		require.ErrorIs(t, err, io.EOF)
+	}
🧹 Nitpick comments (7)
client/response_test.go (5)

600-610: Codify invariants: assert IsStreaming() and empty Body() when streaming is enabled

Strengthen the “basic streaming” test by asserting the contract.

   resp, err := client.Get("http://example.com/stream")
   require.NoError(t, err)
   defer resp.Close()
+  require.True(t, resp.IsStreaming(), "Expected IsStreaming() to be true when client has streaming enabled")
+  require.Len(t, resp.Body(), 0, "Body() should be empty when streaming is enabled")

   bodyStream := resp.BodyStream()

679-694: Don’t set Transfer-Encoding explicitly

Let the server determine chunked transfer by omitting Content-Length and flushing; setting “Transfer-Encoding: chunked” manually is unnecessary and brittle.

   app.Get("/chunked", func(c fiber.Ctx) error {
     c.Set("Content-Type", "text/plain")
-    c.Set("Transfer-Encoding", "chunked")
     chunks := []string{"chunk1", "chunk2", "chunk3"}

726-776: Test name suggests incremental reads but code reads all at once

Either rename the test or assert incremental arrival. Minimal change: rename to reflect behavior.

- t.Run("server sent events with incremental reads", func(t *testing.T) {
+ t.Run("server sent events", func(t *testing.T) {

If you want to validate incrementality, switch from io.ReadAll to reading lines with a bufio.Reader and assert partial data is observed before the final write.


777-823: Optional: Stream-parse JSON progressively

To actually exercise streaming semantics, decode items incrementally with json.Decoder instead of io.ReadAll, asserting that the first item becomes available before the last is sent.


1033-1041: Also assert IsStreaming() in the IsStreaming test

Small assertion makes the intent explicit.

   resp, err := client.Get("http://example.com/large-stream")
   require.NoError(t, err)
   defer resp.Close()
+  require.True(t, resp.IsStreaming())

   bodyStream := resp.BodyStream()
client/client_test.go (2)

1790-1819: Optional: assert streaming flag to strengthen expectations

After reading via BodyStream, assert resp.IsStreaming() is true (and false for resp2) if that accessor is available.


1821-1841: Also validate BodyStream fallback equals Body() when disabled

Read BodyStream and compare with Body to verify the in-memory reader path.

 	body := resp.Body()
 	require.Equal(t, "Hello, World!", string(body))

 	bodyStream := resp.BodyStream()
 	require.NotNil(t, bodyStream)
+	streamed, err := io.ReadAll(bodyStream)
+	require.NoError(t, err)
+	require.Equal(t, body, streamed)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 4bfeac4 and 909b1c8.

📒 Files selected for processing (5)
  • client/client.go (1 hunks)
  • client/client_test.go (1 hunks)
  • client/request.go (1 hunks)
  • client/response.go (2 hunks)
  • client/response_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • client/response.go
  • client/client.go
  • client/request.go
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Format Go code using gofumpt (enforced via make format)
Ensure code passes golangci-lint checks (enforced via make lint)
Optimize struct field alignment using betteralign (enforced via make betteralign)
Modernize Go code using gopls modernize (enforced via make modernize)

Files:

  • client/response_test.go
  • client/client_test.go
🧬 Code graph analysis (2)
client/response_test.go (2)
client/client.go (1)
  • New (646-651)
client/response.go (1)
  • Response (19-25)
client/client_test.go (1)
client/client.go (2)
  • New (646-651)
  • Get (705-707)
🔇 Additional comments (7)
client/response_test.go (5)

545-573: LGTM: Save() must not close the io.Writer

Good coverage to ensure Save() doesn’t close writers even when streaming is enabled.


575-585: LGTM: purpose-built closable buffer helper

Simple and effective for asserting writer-close behavior.


825-864: LGTM: good coverage for post-close read behavior

Solid check that further reads fail after Response.Close().


866-929: LGTM: large streaming with chunk delays

Verifies multiple reads and content pattern; this meaningfully tests streaming.


931-954: LGTM: BodyStream() fallback when not streaming

Correctly exercises bytes.Reader fallback and keeps Body() intact.

client/client_test.go (2)

1737-1745: LGTM: client-level toggle sanity check covers on/off transitions

No issues spotted.


1843-1852: LGTM: chainable config produces expected final state

This covers chaining behavior clearly.

@JIeJaitt
Copy link
Contributor

JIeJaitt commented Sep 4, 2025

LGTM

@gaby
Copy link
Member

gaby commented Sep 4, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces valuable support for streaming response bodies, which is a great feature for handling large responses efficiently. The implementation of the client-level setting and the new Response methods (BodyStream, IsStreaming) is well done, and the accompanying tests are very comprehensive, covering a wide range of scenarios including SSE, large files, and connection issues.

However, there is a significant discrepancy between the pull request description and the provided code changes. The description mentions adding per-request control over streaming (streamResponseBody in the Request struct and logic in core.go), but this functionality appears to be missing from the implementation. This is a critical omission that leaves the feature incomplete as described. Please either update the code to include the per-request settings or adjust the pull request description to reflect the current scope of the changes.

Comment on lines +162 to +171
var err error
if r.IsStreaming() {
_, err = io.CopyBuffer(p, r.BodyStream(), nil)
} else {
_, err = io.Copy(p, bytes.NewReader(r.Body()))
}
defer func() {
if pc, ok := p.(io.WriteCloser); ok {
_ = pc.Close() //nolint:errcheck // not needed
}
}()

if err != nil {
return fmt.Errorf("failed to write response body to writer: %w", err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for writing to an io.Writer can be simplified. The if r.IsStreaming() check is redundant because r.BodyStream() already provides the correct io.Reader for both streaming and non-streaming cases. You can simplify this block by removing the conditional and always using io.Copy with r.BodyStream(), which improves readability and maintainability.

        if _, err := io.Copy(p, r.BodyStream()); err != nil {
            return fmt.Errorf("failed to write response body to writer: %w", err)
        }

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 909b1c8 and 741c44f.

📒 Files selected for processing (6)
  • client/client.go (1 hunks)
  • client/client_test.go (1 hunks)
  • client/request.go (1 hunks)
  • client/response.go (2 hunks)
  • client/response_test.go (2 hunks)
  • docs/client/rest.md (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • client/response_test.go
  • client/client.go
  • client/request.go
  • client/response.go
  • client/client_test.go
🧰 Additional context used
📓 Path-based instructions (1)
docs/**

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Review and update the contents of the docs folder if necessary when modifying code

Files:

  • docs/client/rest.md

Comment on lines +408 to +414
## StreamResponseBody

Returns whether response body streaming is enabled. When enabled, the response body is not fully loaded into memory and can be read as a stream using `BodyStream()`. This is useful for handling large responses or server-sent events.

```go title="Signature"
func (c *Client) StreamResponseBody() bool
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Document default, overrides, and usage caveats for streaming.

Clarify behavior so users don’t misuse Body() and understand request-level overrides.

-## StreamResponseBody
-
-Returns whether response body streaming is enabled. When enabled, the response body is not fully loaded into memory and can be read as a stream using `BodyStream()`. This is useful for handling large responses or server-sent events.
+## StreamResponseBody
+
+Returns whether response body streaming is enabled for the client.
+
+When enabled, the response body is not fully buffered in memory and should be consumed via `Response.BodyStream()`. This is useful for large downloads and server‑sent events.
+
+:::note
+- Default: `false`.
+- Per-request setting via `Request.SetStreamResponseBody(bool)` overrides the client value.
+- When streaming is enabled, prefer `Response.BodyStream()`; avoid `Response.Body()` for large payloads as it may buffer the entire response.
+- You can check `Response.IsStreaming()` to know if the current response is streaming.
+:::
 
 ```go title="Signature"
 func (c *Client) StreamResponseBody() bool

<details>
<summary>🤖 Prompt for AI Agents</summary>

```
In docs/client/rest.md around lines 408 to 414, expand the StreamResponseBody
section to state the default value (e.g., enabled or disabled), explain how to
override it at request-level (e.g., per-request option or header) and
client-level, and note caveats: when streaming is enabled Body() will not return
full content (use BodyStream() to read incrementally), callers must consume or
close the stream to avoid leaks, and streaming is unsuitable for APIs that
require the entire body (e.g., JSON unmarshalling) unless explicitly buffered;
add a short usage example outline describing when to prefer streaming vs
buffering.
```

</details>

<!-- fingerprinting:phantom:triton:chinchilla -->

<!-- This is an auto-generated comment by CodeRabbit -->

@Abhirup-99
Copy link
Author

Increased test coverage and added documentation. @gaby @ReneWerner87

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
client/client_test.go (2)

1781-1785: Prevent flaky read assertion: allow io.EOF when n > 0

io.Reader may return (n>0, EOF). Don’t fail the test in that case.

-	n, err := bodyStream.Read(buffer)
-	require.NoError(t, err)
-	require.Positive(t, n)
+	n, err := bodyStream.Read(buffer)
+	require.Greater(t, n, 0)
+	if err != nil {
+		require.ErrorIs(t, err, io.EOF)
+	}

1750-1769: SSE test isn’t exercising streaming; stream from server and assert progressive reads

Current handler buffers all events and the client reads once. Use SetBodyStreamWriter with flushes and gate with channels to deterministically validate incremental availability.

Apply this diff inside this test:

-	app, addr := startTestServerWithPort(t, func(app *fiber.App) {
-		app.Get("/sse", func(c fiber.Ctx) error {
+	firstSent := make(chan struct{})
+	cont := make(chan struct{})
+	app, addr := startTestServerWithPort(t, func(app *fiber.App) {
+		app.Get("/sse", func(c fiber.Ctx) error {
 			c.Set("Content-Type", "text/event-stream")
 			c.Set("Cache-Control", "no-cache")
 			c.Set("Connection", "keep-alive")
-
-			messages := []string{
-				"data: message 1\n\n",
-				"data: message 2\n\n",
-				"data: message 3\n\n",
-			}
-
-			for _, msg := range messages {
-				if _, err := c.WriteString(msg); err != nil {
-					return err
-				}
-			}
-
-			return nil
+			c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
+				_, _ = w.WriteString("data: message 1\n\n")
+				_ = w.Flush()
+				close(firstSent)
+				<-cont
+				_, _ = w.WriteString("data: message 2\n\n")
+				_ = w.Flush()
+				_, _ = w.WriteString("data: message 3\n\n")
+				_ = w.Flush()
+			})
+			return nil
 		})
 	})
@@
-	bodyStream := resp.BodyStream()
-	require.NotNil(t, bodyStream)
-
-	buffer := make([]byte, 1024)
-	n, err := bodyStream.Read(buffer)
-	require.NoError(t, err)
-	require.Positive(t, n)
-
-	content := string(buffer[:n])
-	require.Contains(t, content, "data: message 1")
+	bodyStream := resp.BodyStream()
+	require.NotNil(t, bodyStream)
+	select {
+	case <-firstSent:
+	case <-time.After(2 * time.Second):
+		t.Fatal("timeout waiting for first SSE message")
+	}
+	r := bufio.NewReader(bodyStream)
+	l1, err := r.ReadString('\n')
+	require.Contains(t, l1, "data: message 1")
+	_, _ = r.ReadString('\n') // consume blank line
+	close(cont)
+	l2, _ := r.ReadString('\n')
+	require.Contains(t, l2, "data: message 2")
+	_, _ = r.ReadString('\n')
+	l3, err := r.ReadString('\n')
+	require.Contains(t, l3, "data: message 3")
+	if err != nil {
+		require.ErrorIs(t, err, io.EOF)
+	}

Add import outside this hunk:

import "bufio"

Also applies to: 1778-1788

client/response_test.go (2)

709-721: Don’t retain Body() buffer from a pooled response; copy before Close()

Avoid aliasing a pooled buffer that may be reused.

-		normalBody := resp1.Body()
+		normalBody := append([]byte(nil), resp1.Body()...)

546-573: Stream the server response in Save() test to actually hit streaming path

The server uses SendString, which doesn’t guarantee streaming; use SetBodyStreamWriter with flushes so Save() exercises streaming.

-		server := startTestServer(t, func(app *fiber.App) {
-			app.Get("/stream", func(c fiber.Ctx) error {
-				return c.SendString("streaming data")
-			})
-		})
+		server := startTestServer(t, func(app *fiber.App) {
+			app.Get("/stream", func(c fiber.Ctx) error {
+				c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
+					_, _ = w.WriteString("streaming ")
+					_ = w.Flush()
+					time.Sleep(1 * time.Millisecond)
+					_, _ = w.WriteString("data")
+					_ = w.Flush()
+				})
+				return nil
+			})
+		})

Add import if missing:

import "bufio"
🧹 Nitpick comments (2)
client/response_test.go (2)

575-598: Also stream in the “error during copy” Save() test

Ensure the error path is hit while streaming, not from a buffered body.

-		server := startTestServer(t, func(app *fiber.App) {
-			app.Get("/stream", func(c fiber.Ctx) error {
-				return c.SendString("streaming data that will fail to write")
-			})
-		})
+		server := startTestServer(t, func(app *fiber.App) error {
+			app.Get("/stream", func(c fiber.Ctx) error {
+				c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
+					_, _ = w.WriteString("streaming data that will fail to write")
+					_ = w.Flush()
+				})
+				return nil
+			})
+			return nil
+		})

726-742: Prefer SetBodyStreamWriter over WriteString+ImmediateHeaderFlush for chunk tests

SetBodyStreamWriter gives clearer, deterministic streaming semantics.

-			app.Get("/chunked", func(c fiber.Ctx) error {
-				c.Set("Content-Type", "text/plain")
-				c.Set("Transfer-Encoding", "chunked")
-				chunks := []string{"chunk1", "chunk2", "chunk3"}
-				for i, chunk := range chunks {
-					if _, err := c.WriteString(chunk); err != nil {
-						return err
-					}
-					c.Response().ImmediateHeaderFlush = true
-					if i < len(chunks)-1 {
-						time.Sleep(10 * time.Millisecond)
-					}
-				}
-				return nil
-			})
+			app.Get("/chunked", func(c fiber.Ctx) error {
+				c.Set("Content-Type", "text/plain")
+				c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
+					for i, chunk := range []string{"chunk1", "chunk2", "chunk3"} {
+						_, _ = w.WriteString(chunk)
+						_ = w.Flush()
+						if i < 2 {
+							time.Sleep(10 * time.Millisecond)
+						}
+					}
+				})
+				return nil
+			})
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 741c44f and 9b16623.

📒 Files selected for processing (6)
  • client/client.go (1 hunks)
  • client/client_test.go (1 hunks)
  • client/request.go (1 hunks)
  • client/response.go (2 hunks)
  • client/response_test.go (2 hunks)
  • docs/client/rest.md (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • docs/client/rest.md
🚧 Files skipped from review as they are similar to previous changes (3)
  • client/client.go
  • client/response.go
  • client/request.go
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Format Go code using gofumpt (enforced via make format)
Ensure code passes golangci-lint checks (enforced via make lint)
Optimize struct field alignment using betteralign (enforced via make betteralign)
Modernize Go code using gopls modernize (enforced via make modernize)

Files:

  • client/client_test.go
  • client/response_test.go
🧬 Code graph analysis (2)
client/client_test.go (1)
client/client.go (2)
  • New (646-651)
  • Get (705-707)
client/response_test.go (2)
client/client.go (2)
  • Get (705-707)
  • New (646-651)
client/response.go (1)
  • Response (19-25)

Comment on lines +774 to +823
t.Run("server sent events with incremental reads", func(t *testing.T) {
t.Parallel()

server := startTestServer(t, func(app *fiber.App) {
app.Get("/sse", func(c fiber.Ctx) error {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")

messages := []string{
"data: event 1\n\n",
"data: event 2\n\n",
"data: event 3\n\n",
"data: event 4\n\n",
}

for i, msg := range messages {
if _, err := c.WriteString(msg); err != nil {
return err
}
c.Response().ImmediateHeaderFlush = true
if i < len(messages)-1 {
time.Sleep(5 * time.Millisecond)
}
}
return nil
})
})
defer server.stop()

client := New().SetDial(server.dial()).SetStreamResponseBody(true)

resp, err := client.Get("http://example.com/sse")
require.NoError(t, err)
defer resp.Close()

bodyStream := resp.BodyStream()
require.NotNil(t, bodyStream)

data, err := io.ReadAll(bodyStream)
require.NoError(t, err)

content := string(data)
require.Contains(t, content, "event 1")
require.Contains(t, content, "event 2")
require.Contains(t, content, "event 3")
require.Contains(t, content, "event 4")
require.Contains(t, content, "data: event")
require.Contains(t, content, "\n\n")
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Test name says “incremental reads” but the test uses io.ReadAll; read progressively and stream from server

Make the test reflect its intent and avoid false positives.

-		server := startTestServer(t, func(app *fiber.App) {
-			app.Get("/sse", func(c fiber.Ctx) error {
-				c.Set("Content-Type", "text/event-stream")
-				c.Set("Cache-Control", "no-cache")
-				c.Set("Connection", "keep-alive")
-
-				messages := []string{
-					"data: event 1\n\n",
-					"data: event 2\n\n",
-					"data: event 3\n\n",
-					"data: event 4\n\n",
-				}
-
-				for i, msg := range messages {
-					if _, err := c.WriteString(msg); err != nil {
-						return err
-					}
-					c.Response().ImmediateHeaderFlush = true
-					if i < len(messages)-1 {
-						time.Sleep(5 * time.Millisecond)
-					}
-				}
-				return nil
-			})
-		})
+		server := startTestServer(t, func(app *fiber.App) {
+			app.Get("/sse", func(c fiber.Ctx) error {
+				c.Set("Content-Type", "text/event-stream")
+				c.Set("Cache-Control", "no-cache")
+				c.Set("Connection", "keep-alive")
+				c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
+					for i := 1; i <= 4; i++ {
+						_, _ = w.WriteString(fmt.Sprintf("data: event %d\n\n", i))
+						_ = w.Flush()
+						if i < 4 {
+							time.Sleep(5 * time.Millisecond)
+						}
+					}
+				})
+				return nil
+			})
+		})
@@
-		data, err := io.ReadAll(bodyStream)
-		require.NoError(t, err)
-
-		content := string(data)
-		require.Contains(t, content, "event 1")
-		require.Contains(t, content, "event 2")
-		require.Contains(t, content, "event 3")
-		require.Contains(t, content, "event 4")
-		require.Contains(t, content, "data: event")
-		require.Contains(t, content, "\n\n")
+		r := bufio.NewReader(bodyStream)
+		for i := 1; i <= 4; i++ {
+			line, err := r.ReadString('\n')
+			require.NoError(t, err)
+			require.Contains(t, line, fmt.Sprintf("data: event %d", i))
+			_, _ = r.ReadString('\n') // consume blank line
+		}

Add import if missing:

import "bufio"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

4 participants