Skip to content

Commit 8589927

Browse files
authored
Merge pull request #19 from supabase/refactor-init-worker
fix: refactor worker init
2 parents 902a27e + 8d5016e commit 8589927

File tree

6 files changed

+53
-33
lines changed

6 files changed

+53
-33
lines changed

base/src/commands.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,20 @@ use crate::server::Server;
22
use anyhow::Error;
33

44
#[tokio::main]
5-
pub async fn start_server(ip: &str, port: u16, main_service_path: String) -> Result<(), Error> {
6-
let mut server = Server::new(ip, port, main_service_path).await?;
5+
pub async fn start_server(
6+
ip: &str,
7+
port: u16,
8+
main_service_path: String,
9+
import_map_path: Option<String>,
10+
no_module_cache: bool,
11+
) -> Result<(), Error> {
12+
let mut server = Server::new(
13+
ip,
14+
port,
15+
main_service_path,
16+
import_map_path,
17+
no_module_cache,
18+
)
19+
.await?;
720
server.listen().await
821
}

base/src/js_worker.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::utils::units::{bytes_to_display, human_elapsed, mib_to_bytes};
2-
use crate::worker_ctx::WorkerPoolMsg;
2+
use crate::worker_ctx::UserWorkerMsgs;
33

44
use anyhow::{bail, Error};
55
use deno_core::located_script_name;
@@ -66,15 +66,15 @@ fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {
6666
pub struct MainWorker {
6767
js_runtime: JsRuntime,
6868
main_module_url: ModuleSpecifier,
69-
worker_pool_tx: mpsc::UnboundedSender<WorkerPoolMsg>,
69+
worker_pool_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
7070
}
7171

7272
impl MainWorker {
7373
pub fn new(
7474
service_path: PathBuf,
7575
no_module_cache: bool,
7676
import_map_path: Option<String>,
77-
worker_pool_tx: mpsc::UnboundedSender<WorkerPoolMsg>,
77+
worker_pool_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
7878
) -> Result<Self, Error> {
7979
// Note: MainWorker
8080
// - does not have memory or worker timeout [x]
@@ -179,7 +179,7 @@ impl MainWorker {
179179
let op_state_rc = self.js_runtime.op_state();
180180
let mut op_state = op_state_rc.borrow_mut();
181181
op_state.put::<mpsc::UnboundedReceiver<UnixStream>>(unix_stream_rx);
182-
op_state.put::<mpsc::UnboundedSender<WorkerPoolMsg>>(worker_pool_tx);
182+
op_state.put::<mpsc::UnboundedSender<UserWorkerMsgs>>(worker_pool_tx);
183183
op_state.put::<types::EnvVars>(env_vars);
184184
}
185185

base/src/js_worker/user_workers.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerOptions, WorkerPoolMsg};
1+
use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerMsgs, UserWorkerOptions};
22

33
use deno_core::error::{custom_error, type_error, AnyError};
44
use deno_core::futures::stream::Peekable;
@@ -49,7 +49,7 @@ pub async fn op_user_worker_create(
4949
env_vars_vec: Vec<(String, String)>,
5050
) -> Result<String, AnyError> {
5151
let op_state = state.borrow();
52-
let tx = op_state.borrow::<mpsc::UnboundedSender<WorkerPoolMsg>>();
52+
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
5353
let (result_tx, result_rx) = oneshot::channel::<CreateUserWorkerResult>();
5454

5555
let mut env_vars = HashMap::new();
@@ -65,10 +65,7 @@ pub async fn op_user_worker_create(
6565
import_map_path,
6666
env_vars,
6767
};
68-
tx.send(WorkerPoolMsg::CreateUserWorker(
69-
user_worker_options,
70-
result_tx,
71-
));
68+
tx.send(UserWorkerMsgs::Create(user_worker_options, result_tx));
7269

7370
let result = result_rx.await;
7471
if result.is_err() {
@@ -168,7 +165,7 @@ pub async fn op_user_worker_fetch(
168165
req: UserWorkerRequest,
169166
) -> Result<UserWorkerResponse, AnyError> {
170167
let mut op_state = state.borrow_mut();
171-
let tx = op_state.borrow::<mpsc::UnboundedSender<WorkerPoolMsg>>();
168+
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
172169
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();
173170

174171
let mut body = Body::empty();
@@ -197,7 +194,7 @@ pub async fn op_user_worker_fetch(
197194
}
198195
}
199196

200-
tx.send(WorkerPoolMsg::SendRequestToWorker(key, request, result_tx));
197+
tx.send(UserWorkerMsgs::SendRequest(key, request, result_tx));
201198

202199
let result = result_rx.await;
203200
if result.is_err() {

base/src/server.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@ use std::future::Future;
66
use std::net::IpAddr;
77
use std::net::Ipv4Addr;
88
use std::net::SocketAddr;
9-
use std::path::Path;
109
use std::pin::Pin;
1110
use std::str;
1211
use std::str::FromStr;
1312
use std::sync::Arc;
1413
use std::task::Poll;
1514
use tokio::net::TcpListener;
1615
use tokio::sync::RwLock;
17-
use url::Url;
1816

1917
struct WorkerService {
2018
worker_ctx: Arc<RwLock<WorkerContext>>,
@@ -63,9 +61,16 @@ pub struct Server {
6361
}
6462

6563
impl Server {
66-
pub async fn new(ip: &str, port: u16, main_service_path: String) -> Result<Self, Error> {
64+
pub async fn new(
65+
ip: &str,
66+
port: u16,
67+
main_service_path: String,
68+
import_map_path: Option<String>,
69+
no_module_cache: bool,
70+
) -> Result<Self, Error> {
6771
// create a worker pool
68-
let worker_pool = WorkerPool::new(main_service_path, None, false).await?;
72+
let worker_pool =
73+
WorkerPool::new(main_service_path, import_map_path, no_module_cache).await?;
6974

7075
let ip = Ipv4Addr::from_str(ip)?;
7176
Ok(Self {

base/src/worker_ctx.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub struct UserWorkerOptions {
2929

3030
pub struct MainWorkerOptions {
3131
pub service_path: PathBuf,
32-
pub worker_pool_tx: mpsc::UnboundedSender<WorkerPoolMsg>,
32+
pub user_worker_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
3333
pub no_module_cache: bool,
3434
pub import_map_path: Option<String>,
3535
}
@@ -39,7 +39,7 @@ impl WorkerContext {
3939
let service_path = options.service_path;
4040
let no_module_cache = options.no_module_cache;
4141
let import_map_path = options.import_map_path;
42-
let worker_pool_tx = options.worker_pool_tx;
42+
let user_worker_msgs_tx = options.user_worker_msgs_tx;
4343

4444
// create a unix socket pair
4545
let (sender_stream, recv_stream) = UnixStream::pair()?;
@@ -49,7 +49,7 @@ impl WorkerContext {
4949
service_path.clone(),
5050
no_module_cache,
5151
import_map_path,
52-
worker_pool_tx.clone(),
52+
user_worker_msgs_tx.clone(),
5353
)?;
5454

5555
// start the worker
@@ -145,9 +145,9 @@ pub struct CreateUserWorkerResult {
145145
}
146146

147147
#[derive(Debug)]
148-
pub enum WorkerPoolMsg {
149-
CreateUserWorker(UserWorkerOptions, oneshot::Sender<CreateUserWorkerResult>),
150-
SendRequestToWorker(String, Request<Body>, oneshot::Sender<Response<Body>>),
148+
pub enum UserWorkerMsgs {
149+
Create(UserWorkerOptions, oneshot::Sender<CreateUserWorkerResult>),
150+
SendRequest(String, Request<Body>, oneshot::Sender<Response<Body>>),
151151
}
152152

153153
pub struct WorkerPool {
@@ -160,14 +160,15 @@ impl WorkerPool {
160160
import_map_path: Option<String>,
161161
no_module_cache: bool,
162162
) -> Result<Self, Error> {
163-
let (worker_pool_tx, mut worker_pool_rx) = mpsc::unbounded_channel::<WorkerPoolMsg>();
163+
let (user_worker_msgs_tx, mut user_worker_msgs_rx) =
164+
mpsc::unbounded_channel::<UserWorkerMsgs>();
164165

165166
let main_path = Path::new(&main_path);
166167
let main_worker_ctx = WorkerContext::new_main_worker(MainWorkerOptions {
167168
service_path: main_path.to_path_buf(),
168169
import_map_path,
169170
no_module_cache,
170-
worker_pool_tx,
171+
user_worker_msgs_tx,
171172
})
172173
.await?;
173174
let main_worker = Arc::new(RwLock::new(main_worker_ctx));
@@ -176,9 +177,9 @@ impl WorkerPool {
176177
let mut user_workers: HashMap<String, Arc<RwLock<WorkerContext>>> = HashMap::new();
177178

178179
loop {
179-
match worker_pool_rx.recv().await {
180+
match user_worker_msgs_rx.recv().await {
180181
None => break,
181-
Some(WorkerPoolMsg::CreateUserWorker(worker_options, tx)) => {
182+
Some(UserWorkerMsgs::Create(worker_options, tx)) => {
182183
let key = worker_options.service_path.display().to_string();
183184
if !user_workers.contains_key(&key) {
184185
// TODO: handle errors
@@ -191,7 +192,7 @@ impl WorkerPool {
191192

192193
tx.send(CreateUserWorkerResult { key });
193194
}
194-
Some(WorkerPoolMsg::SendRequestToWorker(key, req, tx)) => {
195+
Some(UserWorkerMsgs::SendRequest(key, req, tx)) => {
195196
// TODO: handle errors
196197
let worker = user_workers.get(&key).unwrap();
197198
let mut worker = worker.write().await;

cli/src/main.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ use anyhow::Error;
44
use base::commands::start_server;
55
use clap::builder::FalseyValueParser;
66
use clap::{arg, value_parser, ArgAction, Command};
7-
use std::env;
8-
97
fn cli() -> Command {
108
Command::new("edge-runtime")
119
.about("A server based on Deno runtime, capable of running JavaScript, TypeScript, and WASM services")
@@ -67,13 +65,19 @@ fn main() {
6765
.get_one::<String>("main-service")
6866
.cloned()
6967
.unwrap();
68+
let import_map_path = sub_matches.get_one::<String>("import-map").cloned();
7069
let no_module_cache = sub_matches
7170
.get_one::<bool>("disable-module-cache")
7271
.cloned()
7372
.unwrap();
74-
let import_map_path = sub_matches.get_one::<String>("import-map").cloned();
7573

76-
exit_with_code(start_server(&ip.as_str(), port, main_service_path))
74+
exit_with_code(start_server(
75+
&ip.as_str(),
76+
port,
77+
main_service_path,
78+
import_map_path,
79+
no_module_cache,
80+
))
7781
}
7882
_ => {
7983
// unrecognized command

0 commit comments

Comments
 (0)