Skip to content

Commit f968212

Browse files
authored
Merge pull request #14 from supabase/refactor-worker-limits
Refactor worker limits
2 parents 7414b77 + 2fb6d08 commit f968212

File tree

6 files changed

+71
-41
lines changed

6 files changed

+71
-41
lines changed

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@ docker build -t edge-runtime .
2222
docker run -it --rm -p 9000:9000 -v /path/to/supabase/functions:/usr/services supabase/edge-runtime start --dir /usr/services
2323
```
2424

25+
## Architecture
26+
27+
Server -> Base Worker -> User Function
28+
2529
## TODO
2630

27-
* Support import maps
2831
* Check verify-jwt
29-
* better error messages for incorrect module loading paths (local)
32+
* Workers should only have access to env variables assigned to it
3033
* handle 404 errors
34+
* better error messages for incorrect module loading paths
35+
* better error messages for invalid import map paths
3136
* Support snapshotting the runtime
3237
* Support for private modules (DENO_AUTH_TOKENS)
3338
* HTTP/2 support (need the host to support SSL)

base/src/commands.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::server::Server;
22
use anyhow::Error;
3+
use std::collections::HashMap;
34

45
#[tokio::main]
56
pub async fn start_server(
@@ -10,6 +11,7 @@ pub async fn start_server(
1011
service_timeout: u16,
1112
no_module_cache: bool,
1213
import_map_path: Option<String>,
14+
env_vars: HashMap<String, String>,
1315
) -> Result<(), Error> {
1416
let server = Server::new(
1517
ip,
@@ -19,6 +21,7 @@ pub async fn start_server(
1921
service_timeout,
2022
no_module_cache,
2123
import_map_path,
24+
env_vars,
2225
)?;
2326
server.listen().await
2427
}

base/src/js_worker.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use deno_core::JsRuntime;
77
use deno_core::RuntimeOptions;
88
use import_map::{parse_from_json, ImportMap, ImportMapDiagnostic};
99
use log::{debug, error, warn};
10+
use std::collections::HashMap;
1011
use std::fs;
1112
use std::panic;
1213
use std::path::Path;
@@ -24,6 +25,7 @@ pub mod module_loader;
2425
pub mod net_override;
2526
pub mod permissions;
2627
pub mod runtime;
28+
pub mod types;
2729

2830
use module_loader::DefaultModuleLoader;
2931
use permissions::Permissions;
@@ -34,6 +36,7 @@ pub async fn serve(
3436
worker_timeout_ms: u64,
3537
no_module_cache: bool,
3638
import_map_path: Option<String>,
39+
env_vars: HashMap<String, String>,
3740
tcp_stream: TcpStream,
3841
) -> Result<(), Error> {
3942
let service_path_clone = service_path.clone();
@@ -50,11 +53,13 @@ pub async fn serve(
5053
worker_timeout_ms,
5154
no_module_cache,
5255
import_map_path,
56+
env_vars,
5357
tcp_stream_rx,
5458
shutdown_tx,
5559
)
5660
});
5761

62+
// send the tcp stram to the worker
5863
tcp_stream_tx.send(tcp_stream)?;
5964

6065
debug!("js worker for {} started", service_name);
@@ -94,12 +99,14 @@ fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {
9499
);
95100
}
96101
}
102+
97103
fn start_runtime(
98104
service_path: PathBuf,
99105
memory_limit_mb: u64,
100106
worker_timeout_ms: u64,
101107
no_module_cache: bool,
102108
import_map_path: Option<String>,
109+
env_vars: HashMap<String, String>,
103110
tcp_stream_rx: mpsc::UnboundedReceiver<TcpStream>,
104111
shutdown_tx: oneshot::Sender<()>,
105112
) {
@@ -114,7 +121,7 @@ fn start_runtime(
114121
// TODO: check for other potential main paths (eg: index.js, index.tsx)
115122
let main_module_url = base_url.join("index.ts").unwrap();
116123

117-
// Note: this will load Mozilla's CAs (we may also need to support
124+
// Note: this will load Mozilla's CAs (we may also need to support system certs)
118125
let root_cert_store = deno_tls::create_default_root_cert_store();
119126

120127
let extensions_with_js = vec![
@@ -165,8 +172,8 @@ fn start_runtime(
165172
..Default::default()
166173
});
167174

168-
let v8_thread_safe_handle = js_runtime.v8_isolate().thread_safe_handle();
169175
let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::<u64>();
176+
let (halt_execution_tx, mut halt_execution_rx) = oneshot::channel::<()>();
170177

171178
// add a callback when a worker reaches its memory limit
172179
js_runtime.add_near_heap_limit_callback(move |cur, _init| {
@@ -199,9 +206,16 @@ fn start_runtime(
199206
let op_state_rc = js_runtime.op_state();
200207
let mut op_state = op_state_rc.borrow_mut();
201208
op_state.put::<mpsc::UnboundedReceiver<TcpStream>>(tcp_stream_rx);
209+
op_state.put::<types::EnvVars>(env_vars);
202210
}
203211

204-
start_controller_thread(v8_thread_safe_handle, worker_timeout_ms, memory_limit_rx);
212+
let v8_thread_safe_handle = js_runtime.v8_isolate().thread_safe_handle();
213+
start_controller_thread(
214+
v8_thread_safe_handle,
215+
worker_timeout_ms,
216+
memory_limit_rx,
217+
halt_execution_tx,
218+
);
205219

206220
let runtime = tokio::runtime::Builder::new_current_thread()
207221
.enable_all()
@@ -211,7 +225,15 @@ fn start_runtime(
211225
let future = async move {
212226
let mod_id = js_runtime.load_main_module(&main_module_url, None).await?;
213227
let result = js_runtime.mod_evaluate(mod_id);
214-
js_runtime.run_event_loop(false).await?;
228+
229+
tokio::select! {
230+
_ = js_runtime.run_event_loop(false) => {
231+
debug!("event loop completed");
232+
}
233+
_ = &mut halt_execution_rx => {
234+
debug!("worker exectution halted");
235+
}
236+
}
215237

216238
result.await?
217239
};
@@ -230,6 +252,7 @@ fn start_controller_thread(
230252
v8_thread_safe_handle: v8::IsolateHandle,
231253
worker_timeout_ms: u64,
232254
mut memory_limit_rx: mpsc::UnboundedReceiver<u64>,
255+
halt_execution_tx: oneshot::Sender<()>,
233256
) {
234257
thread::spawn(move || {
235258
let rt = tokio::runtime::Builder::new_current_thread()
@@ -255,5 +278,9 @@ fn start_controller_thread(
255278
} else {
256279
debug!("worker already terminated");
257280
}
281+
282+
halt_execution_tx
283+
.send(())
284+
.expect("failed to send halt execution signal");
258285
});
259286
}

base/src/js_worker/env.rs

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use crate::js_worker::permissions::Permissions;
2+
use crate::js_worker::types::EnvVars;
23

3-
use deno_core::error::type_error;
44
use deno_core::error::AnyError;
5+
use deno_core::error::{not_supported, type_error};
56
use deno_core::include_js_files;
67
use deno_core::op;
78
use deno_core::Extension;
89
use deno_core::OpState;
910
use deno_node::NODE_ENV_VAR_ALLOWLIST;
1011
use std::collections::HashMap;
11-
use std::env;
1212

1313
pub fn init() -> Extension {
1414
Extension::builder("custom:env")
@@ -26,31 +26,15 @@ pub fn init() -> Extension {
2626
}
2727

2828
#[op]
29-
fn op_set_env(state: &mut OpState, key: String, value: String) -> Result<(), AnyError> {
30-
state.borrow_mut::<Permissions>().check_env(&key)?;
31-
if key.is_empty() {
32-
return Err(type_error("Key is an empty string."));
33-
}
34-
if key.contains(&['=', '\0'] as &[char]) {
35-
return Err(type_error(format!(
36-
"Key contains invalid characters: {:?}",
37-
key
38-
)));
39-
}
40-
if value.contains('\0') {
41-
return Err(type_error(format!(
42-
"Value contains invalid characters: {:?}",
43-
value
44-
)));
45-
}
46-
env::set_var(key, value);
47-
Ok(())
29+
fn op_set_env(_state: &mut OpState, _key: String, _value: String) -> Result<(), AnyError> {
30+
Err(not_supported())
4831
}
4932

5033
#[op]
5134
fn op_env(state: &mut OpState) -> Result<HashMap<String, String>, AnyError> {
5235
state.borrow_mut::<Permissions>().check_env_all()?;
53-
Ok(env::vars().collect())
36+
let env_vars = state.borrow::<EnvVars>();
37+
Ok(env_vars.clone())
5438
}
5539

5640
#[op]
@@ -72,19 +56,12 @@ fn op_get_env(state: &mut OpState, key: String) -> Result<Option<String>, AnyErr
7256
)));
7357
}
7458

75-
let r = match env::var(key) {
76-
Err(env::VarError::NotPresent) => None,
77-
v => Some(v?),
78-
};
59+
let env_vars = state.borrow::<EnvVars>();
60+
let r = env_vars.get(&key).map(|k| k.clone());
7961
Ok(r)
8062
}
8163

8264
#[op]
83-
fn op_delete_env(state: &mut OpState, key: String) -> Result<(), AnyError> {
84-
state.borrow_mut::<Permissions>().check_env(&key)?;
85-
if key.is_empty() || key.contains(&['=', '\0'] as &[char]) {
86-
return Err(type_error("Key contains invalid characters."));
87-
}
88-
env::remove_var(key);
89-
Ok(())
65+
fn op_delete_env(_state: &mut OpState, _key: String) -> Result<(), AnyError> {
66+
Err(not_supported())
9067
}

base/src/server.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::js_worker;
22
use anyhow::{bail, Error};
33
use log::{debug, error, info};
4+
use std::collections::HashMap;
45
use std::net::IpAddr;
56
use std::net::Ipv4Addr;
67
use std::net::SocketAddr;
@@ -17,6 +18,7 @@ async fn process_stream(
1718
service_timeout: u16,
1819
no_module_cache: bool,
1920
import_map_path: Option<String>,
21+
env_vars: HashMap<String, String>,
2022
) -> Result<(), Error> {
2123
// peek into the HTTP header
2224
// find request path
@@ -30,7 +32,7 @@ async fn process_stream(
3032

3133
let mut headers = [httparse::EMPTY_HEADER; 64];
3234
let mut req = httparse::Request::new(&mut headers);
33-
let _ = req.parse(&buf).unwrap();
35+
let _ = req.parse(&buf)?;
3436

3537
if req.path.is_none() {
3638
// if the path isn't found in first 1024 bytes it must not be a valid http
@@ -75,6 +77,7 @@ async fn process_stream(
7577
worker_timeout_ms,
7678
no_module_cache,
7779
import_map_path,
80+
env_vars,
7881
stream,
7982
)
8083
.await?;
@@ -90,6 +93,7 @@ pub struct Server {
9093
service_timeout: u16,
9194
no_module_cache: bool,
9295
import_map_path: Option<String>,
96+
env_vars: HashMap<String, String>,
9397
}
9498

9599
impl Server {
@@ -101,6 +105,7 @@ impl Server {
101105
service_timeout: u16,
102106
no_module_cache: bool,
103107
import_map_path: Option<String>,
108+
env_vars: HashMap<String, String>,
104109
) -> Result<Self, Error> {
105110
let ip = Ipv4Addr::from_str(ip)?;
106111
Ok(Self {
@@ -111,6 +116,7 @@ impl Server {
111116
service_timeout,
112117
no_module_cache,
113118
import_map_path,
119+
env_vars,
114120
})
115121
}
116122

@@ -129,8 +135,16 @@ impl Server {
129135
let service_timeout = self.service_timeout;
130136
let no_module_cache = self.no_module_cache;
131137
let import_map_path_clone = self.import_map_path.clone();
138+
let env_vars_clone = self.env_vars.clone();
132139
tokio::task::spawn(async move {
133-
let res = process_stream(stream, services_dir_clone, mem_limit, service_timeout, no_module_cache, import_map_path_clone).await;
140+
let res = process_stream(
141+
stream,
142+
services_dir_clone,
143+
mem_limit,
144+
service_timeout,
145+
no_module_cache,
146+
import_map_path_clone,
147+
env_vars_clone).await;
134148
if res.is_err() {
135149
error!("{:?}", res.err().unwrap());
136150
}

cli/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use anyhow::Error;
44
use base::commands::start_server;
55
use clap::builder::FalseyValueParser;
66
use clap::{arg, value_parser, ArgAction, Command};
7+
use std::env;
78

89
fn cli() -> Command {
910
Command::new("edge-runtime")
@@ -74,6 +75,8 @@ fn main() {
7475
.cloned()
7576
.unwrap();
7677
let import_map_path = sub_matches.get_one::<String>("import-map").cloned();
78+
// CLI will provide all OS environment variables to a function
79+
let env_vars = env::vars().collect();
7780

7881
exit_with_code(start_server(
7982
&ip.as_str(),
@@ -83,6 +86,7 @@ fn main() {
8386
service_timeout,
8487
no_module_cache,
8588
import_map_path,
89+
env_vars,
8690
))
8791
}
8892
_ => {

0 commit comments

Comments
 (0)