Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose useful props to Worker to be able to reuse the workers in js land #382

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 20 additions & 6 deletions crates/base/src/worker/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,13 @@ impl WorkerPool {
.as_user_worker()
.map_or(false, |it| !is_oneshot_policy && it.force_create);

if let Some(ref active_worker_uuid) = self.maybe_active_worker(&service_path, force_create)
if let Some((ref active_worker_uuid, profile)) =
self.maybe_active_worker(&service_path, force_create)
{
if tx
.send(Ok(CreateUserWorkerResult {
key: *active_worker_uuid,
token: profile.cancel.clone(),
}))
.is_err()
{
Expand Down Expand Up @@ -448,7 +450,7 @@ impl WorkerPool {
permit: permit.map(Arc::new),
status: status.clone(),
exit: surface.exit,
cancel,
cancel: cancel.clone(),
};

if worker_pool_msgs_tx
Expand All @@ -457,7 +459,13 @@ impl WorkerPool {
{
error!("user worker msgs receiver dropped")
}
if tx.send(Ok(CreateUserWorkerResult { key: uuid })).is_err() {
if tx
.send(Ok(CreateUserWorkerResult {
key: uuid,
token: cancel,
}))
.is_err()
{
error!("main worker receiver dropped")
};
}
Expand Down Expand Up @@ -632,7 +640,11 @@ impl WorkerPool {
}
}

fn maybe_active_worker(&mut self, service_path: &String, force_create: bool) -> Option<Uuid> {
fn maybe_active_worker(
&mut self,
service_path: &String,
force_create: bool,
) -> Option<(Uuid, &UserWorkerProfile)> {
if force_create {
return None;
}
Expand All @@ -648,11 +660,13 @@ impl WorkerPool {
.get(&worker_uuid)
.map(|it| it.status.is_retired.clone())
{
Some(is_retired) if !is_retired.is_raised() => Some(worker_uuid),
Some(is_retired) if !is_retired.is_raised() => {
Some((worker_uuid, self.user_workers.get(&worker_uuid).unwrap()))
}

_ => {
self.retire(&worker_uuid);
self.maybe_active_worker(service_path, force_create)
self.maybe_active_worker(service_path, false)
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions crates/base/src/worker/worker_surface_creation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use sb_workers::context::{
WorkerExit, WorkerRequestMsg, WorkerRuntimeOpts,
};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

use crate::{inspector_server::Inspector, server::ServerFlags};

Expand Down Expand Up @@ -329,8 +328,7 @@ impl WorkerSurfaceBuilder {
worker_builder_hook,
} = self;

let (worker_boot_result_tx, worker_boot_result_rx) =
oneshot::channel::<Result<(MetricSource, CancellationToken), anyhow::Error>>();
let (worker_boot_result_tx, worker_boot_result_rx) = oneshot::channel();

let flags = flags.unwrap_or_default();
let init_opts = init_opts.context("init_opts must be specified")?;
Expand Down
136 changes: 136 additions & 0 deletions examples/main-session/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// @ts-ignore
import { STATUS_CODE } from 'https://deno.land/std/http/status.ts';

const SESSION_HEADER_NAME = 'X-Edge-Runtime-Session-Id';
const WORKERS = new Map<string, EdgeRuntime.UserWorker>();

setInterval(() => {
const shouldBeRemoved: string[] = [];

for (const [uuid, worker] of WORKERS) {
if (!worker.active) {
shouldBeRemoved.push(uuid);
}
}

for (const uuid of shouldBeRemoved) {
console.log("deleted: ", uuid);
WORKERS.delete(uuid);
}
}, 2500);

console.log('main function started (session mode)');

Deno.serve(async (req: Request) => {
const headers = new Headers({
'Content-Type': 'application/json',
});

const url = new URL(req.url);
const { pathname } = url;

// handle health checks
if (pathname === '/_internal/health') {
return new Response(
JSON.stringify({ 'message': 'ok' }),
{
status: STATUS_CODE.OK,
headers,
},
);
}

if (pathname === '/_internal/metric') {
const metric = await EdgeRuntime.getRuntimeMetrics();
return Response.json(metric);
}

const path_parts = pathname.split('/');
const service_name = path_parts[1];

if (!service_name || service_name === '') {
const error = { msg: 'missing function name in request' };
return new Response(
JSON.stringify(error),
{ status: STATUS_CODE.BadRequest, headers: { 'Content-Type': 'application/json' } },
);
}

const servicePath = `./examples/${service_name}`;
const createWorker = async (): Promise<EdgeRuntime.UserWorker> => {
const memoryLimitMb = 150;
const workerTimeoutMs = 30 * 1000;
const noModuleCache = false;

const importMapPath = null;
const envVarsObj = Deno.env.toObject();
const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]);
const forceCreate = false;
const netAccessDisabled = false;
const cpuTimeSoftLimitMs = 10000;
const cpuTimeHardLimitMs = 20000;

return await EdgeRuntime.userWorkers.create({
servicePath,
memoryLimitMb,
workerTimeoutMs,
noModuleCache,
importMapPath,
envVars,
forceCreate,
netAccessDisabled,
cpuTimeSoftLimitMs,
cpuTimeHardLimitMs,
});
};

const callWorker = async () => {

try {
let worker: EdgeRuntime.UserWorker | null = null;

if (req.headers.get(SESSION_HEADER_NAME)) {
const sessionId = req.headers.get(SESSION_HEADER_NAME)!;
const complexSessionId = `${servicePath}/${sessionId}`;

const maybeWorker = WORKERS.get(complexSessionId);

if (maybeWorker && maybeWorker.active) {
worker = maybeWorker;
}
}

if (!worker) {
worker = await createWorker();
}

const resp = await worker.fetch(req);

if (resp.headers.has(SESSION_HEADER_NAME)) {
const sessionIdFromWorker = resp.headers.get(SESSION_HEADER_NAME)!;
const complexSessionId = `${servicePath}/${sessionIdFromWorker}`;

WORKERS.set(complexSessionId, worker);
}

return resp;
} catch (e) {
console.error(e);

if (e instanceof Deno.errors.WorkerRequestCancelled) {
headers.append('Connection', 'close');
}

const error = { msg: e.toString() };
return new Response(
JSON.stringify(error),
{
status: STATUS_CODE.InternalServerError,
headers,
},
);
}
};

return callWorker();
});
63 changes: 63 additions & 0 deletions examples/serve-session/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// @ts-ignore
import { STATUS_CODE } from "https://deno.land/std/http/status.ts";

type SessionStroage = { [key: string]: unknown };

const SESSION_HEADER_NAME = "X-Edge-Runtime-Session-Id";
const SESSIONS = new Map<string, SessionStroage>();

function makeNewSession(): [string, SessionStroage] {
const uuid = crypto.randomUUID();
const storage = {};

SESSIONS.set(uuid, storage);
return [uuid, storage];
}

function getSessionStorageFromRequest(req: Request): SessionStroage | void {
const maybeSessionId = req.headers.get(SESSION_HEADER_NAME);

if (typeof maybeSessionId === "string" && SESSIONS.has(maybeSessionId)) {
return SESSIONS.get(maybeSessionId);
}
}

export default {
fetch(req: Request) {
const headers = new Headers();
let storage: SessionStroage;

if (req.headers.get(SESSION_HEADER_NAME)) {
const maybeStorage = getSessionStorageFromRequest(req);

if (!maybeStorage) {
return new Response(null, {
status: STATUS_CODE.BadRequest
});
}

storage = maybeStorage;
} else {
const [sessionId, newStorage] = makeNewSession();

headers.set(SESSION_HEADER_NAME, sessionId);

storage = newStorage;
}

if (!("count" in storage)) {
storage["count"] = 0;
} else {
(storage["count"] as number)++;
}

const count = storage["count"] as number;

return new Response(
JSON.stringify({ count }),
{
headers
}
);
}
}
1 change: 1 addition & 0 deletions ext/workers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ deno_core.workspace = true
deno_http.workspace = true
deno_config.workspace = true

base_rt.workspace = true
http_utils.workspace = true
graph.workspace = true
fs.workspace = true
Expand Down
1 change: 1 addition & 0 deletions ext/workers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ pub type SendRequestResult = (Response<Body>, mpsc::UnboundedSender<()>);
#[derive(Debug)]
pub struct CreateUserWorkerResult {
pub key: Uuid,
pub token: CancellationToken,
}

#[derive(Debug)]
Expand Down
Loading
Loading