Skip to content

Commit a939135

Browse files
authored
feat: add EdgeRuntime.userWorkers.memStats (#560)
* feat: add `EdgeRuntime.userWorkers.memStats` * chore: update `rust-toolchain.toml`
1 parent d8361d0 commit a939135

File tree

10 files changed

+76
-8
lines changed

10 files changed

+76
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/base/src/worker/pool.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::server::ServerFlags;
33
use crate::worker::WorkerSurfaceBuilder;
44

55
use anyhow::{anyhow, bail, Context, Error};
6+
use base_mem_check::WorkerHeapStatistics;
67
use deno_config::JsxImportSourceConfig;
78
use either::Either::Left;
89
use enum_as_inner::EnumAsInner;
@@ -449,6 +450,7 @@ impl WorkerPool {
449450
permit: permit.map(Arc::new),
450451
status: status.clone(),
451452
exit: surface.exit,
453+
mem_check: surface.mem_check.clone(),
452454
cancel,
453455
};
454456

@@ -657,6 +659,23 @@ impl WorkerPool {
657659
}
658660
}
659661
}
662+
663+
fn memory_usage(&self, tx: Sender<HashMap<Uuid, Option<WorkerHeapStatistics>>>) {
664+
let mem_checks = self
665+
.user_workers
666+
.iter()
667+
.map(|it| (it.0.clone(), it.1.mem_check.clone()))
668+
.collect::<Vec<_>>();
669+
670+
drop(tokio::task::spawn_blocking(move || {
671+
let mut results = HashMap::new();
672+
for (uuid, mem_check) in mem_checks {
673+
results.insert(uuid, mem_check.read().ok().map(|it| it.current.clone()));
674+
}
675+
676+
let _ = tx.send(results);
677+
}));
678+
}
660679
}
661680

662681
pub async fn create_user_worker_pool(
@@ -750,6 +769,10 @@ pub async fn create_user_worker_pool(
750769
break;
751770
}
752771
}
772+
773+
Some(UserWorkerMsgs::InqueryMemoryUsage(tx)) => {
774+
worker_pool.memory_usage(tx);
775+
}
753776
}
754777
}
755778
}

crates/base/src/worker/worker_inner.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::server::ServerFlags;
44
use crate::worker::utils::{get_event_metadata, send_event_if_event_worker_available};
55

66
use anyhow::Error;
7+
use base_mem_check::MemCheckState;
78
use base_rt::error::CloneableError;
89
use deno_core::unsync::MaskFutureAsSend;
910
use futures_util::FutureExt;
@@ -17,7 +18,7 @@ use sb_workers::context::{
1718
WorkerRequestMsg,
1819
};
1920
use std::future::ready;
20-
use std::sync::Arc;
21+
use std::sync::{Arc, RwLock};
2122
use tokio::io;
2223
use tokio::sync::{mpsc, oneshot};
2324
use tokio::time::Instant;
@@ -183,7 +184,9 @@ impl std::ops::Deref for Worker {
183184
impl Worker {
184185
pub fn start(
185186
self,
186-
booter_signal: oneshot::Sender<Result<(MetricSource, CancellationToken), Error>>,
187+
booter_signal: oneshot::Sender<
188+
Result<(MetricSource, Arc<RwLock<MemCheckState>>, CancellationToken), Error>,
189+
>,
187190
exit: WorkerExit,
188191
) {
189192
let worker_name = self.worker_name.clone();
@@ -242,7 +245,11 @@ impl Worker {
242245
}
243246
};
244247

245-
let _ = booter_signal.send(Ok((metric_src, runtime.drop_token.clone())));
248+
let _ = booter_signal.send(Ok((
249+
metric_src,
250+
runtime.mem_check_state(),
251+
runtime.drop_token.clone(),
252+
)));
246253
let supervise_fut = match imp.clone().supervise(&mut runtime) {
247254
Some(v) => v.boxed(),
248255
None if worker_kind.is_user_worker() => return None,
@@ -325,4 +332,5 @@ pub struct WorkerSurface {
325332
pub msg_tx: mpsc::UnboundedSender<WorkerRequestMsg>,
326333
pub exit: WorkerExit,
327334
pub cancel: CancellationToken,
335+
pub mem_check: Arc<RwLock<MemCheckState>>,
328336
}

crates/base/src/worker/worker_surface_creation.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::{
22
path::{Path, PathBuf},
3-
sync::Arc,
3+
sync::{Arc, RwLock},
44
};
55

66
use anyhow::Context;
7+
use base_mem_check::MemCheckState;
78
use deno_config::JsxImportSourceConfig;
89
use either::Either;
910
use graph::{DecoratorType, EszipPayloadKind};
@@ -329,8 +330,9 @@ impl WorkerSurfaceBuilder {
329330
worker_builder_hook,
330331
} = self;
331332

332-
let (worker_boot_result_tx, worker_boot_result_rx) =
333-
oneshot::channel::<Result<(MetricSource, CancellationToken), anyhow::Error>>();
333+
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel::<
334+
Result<(MetricSource, Arc<RwLock<MemCheckState>>, CancellationToken), anyhow::Error>,
335+
>();
334336

335337
let flags = flags.unwrap_or_default();
336338
let init_opts = init_opts.context("init_opts must be specified")?;
@@ -377,7 +379,7 @@ impl WorkerSurfaceBuilder {
377379

378380
// wait for worker to be successfully booted
379381
match worker_boot_result_rx.await? {
380-
Ok((metric, cancel)) => {
382+
Ok((metric, mem_check_state, cancel)) => {
381383
let elapsed = cx.worker_boot_start_time.elapsed().as_millis();
382384

383385
send_event_if_event_worker_available(
@@ -393,6 +395,7 @@ impl WorkerSurfaceBuilder {
393395
msg_tx: worker_req_tx,
394396
exit,
395397
cancel,
398+
mem_check: mem_check_state,
396399
})
397400
}
398401

ext/workers/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ fs.workspace = true
1919

2020
sb_core.workspace = true
2121
sb_event_worker.workspace = true
22+
base_mem_check.workspace = true
2223

2324
anyhow.workspace = true
2425
uuid.workspace = true

ext/workers/context.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::{anyhow, Error};
2+
use base_mem_check::{MemCheckState, WorkerHeapStatistics};
23
use deno_config::JsxImportSourceConfig;
34
use deno_core::FastString;
45
use enum_as_inner::EnumAsInner;
@@ -10,6 +11,7 @@ use sb_core::{MetricSource, SharedMetricSource};
1011
use sb_event_worker::events::{UncaughtExceptionEvent, WorkerEventWithMetadata};
1112
use std::path::PathBuf;
1213
use std::sync::atomic::AtomicUsize;
14+
use std::sync::RwLock;
1315
use std::{collections::HashMap, sync::Arc};
1416
use tokio::sync::mpsc::unbounded_channel;
1517
use tokio::sync::{mpsc, oneshot, Mutex, Notify, OwnedSemaphorePermit};
@@ -121,6 +123,7 @@ pub struct UserWorkerProfile {
121123
pub cancel: CancellationToken,
122124
pub status: TimingStatus,
123125
pub exit: WorkerExit,
126+
pub mem_check: Arc<RwLock<MemCheckState>>,
124127
}
125128

126129
#[derive(Debug, Clone)]
@@ -238,6 +241,7 @@ pub enum UserWorkerMsgs {
238241
),
239242
Idle(Uuid),
240243
Shutdown(Uuid),
244+
InqueryMemoryUsage(oneshot::Sender<HashMap<Uuid, Option<WorkerHeapStatistics>>>),
241245
}
242246

243247
pub type SendRequestResult = (Response<Body>, mpsc::UnboundedSender<()>);

ext/workers/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::context::{
66
WorkerRuntimeOpts,
77
};
88
use anyhow::Error;
9+
use base_mem_check::WorkerHeapStatistics;
910
use context::SendRequestResult;
1011
use deno_config::JsxImportSourceConfig;
1112
use deno_core::error::{custom_error, type_error, AnyError};
@@ -31,6 +32,7 @@ use once_cell::sync::Lazy;
3132
use sb_core::conn_sync::ConnWatcher;
3233
use serde::{Deserialize, Serialize};
3334
use std::cell::RefCell;
35+
use std::collections::HashMap;
3436
use std::path::PathBuf;
3537
use std::pin::Pin;
3638
use std::rc::Rc;
@@ -45,6 +47,7 @@ deno_core::extension!(
4547
op_user_worker_create,
4648
op_user_worker_fetch_build,
4749
op_user_worker_fetch_send,
50+
op_user_worker_mem_stats,
4851
],
4952
esm_entry_point = "ext:sb_user_workers/user_workers.js",
5053
esm = ["user_workers.js",]
@@ -579,6 +582,24 @@ pub async fn op_user_worker_fetch_send(
579582
Ok(response)
580583
}
581584

585+
#[op2(async)]
586+
#[serde]
587+
pub async fn op_user_worker_mem_stats(
588+
state: Rc<RefCell<OpState>>,
589+
) -> Result<HashMap<Uuid, Option<WorkerHeapStatistics>>, Error> {
590+
let mem_rx = {
591+
let op_state = state.borrow();
592+
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
593+
594+
let (mem_tx, mem_rx) = oneshot::channel();
595+
let _ = tx.send(UserWorkerMsgs::InqueryMemoryUsage(mem_tx));
596+
597+
mem_rx
598+
};
599+
600+
Ok(mem_rx.await?)
601+
}
602+
582603
/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
583604
pub struct BodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
584605

ext/workers/user_workers.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const { TypeError } = primordials;
99
const {
1010
op_user_worker_fetch_send,
1111
op_user_worker_create,
12+
op_user_worker_mem_stats,
1213
} = ops;
1314

1415
const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\
@@ -140,6 +141,10 @@ class UserWorker {
140141

141142
return new UserWorker(key);
142143
}
144+
145+
static async memStats() {
146+
return await op_user_worker_mem_stats();
147+
}
143148
}
144149

145150
const SUPABASE_USER_WORKERS = UserWorker;

rust-toolchain.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
[toolchain]
2-
channel = "1.79.0"
2+
channel = "1.79.0"
3+
components = ["rustfmt", "clippy"]

types/global.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ declare namespace EdgeRuntime {
9696

9797
fetch(request: Request, options?: UserWorkerFetchOptions): Promise<Response>;
9898
static create(opts: UserWorkerCreateOptions): Promise<UserWorker>;
99+
static memStats(): Promise<HeapStatistics>;
99100
}
100101

101102
export function scheduleTermination(): void;

0 commit comments

Comments
 (0)