Skip to content

Commit 9b29e30

Browse files
authored
Merge pull request #31 from supabase/fix-service-not-exist
fix: return an error when service path does not exist
2 parents 39ee43f + 45962a9 commit 9b29e30

File tree

5 files changed

+61
-27
lines changed

5 files changed

+61
-27
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

base/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ reqwest = { version = "0.11.13" }
3131
serde = { version = "1.0.149", features = ["derive"] }
3232
tokio = { version = "1.24", features = ["full"] }
3333
url = { version = "2.3.1" }
34+
uuid = { version = "1.1.2", features = ["v4"] }
3435
v8 = { version = "0.60.1", default-features = false }

base/src/js_worker/user_workers.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerMsgs, UserWorkerOptions};
22

3+
use anyhow::Error;
34
use deno_core::error::{custom_error, type_error, AnyError};
45
use deno_core::futures::stream::Peekable;
56
use deno_core::futures::Stream;
@@ -27,6 +28,7 @@ use std::rc::Rc;
2728
use std::task::Context;
2829
use std::task::Poll;
2930
use tokio::sync::{mpsc, oneshot};
31+
use uuid::Uuid;
3032

3133
pub fn init() -> Extension {
3234
Extension::builder("custom:user_workers")
@@ -54,7 +56,7 @@ pub async fn op_user_worker_create(
5456
) -> Result<String, AnyError> {
5557
let op_state = state.borrow();
5658
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
57-
let (result_tx, result_rx) = oneshot::channel::<CreateUserWorkerResult>();
59+
let (result_tx, result_rx) = oneshot::channel::<Result<CreateUserWorkerResult, Error>>();
5860

5961
let mut env_vars = HashMap::new();
6062
for (key, value) in env_vars_vec {
@@ -79,8 +81,15 @@ pub async fn op_user_worker_create(
7981
));
8082
}
8183

84+
// channel returns a Result<T, E>, we need to unwrap it first;
8285
let result = result.unwrap();
83-
Ok(result.key)
86+
if result.is_err() {
87+
return Err(custom_error(
88+
"create_user_worker_error",
89+
result.unwrap_err().to_string(),
90+
));
91+
}
92+
Ok(result.unwrap().key.to_string())
8493
}
8594

8695
#[derive(Deserialize, Debug)]
@@ -285,7 +294,8 @@ pub async fn op_user_worker_fetch_send(
285294
.ok()
286295
.expect("multiple op_user_worker_fetch_send ongoing");
287296
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();
288-
tx.send(UserWorkerMsgs::SendRequest(key, request.0, result_tx));
297+
let uuid = Uuid::parse_str(key.as_str())?;
298+
tx.send(UserWorkerMsgs::SendRequest(uuid, request.0, result_tx));
289299

290300
let result = result_rx.await;
291301
if result.is_err() {

base/src/worker_ctx.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::js_worker::{MainWorker, UserWorker};
22

3-
use anyhow::Error;
3+
use anyhow::{bail, Error};
44
use hyper::{Body, Request, Response};
55
use log::{debug, error};
66
use std::collections::HashMap;
@@ -11,7 +11,9 @@ use std::thread;
1111
use tokio::net::UnixStream;
1212
use tokio::sync::RwLock;
1313
use tokio::sync::{mpsc, oneshot};
14+
use uuid::Uuid;
1415

16+
#[derive(Debug)]
1517
pub struct WorkerContext {
1618
handle: thread::JoinHandle<Result<(), Error>>,
1719
request_sender: hyper::client::conn::SendRequest<Body>,
@@ -41,6 +43,10 @@ impl WorkerContext {
4143
let import_map_path = options.import_map_path;
4244
let user_worker_msgs_tx = options.user_worker_msgs_tx;
4345

46+
if (!service_path.exists()) {
47+
return bail!("main function does not exist {:?}", &service_path);
48+
}
49+
4450
// create a unix socket pair
4551
let (sender_stream, recv_stream) = UnixStream::pair()?;
4652

@@ -90,6 +96,10 @@ impl WorkerContext {
9096
let import_map_path = options.import_map_path;
9197
let env_vars = options.env_vars;
9298

99+
if (!service_path.exists()) {
100+
return bail!("user function does not exist {:?}", &service_path);
101+
}
102+
93103
// create a unix socket pair
94104
let (sender_stream, recv_stream) = UnixStream::pair()?;
95105

@@ -141,13 +151,16 @@ impl WorkerContext {
141151

142152
#[derive(Debug)]
143153
pub struct CreateUserWorkerResult {
144-
pub key: String,
154+
pub key: Uuid,
145155
}
146156

147157
#[derive(Debug)]
148158
pub enum UserWorkerMsgs {
149-
Create(UserWorkerOptions, oneshot::Sender<CreateUserWorkerResult>),
150-
SendRequest(String, Request<Body>, oneshot::Sender<Response<Body>>),
159+
Create(
160+
UserWorkerOptions,
161+
oneshot::Sender<Result<CreateUserWorkerResult, Error>>,
162+
),
163+
SendRequest(Uuid, Request<Body>, oneshot::Sender<Response<Body>>),
151164
}
152165

153166
pub struct WorkerPool {
@@ -174,23 +187,24 @@ impl WorkerPool {
174187
let main_worker = Arc::new(RwLock::new(main_worker_ctx));
175188

176189
tokio::spawn(async move {
177-
let mut user_workers: HashMap<String, Arc<RwLock<WorkerContext>>> = HashMap::new();
190+
let mut user_workers: HashMap<Uuid, Arc<RwLock<WorkerContext>>> = HashMap::new();
178191

179192
loop {
180193
match user_worker_msgs_rx.recv().await {
181194
None => break,
182195
Some(UserWorkerMsgs::Create(worker_options, tx)) => {
183-
let key = worker_options.service_path.display().to_string();
184-
if !user_workers.contains_key(&key) {
185-
// TODO: handle errors
186-
let user_worker_ctx = WorkerContext::new_user_worker(worker_options)
187-
.await
188-
.unwrap();
189-
user_workers
190-
.insert(key.clone(), Arc::new(RwLock::new(user_worker_ctx)));
196+
let key = Uuid::new_v4();
197+
let user_worker_ctx = WorkerContext::new_user_worker(worker_options).await;
198+
if !user_worker_ctx.is_err() {
199+
user_workers.insert(
200+
key.clone(),
201+
Arc::new(RwLock::new(user_worker_ctx.unwrap())),
202+
);
203+
204+
tx.send(Ok(CreateUserWorkerResult { key }));
205+
} else {
206+
tx.send(Err(user_worker_ctx.unwrap_err()));
191207
}
192-
193-
tx.send(CreateUserWorkerResult { key });
194208
}
195209
Some(UserWorkerMsgs::SendRequest(key, req, tx)) => {
196210
// TODO: handle errors

examples/main/index.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,21 @@ serve(async (req: Request) => {
2424
const no_module_cache = false;
2525
const import_map_path = null;
2626
const env_vars = [];
27-
const worker = await EdgeRuntime.userWorkers.create({
28-
service_path,
29-
memory_limit_mb,
30-
worker_timeout_ms,
31-
no_module_cache,
32-
import_map_path,
33-
env_vars
34-
});
35-
return worker.fetch(req);
27+
try {
28+
const worker = await EdgeRuntime.userWorkers.create({
29+
service_path,
30+
memory_limit_mb,
31+
worker_timeout_ms,
32+
no_module_cache,
33+
import_map_path,
34+
env_vars
35+
});
36+
return worker.fetch(req);
37+
} catch (e) {
38+
const error = { msg: e.toString() }
39+
return new Response(
40+
JSON.stringify(error),
41+
{ status: 500, headers: { "Content-Type": "application/json" } },
42+
)
43+
}
3644
})

0 commit comments

Comments
 (0)