Skip to content

Commit

Permalink
fix: expo/fetch not closing streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Vali-98 committed Dec 5, 2024
1 parent bdcb08f commit ab6fbde
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 4 deletions.
1 change: 0 additions & 1 deletion constants/API/APIBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ const readableStreamResponse = async (

useInference.getState().setAbort(async () => {
Logger.debug('Running abort')
closeStream()
sse.abort()
})

Expand Down
13 changes: 10 additions & 3 deletions constants/SSEFetch.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Logger } from '@globals'
import { fetch } from 'expo/fetch'

type SSEValues = {
Expand Down Expand Up @@ -25,21 +26,27 @@ export class SSEFetch {
this.abortController = new AbortController()
const body = values.method === 'POST' ? { body: values.body } : {}

fetch(values.endpoint, {
const res = await fetch(values.endpoint, {
signal: this.abortController.signal,
method: values.method,
headers: values.headers,
...body,
}).then(async (res) => {
})
try {
if (res.status !== 200 || !res.body) return this.onError()
this.closeStream = res.body.cancel
for await (const chunk of res.body) {
const data = this.decoder.decode(chunk)
const output = parseSSE(data)
output.forEach((item) => this.onEvent(item))
}
} catch (e) {
if (this.abortController.signal.aborted) {
Logger.debug('Abort caught')
}
} finally {
this.onClose()
})
}
}

public setOnEvent(callback: (data: string) => void) {
Expand Down
97 changes: 97 additions & 0 deletions patches/expo+52.0.14.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
diff --git a/node_modules/expo/android/src/main/java/expo/modules/fetch/NativeResponse.kt b/node_modules/expo/android/src/main/java/expo/modules/fetch/NativeResponse.kt
index 8254d15..64554a4 100644
--- a/node_modules/expo/android/src/main/java/expo/modules/fetch/NativeResponse.kt
+++ b/node_modules/expo/android/src/main/java/expo/modules/fetch/NativeResponse.kt
@@ -68,6 +68,7 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
if (isInvalidState(ResponseState.BODY_STREAMING_STARTED)) {
return
}
+
state = ResponseState.BODY_STREAMING_CANCELLED
}

@@ -118,7 +119,6 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
val stream = response.body?.source() ?: return@launch
pumpResponseBodyStream(stream)
response.close()
-
if ([email protected] == ResponseState.BODY_STREAMING_STARTED) {
emit("didComplete")
}
@@ -157,7 +157,7 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
)
}

- private fun pumpResponseBodyStream(stream: BufferedSource) {
+ /*private fun pumpResponseBodyStream(stream: BufferedSource) {
while (!stream.exhausted()) {
if (isInvalidState(
ResponseState.RESPONSE_RECEIVED,
@@ -165,6 +165,7 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
ResponseState.BODY_STREAMING_CANCELLED
)
) {
+ Log.w(TAG, "State is Invalid")
break
}
if (state == ResponseState.RESPONSE_RECEIVED) {
@@ -175,7 +176,45 @@ internal class NativeResponse(appContext: AppContext, private val coroutineScope
break
}
}
- }
+ }*/
+
+ private fun pumpResponseBodyStream(stream: BufferedSource) {
+ try {
+ while (!stream.exhausted()) {
+ if (isInvalidState(
+ ResponseState.RESPONSE_RECEIVED,
+ ResponseState.BODY_STREAMING_STARTED,
+ ResponseState.BODY_STREAMING_CANCELLED
+ )
+ ) {
+ break
+ }
+ if (state == ResponseState.RESPONSE_RECEIVED) {
+ sink.appendBufferBody(stream.buffer.readByteArray())
+ } else if (state == ResponseState.BODY_STREAMING_STARTED) {
+ emit("didReceiveResponseData", stream.buffer.readByteArray())
+ } else {
+ break
+ }
+ }
+ } catch (e: IOException) {
+ Log.e(TAG, "Error while pumping response body stream: ${e.message}", e)
+ state = ResponseState.ERROR_RECEIVED
+ error = e
+ emit("didFailWithError", e)
+ } catch (e: Exception) {
+ Log.e(TAG, "Unexpected error while pumping response body stream: ${e.message}", e)
+ state = ResponseState.ERROR_RECEIVED
+ error = e
+ emit("didFailWithError", e)
+ } finally {
+ try {
+ stream.close()
+ } catch (e: IOException) {
+ Log.w(TAG, "Error while closing the stream: ${e.message}", e)
+ }
+ }
+}

//endregion Internals

diff --git a/node_modules/expo/src/winter/fetch/fetch.ts b/node_modules/expo/src/winter/fetch/fetch.ts
index e7a515b..0a0f8ea 100644
--- a/node_modules/expo/src/winter/fetch/fetch.ts
+++ b/node_modules/expo/src/winter/fetch/fetch.ts
@@ -38,7 +38,7 @@ export async function fetch(url: string, init?: FetchRequestInit): Promise<Fetch
throw new FetchError(String(e));
}
} finally {
- init?.signal?.removeEventListener('abort', abortHandler);
+ // init?.signal?.removeEventListener('abort', abortHandler);
}
return response;
}

0 comments on commit ab6fbde

Please sign in to comment.