Skip to content

Commit 39ee43f

Browse files
authored
Merge pull request #21 from supabase/accept-request-body-in-worker
fix: process request body streams
2 parents 6b0ab7e + 8ad5e8c commit 39ee43f

File tree

4 files changed

+167
-24
lines changed

4 files changed

+167
-24
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Edge Runtime
22

3-
A server based on [Deno](https://deno.land) runtime, capable of running JavaScript, TypeScript, and WASM services.
3+
A web server based on [Deno](https://deno.land) runtime, capable of running JavaScript, TypeScript, and WASM services.
44

55
### Why are we building this?
66

base/src/js_worker/js/user_workers.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
((window) => {
44
const { TypeError } = window.__bootstrap.primordials;
5-
const { readableStreamForRid } = window.__bootstrap.streams;
5+
const { readableStreamForRid, writableStreamForRid } = window.__bootstrap.streams;
66
const core = window.Deno.core;
77
const ops = core.ops;
88

@@ -24,7 +24,16 @@
2424
hasBody,
2525
};
2626

27-
const res = await core.opAsync("op_user_worker_fetch", this.key, userWorkerReq);
27+
const { requestRid, requestBodyRid } = await ops.op_user_worker_fetch_build(userWorkerReq);
28+
29+
// stream the request body
30+
if (hasBody) {
31+
let writableStream = writableStreamForRid(requestBodyRid);
32+
body.pipeTo(writableStream);
33+
}
34+
35+
const res = await core.opAsync("op_user_worker_fetch_send", this.key, requestRid);
36+
2837
const bodyStream = readableStreamForRid(res.bodyRid);
2938
return new Response(bodyStream, {
3039
headers: res.headers,

base/src/js_worker/user_workers.rs

Lines changed: 154 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ use deno_core::futures::StreamExt;
77
use deno_core::include_js_files;
88
use deno_core::op;
99
use deno_core::AsyncRefCell;
10-
use deno_core::CancelTryFuture;
1110
use deno_core::Extension;
1211
use deno_core::OpState;
1312
use deno_core::Resource;
1413
use deno_core::ResourceId;
14+
use deno_core::WriteOutcome;
1515
use deno_core::{AsyncResult, BufView, ByteString, CancelHandle, RcRef};
16+
use deno_core::{CancelFuture, CancelTryFuture};
1617
use hyper::body::HttpBody;
1718
use hyper::header::{HeaderName, HeaderValue};
1819
use hyper::{Body, Request, Response};
@@ -23,6 +24,8 @@ use std::collections::HashMap;
2324
use std::path::PathBuf;
2425
use std::pin::Pin;
2526
use std::rc::Rc;
27+
use std::task::Context;
28+
use std::task::Poll;
2629
use tokio::sync::{mpsc, oneshot};
2730

2831
pub fn init() -> Extension {
@@ -33,7 +36,8 @@ pub fn init() -> Extension {
3336
))
3437
.ops(vec![
3538
op_user_worker_create::decl(),
36-
op_user_worker_fetch::decl(),
39+
op_user_worker_fetch_build::decl(),
40+
op_user_worker_fetch_send::decl(),
3741
])
3842
.build()
3943
}
@@ -88,6 +92,13 @@ pub struct UserWorkerRequest {
8892
has_body: bool,
8993
}
9094

95+
#[derive(Serialize)]
96+
#[serde(rename_all = "camelCase")]
97+
pub struct UserWorkerBuiltRequest {
98+
request_rid: ResourceId,
99+
request_body_rid: Option<ResourceId>,
100+
}
101+
91102
#[derive(Serialize)]
92103
#[serde(rename_all = "camelCase")]
93104
pub struct UserWorkerResponse {
@@ -97,14 +108,63 @@ pub struct UserWorkerResponse {
97108
body_rid: ResourceId,
98109
}
99110

100-
type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
111+
struct UserWorkerRequestResource(Request<Body>);
101112

102-
struct UserWorkerRquestBodyResource {
103-
writer: AsyncRefCell<Peekable<BytesStream>>,
113+
impl Resource for UserWorkerRequestResource {
114+
fn name(&self) -> Cow<str> {
115+
"userWorkerRequest".into()
116+
}
117+
}
118+
119+
struct UserWorkerRequestBodyResource {
120+
body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
104121
cancel: CancelHandle,
105-
size: Option<u64>,
106122
}
107123

124+
impl Resource for UserWorkerRequestBodyResource {
125+
fn name(&self) -> Cow<str> {
126+
"userWorkerRequestBody".into()
127+
}
128+
129+
fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
130+
Box::pin(async move {
131+
let bytes: bytes::Bytes = buf.into();
132+
let nwritten = bytes.len();
133+
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
134+
let cancel = RcRef::map(self, |r| &r.cancel);
135+
body.send(Some(bytes))
136+
.or_cancel(cancel)
137+
.await?
138+
.map_err(|_| type_error("request body receiver not connected (request closed)"))?;
139+
Ok(WriteOutcome::Full { nwritten })
140+
})
141+
}
142+
143+
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
144+
Box::pin(async move {
145+
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
146+
let cancel = RcRef::map(self, |r| &r.cancel);
147+
// There is a case where hyper knows the size of the response body up
148+
// front (through content-length header on the resp), where it will drop
149+
// the body once that content length has been reached, regardless of if
150+
// the stream is complete or not. This is expected behaviour, but it means
151+
// that if you stream a body with an up front known size (eg a Blob),
152+
// explicit shutdown can never succeed because the body (and by extension
153+
// the receiver) will have dropped by the time we try to shutdown. As such
154+
// we ignore if the receiver is closed, because we know that the request
155+
// is complete in good health in that case.
156+
body.send(None).or_cancel(cancel).await?.ok();
157+
Ok(())
158+
})
159+
}
160+
161+
fn close(self: Rc<Self>) {
162+
self.cancel.cancel()
163+
}
164+
}
165+
166+
type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
167+
108168
struct UserWorkerResponseBodyResource {
109169
reader: AsyncRefCell<Peekable<BytesStream>>,
110170
cancel: CancelHandle,
@@ -159,18 +219,20 @@ impl Resource for UserWorkerResponseBodyResource {
159219
}
160220

161221
#[op]
162-
pub async fn op_user_worker_fetch(
163-
state: Rc<RefCell<OpState>>,
164-
key: String,
222+
pub fn op_user_worker_fetch_build(
223+
state: &mut OpState,
165224
req: UserWorkerRequest,
166-
) -> Result<UserWorkerResponse, AnyError> {
167-
let mut op_state = state.borrow_mut();
168-
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
169-
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();
170-
225+
) -> Result<UserWorkerBuiltRequest, AnyError> {
171226
let mut body = Body::empty();
227+
let mut request_body_rid = None;
172228
if req.has_body {
173-
//body = Body::wrap_stream(stream);
229+
let (stream, tx) = MpscByteStream::new();
230+
body = Body::wrap_stream(stream);
231+
232+
request_body_rid = Some(state.resource_table.add(UserWorkerRequestBodyResource {
233+
body: AsyncRefCell::new(tx),
234+
cancel: CancelHandle::default(),
235+
}));
174236
}
175237

176238
let mut request = Request::builder()
@@ -183,18 +245,47 @@ pub async fn op_user_worker_fetch(
183245
for (key, value) in req.headers {
184246
if !key.is_empty() {
185247
let header_name = HeaderName::try_from(key).unwrap();
186-
let header_value = HeaderValue::try_from(value).unwrap_or(HeaderValue::from_static(""));
248+
let mut header_value =
249+
HeaderValue::try_from(value).unwrap_or(HeaderValue::from_static(""));
187250

188-
// skip content-length header, this would be re-calculated and added to the request
189-
if header_name.eq("content-length") {
190-
continue;
251+
// if request has no body explicitly set the content-length to 0
252+
if !req.has_body && header_name.eq("content-length") {
253+
header_value = HeaderValue::from(0);
191254
}
192255

193256
request.headers_mut().append(header_name, header_value);
194257
}
195258
}
196259

197-
tx.send(UserWorkerMsgs::SendRequest(key, request, result_tx));
260+
let request_rid = state.resource_table.add(UserWorkerRequestResource(request));
261+
262+
Ok(UserWorkerBuiltRequest {
263+
request_rid,
264+
request_body_rid,
265+
})
266+
}
267+
268+
#[op]
269+
pub async fn op_user_worker_fetch_send(
270+
state: Rc<RefCell<OpState>>,
271+
key: String,
272+
rid: ResourceId,
273+
) -> Result<UserWorkerResponse, AnyError> {
274+
let mut op_state = state.borrow_mut();
275+
let tx = op_state
276+
.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>()
277+
.clone();
278+
279+
let request = op_state
280+
.resource_table
281+
.take::<UserWorkerRequestResource>(rid)?;
282+
drop(op_state);
283+
284+
let request = Rc::try_unwrap(request)
285+
.ok()
286+
.expect("multiple op_user_worker_fetch_send ongoing");
287+
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();
288+
tx.send(UserWorkerMsgs::SendRequest(key, request.0, result_tx));
198289

199290
let result = result_rx.await;
200291
if result.is_err() {
@@ -227,6 +318,8 @@ pub async fn op_user_worker_fetch(
227318
.into_body()
228319
.map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))),
229320
);
321+
322+
let mut op_state = state.borrow_mut();
230323
let body_rid = op_state.resource_table.add(UserWorkerResponseBodyResource {
231324
reader: AsyncRefCell::new(stream.peekable()),
232325
cancel: CancelHandle::default(),
@@ -241,3 +334,44 @@ pub async fn op_user_worker_fetch(
241334
};
242335
Ok(response)
243336
}
337+
338+
// [copied from https://github.com/denoland/deno/blob/v1.31.3/ext/fetch/byte_stream.rs]
339+
// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
340+
// used to bridge between the fetch task and the HTTP body stream. The stream
341+
// has the special property that it errors if the channel is closed before an
342+
// explicit EOF is sent (in the form of a [None] value on the sender).
343+
pub struct MpscByteStream {
344+
receiver: mpsc::Receiver<Option<bytes::Bytes>>,
345+
shutdown: bool,
346+
}
347+
348+
impl MpscByteStream {
349+
pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
350+
let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
351+
let this = Self {
352+
receiver,
353+
shutdown: false,
354+
};
355+
(this, sender)
356+
}
357+
}
358+
359+
impl Stream for MpscByteStream {
360+
type Item = Result<bytes::Bytes, std::io::Error>;
361+
362+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
363+
let val = std::task::ready!(self.receiver.poll_recv(cx));
364+
match val {
365+
None if self.shutdown => Poll::Ready(None),
366+
None => Poll::Ready(Some(Err(std::io::Error::new(
367+
std::io::ErrorKind::UnexpectedEof,
368+
"channel closed",
369+
)))),
370+
Some(None) => {
371+
self.shutdown = true;
372+
Poll::Ready(None)
373+
}
374+
Some(Some(val)) => Poll::Ready(Some(Ok(val))),
375+
}
376+
}
377+
}

examples/foo/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ interface reqPayload {
77
console.log('server started modified');
88

99
serve(async (req: Request) => {
10-
const { name } : reqPayload = await req.json()
10+
const { name } : reqPayload = await req.json();
1111
const data = {
1212
message: `Hello ${name} from foo!`,
1313
test: 'foo'

0 commit comments

Comments
 (0)