diff --git a/README.md b/README.md
index c843106..97a899f 100644
--- a/README.md
+++ b/README.md
@@ -1,21 +1,30 @@
# Warp
-**Warp** is a simple tool that allows your locally running HTTP(s) servers to have a public URL, serving as an easy-to-self-host alternative to services like `ngrok`. Warp is implemented in Deno with the goal of providing flexibility and minimal dependencies.
+**Warp** is a simple tool that allows your locally running HTTP(s) servers to
+have a public URL, serving as an easy-to-self-host alternative to services like
+`ngrok`. Warp is implemented in Deno with the goal of providing flexibility and
+minimal dependencies.
The project has two main components:
-- **Server**: Deployable on a server, it connects to the outside world and is accessible from any domain.
-- **Client**: Runs locally to connect a given HTTP endpoint running on a local or non-public network.
+- **Server**: Deployable on a server, it connects to the outside world and is
+ accessible from any domain.
+- **Client**: Runs locally to connect a given HTTP endpoint running on a local
+ or non-public network.
## Server
-The Warp server opens a single HTTP port to which the Warp client connects and upgrades to a WebSocket connection. Each request to this HTTP port is forwarded (based on the client's HOST header) to the corresponding connected Warp client connection, which then serves the request.
+The Warp server opens a single HTTP port to which the Warp client connects and
+upgrades to a WebSocket connection. Each request to this HTTP port is forwarded
+(based on the client's HOST header) to the corresponding connected Warp client
+connection, which then serves the request.
### Usage
-To start the Warp server, import the `serve` function from the Warp package and call it with the appropriate configuration.
+To start the Warp server, import the `serve` function from the Warp package and
+call it with the appropriate configuration.
#### Example
@@ -35,11 +44,13 @@ serve({ port, apiKeys });
## Client
-The Warp client connects to the Warp server. Upon connection, the client shares the given API key and the domain it wants to receive requests for.
+The Warp client connects to the Warp server. Upon connection, the client shares
+the given API key and the domain it wants to receive requests for.
### Usage
-To connect a client to the Warp server, import the `connect` function from the Warp package and call it with the appropriate configuration.
+To connect a client to the Warp server, import the `connect` function from the
+Warp package and call it with the appropriate configuration.
#### Example
@@ -69,13 +80,15 @@ closed.then(() => {
#### Parameters
- `domain`: The domain name that will be used to access your localhost service.
-- `localAddr`: The local address of the service you want to expose (e.g., `http://localhost:3000`).
+- `localAddr`: The local address of the service you want to expose (e.g.,
+ `http://localhost:3000`).
- `server`: The WebSocket URL of your Warp server (e.g., `wss://YOUR_SERVER`).
- `apiKey`: The apiKey for connecting to the Warp server.
#### Return Values
-- `registered`: A promise that resolves when the client has successfully registered with the server.
+- `registered`: A promise that resolves when the client has successfully
+ registered with the server.
- `closed`: A promise that resolves when the connection to the server is closed.
## Example Workflow
@@ -124,6 +137,9 @@ const apiKey = "API_KEY";
### Common Issues
-- **Invalid API Key**: Ensure that the API key you are using is listed in the `apiKeys` array on the server.
-- **Connection Refused**: Check that the server is running and accessible at the specified WebSocket URL.
-- **Domain Not Accessible**: Ensure that the domain name is correctly configured and pointing to the Warp server.
+- **Invalid API Key**: Ensure that the API key you are using is listed in the
+ `apiKeys` array on the server.
+- **Connection Refused**: Check that the server is running and accessible at the
+ specified WebSocket URL.
+- **Domain Not Accessible**: Ensure that the domain name is correctly configured
+ and pointing to the Warp server.
diff --git a/channel.ts b/channel.ts
index 17a2a63..04b413f 100644
--- a/channel.ts
+++ b/channel.ts
@@ -1,149 +1,163 @@
import { Queue } from "./queue.ts";
export interface Channel {
- closed: Promise;
- signal: AbortSignal;
- close(): void;
- send(value: T): Promise;
- recv(signal?: AbortSignal): AsyncIterableIterator;
+ closed: Promise;
+ signal: AbortSignal;
+ close(): void;
+ send(value: T): Promise;
+ recv(signal?: AbortSignal): AsyncIterableIterator;
}
export const link = (...signals: AbortSignal[]): AbortSignal => {
- const ctrl = new AbortController();
- for (const signal of signals) {
- signal.onabort = (evt) => {
- if (!ctrl.signal.aborted) {
- ctrl.abort(evt);
- }
- }
- }
- return ctrl.signal;
-}
+ const ctrl = new AbortController();
+ for (const signal of signals) {
+ signal.onabort = (evt) => {
+ if (!ctrl.signal.aborted) {
+ ctrl.abort(evt);
+ }
+ };
+ }
+ return ctrl.signal;
+};
export class ClosedChannelError extends Error {
- constructor() {
- super("Channel is closed");
- }
+ constructor() {
+ super("Channel is closed");
+ }
}
-export const ifClosedChannel = (cb: () => Promise | void) => (err: unknown) => {
+export const ifClosedChannel =
+ (cb: () => Promise | void) => (err: unknown) => {
if (err instanceof ClosedChannelError) {
- return cb();
+ return cb();
}
throw err;
-}
+ };
-export const ignoreIfClosed = ifClosedChannel(() => { })
+export const ignoreIfClosed = ifClosedChannel(() => {});
export const makeChan = (): Channel => {
- const queue: Queue<{ value: T, resolve: () => void }> = new Queue();
- const ctrl = new AbortController();
- const abortPromise = Promise.withResolvers();
- ctrl.signal.onabort = () => {
- abortPromise.resolve();
- }
+ const queue: Queue<{ value: T; resolve: () => void }> = new Queue();
+ const ctrl = new AbortController();
+ const abortPromise = Promise.withResolvers();
+ ctrl.signal.onabort = () => {
+ abortPromise.resolve();
+ };
- const send = (value: T): Promise => {
- return new Promise((resolve, reject) => {
- if (ctrl.signal.aborted) reject(new ClosedChannelError());
- queue.push({ value, resolve });
- });
- };
+ const send = (value: T): Promise => {
+ return new Promise((resolve, reject) => {
+ if (ctrl.signal.aborted) reject(new ClosedChannelError());
+ queue.push({ value, resolve });
+ });
+ };
- const close = () => {
- ctrl.abort();
- };
+ const close = () => {
+ ctrl.abort();
+ };
- const recv = async function* (signal?: AbortSignal): AsyncIterableIterator {
- const linked = signal ? link(ctrl.signal, signal) : ctrl.signal;
- while (true) {
- if (linked.aborted) {
- return;
- }
- try {
- const next = await queue.pop({ signal: linked });
- next.resolve();
- yield next.value;
- } catch (_err) {
- if (linked.aborted) {
- return;
- }
- throw _err;
- }
+ const recv = async function* (
+ signal?: AbortSignal,
+ ): AsyncIterableIterator {
+ const linked = signal ? link(ctrl.signal, signal) : ctrl.signal;
+ while (true) {
+ if (linked.aborted) {
+ return;
+ }
+ try {
+ const next = await queue.pop({ signal: linked });
+ next.resolve();
+ yield next.value;
+ } catch (_err) {
+ if (linked.aborted) {
+ return;
}
- };
+ throw _err;
+ }
+ }
+ };
- return { send, recv, close, signal: ctrl.signal, closed: abortPromise.promise };
+ return {
+ send,
+ recv,
+ close,
+ signal: ctrl.signal,
+ closed: abortPromise.promise,
+ };
};
export interface DuplexChannel {
- in: Channel
- out: Channel
+ in: Channel;
+ out: Channel;
}
-export const makeWebSocket = (socket: WebSocket, parse: boolean = true): Promise> => {
- const sendChan = makeChan();
- const recvChan = makeChan();
- const ch = Promise.withResolvers>();
- socket.onclose = () => {
- sendChan.close();
- recvChan.close();
+export const makeWebSocket = (
+ socket: WebSocket,
+ parse: boolean = true,
+): Promise> => {
+ const sendChan = makeChan();
+ const recvChan = makeChan();
+ const ch = Promise.withResolvers>();
+ socket.onclose = () => {
+ sendChan.close();
+ recvChan.close();
+ };
+ socket.onerror = (err) => {
+ socket.close();
+ ch.reject(err);
+ };
+ socket.onmessage = async (msg) => {
+ let eventData = msg.data;
+ const target = msg?.target;
+ if (
+ target && "binaryType" in target &&
+ target.binaryType === "blob" && typeof eventData === "object" &&
+ "text" in eventData
+ ) {
+ eventData = await eventData.text();
}
- socket.onerror = (err) => {
- socket.close();
- ch.reject(err);
+ const message = parse ? JSON.parse(eventData) : eventData;
+ await recvChan.send(message);
+ };
+ socket.onopen = async () => {
+ ch.resolve({ in: recvChan, out: sendChan });
+ for await (const message of sendChan.recv()) {
+ try {
+ socket.send(parse ? JSON.stringify(message) : message as ArrayBuffer);
+ } catch (_err) {
+ console.error("error sending message through socket", message);
+ }
}
- socket.onmessage = async (msg) => {
- let eventData = msg.data;
- const target = msg?.target;
- if (
- target && "binaryType" in target &&
- target.binaryType === "blob" && typeof eventData === "object" &&
- "text" in eventData
- ) {
- eventData = await eventData.text();
- }
- const message = parse ? JSON.parse(eventData) : eventData;
- await recvChan.send(message);
- }
- socket.onopen = async () => {
- ch.resolve({ in: recvChan, out: sendChan });
- for await (const message of sendChan.recv()) {
- try {
- socket.send(parse ? JSON.stringify(message) : message as ArrayBuffer);
- } catch (_err) {
- console.error("error sending message through socket", message);
- }
- }
- socket.close();
- }
- return ch.promise;
-}
+ socket.close();
+ };
+ return ch.promise;
+};
-export const makeReadableStream = (ch: Channel): ReadableStream => {
- return new ReadableStream({
- async start(controller) {
- for await (const content of ch.recv()) {
- controller.enqueue(content);
- }
- controller.close();
- },
- cancel() {
- ch.close();
- },
- })
-}
+export const makeReadableStream = (
+ ch: Channel,
+): ReadableStream => {
+ return new ReadableStream({
+ async start(controller) {
+ for await (const content of ch.recv()) {
+ controller.enqueue(content);
+ }
+ controller.close();
+ },
+ cancel() {
+ ch.close();
+ },
+ });
+};
export const makeChanStream = (stream: ReadableStream): Channel => {
- const chan = makeChan();
+ const chan = makeChan();
- // Consume the transformed stream to trigger the pipeline
- const reader = stream.getReader();
- const processStream = async () => {
- while (true) {
- const { done, value } = await reader.read();
- if (done) break;
- await chan.send(value);
- }
- chan.close();
- };
- processStream().catch(console.error);
- return chan;
+ // Consume the transformed stream to trigger the pipeline
+ const reader = stream.getReader();
+ const processStream = async () => {
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+ await chan.send(value);
+ }
+ chan.close();
+ };
+ processStream().catch(console.error);
+ return chan;
};
diff --git a/client.ts b/client.ts
index 3437903..590d593 100644
--- a/client.ts
+++ b/client.ts
@@ -11,10 +11,10 @@ import type { ClientMessage, ClientState, ServerMessage } from "./messages.ts";
* @property {string} localAddr - The local address for the WebSocket connection.
*/
export interface ConnectOptions {
- apiKey: string;
- domain: string;
- server: string;
- localAddr: string;
+ apiKey: string;
+ domain: string;
+ server: string;
+ localAddr: string;
}
/**
@@ -24,8 +24,8 @@ export interface ConnectOptions {
* @property {Promise} registered - A promise that resolves when the connection is registered.
*/
export interface Connected {
- closed: Promise;
- registered: Promise;
+ closed: Promise;
+ registered: Promise;
}
/**
@@ -34,48 +34,49 @@ export interface Connected {
* @returns {Promise} A promise that resolves with the connection status.
*/
export const connect = async (opts: ConnectOptions): Promise => {
- const closed = Promise.withResolvers();
- const registered = Promise.withResolvers();
- const client = typeof Deno.createHttpClient === "function" ? Deno.createHttpClient({
- allowHost: true,
- proxy: {
- url: opts.localAddr,
- }
- }) : undefined;
+ const closed = Promise.withResolvers();
+ const registered = Promise.withResolvers();
+ const client = typeof Deno.createHttpClient === "function"
+ ? Deno.createHttpClient({
+ allowHost: true,
+ proxy: {
+ url: opts.localAddr,
+ },
+ })
+ : undefined;
- const socket = new WebSocket(`${opts.server}/_connect`);
- const ch = await makeWebSocket(socket);
- await ch.out.send({
- id: crypto.randomUUID(),
- type: "register",
- apiKey: opts.apiKey,
- domain: opts.domain,
- });
- const requestBody: Record> = {};
- const wsMessages: Record> = {};
+ const socket = new WebSocket(`${opts.server}/_connect`);
+ const ch = await makeWebSocket(socket);
+ await ch.out.send({
+ id: crypto.randomUUID(),
+ type: "register",
+ apiKey: opts.apiKey,
+ domain: opts.domain,
+ });
+ const requestBody: Record> = {};
+ const wsMessages: Record> = {};
- (async () => {
- const state: ClientState = {
- client,
- localAddr: opts.localAddr,
- live: false,
- requestBody,
- wsMessages,
- ch,
- }
- for await (const message of ch.in.recv()) {
- try {
- await handleServerMessage(state, message);
- if (state.live) {
- registered.resolve();
- }
- } catch (err) {
- console.error(new Date(), "error handling message", err);
- break;
- }
+ (async () => {
+ const state: ClientState = {
+ client,
+ localAddr: opts.localAddr,
+ live: false,
+ requestBody,
+ wsMessages,
+ ch,
+ };
+ for await (const message of ch.in.recv()) {
+ try {
+ await handleServerMessage(state, message);
+ if (state.live) {
+ registered.resolve();
}
- closed.resolve();
- })()
- return { closed: closed.promise, registered: registered.promise };
-}
-
+ } catch (err) {
+ console.error(new Date(), "error handling message", err);
+ break;
+ }
+ }
+ closed.resolve();
+ })();
+ return { closed: closed.promise, registered: registered.promise };
+};
diff --git a/deno.lock b/deno.lock
index b89119f..9fab015 100644
--- a/deno.lock
+++ b/deno.lock
@@ -507,10 +507,5 @@
"https://esm.sh/v135/utils-merge@1.0.1/denonext/utils-merge.mjs": "0a8a3925b476b5a94da272bb8fbb63728b725185a80b28492825dfb3ca3a3542",
"https://esm.sh/v135/vary@1.1.2/denonext/vary.mjs": "119fef5a9e3d5a74c698ded1a4edd8acfdf75db69c6daabf631bd773964ac860",
"https://esm.sh/v135/ws@8.17.0/denonext/ws.mjs": "098384c6d6b787c4074dae8141a61bfa93a271ccb699b4b426c75141c9ef5560"
- },
- "workspace": {
- "dependencies": [
- "npm:daisyui@4.4.19"
- ]
}
}
diff --git a/handlers.client.ts b/handlers.client.ts
index a46beba..1da6030 100644
--- a/handlers.client.ts
+++ b/handlers.client.ts
@@ -1,5 +1,23 @@
-import { type Channel, ignoreIfClosed, makeChan, makeChanStream, makeReadableStream, makeWebSocket } from "./channel.ts";
-import type { ClientMessage, ClientState, ErrorMessage, RegisteredMessage, RequestDataEndMessage, RequestDataMessage, RequestStartMessage, ServerMessage, ServerMessageHandler, WSMessage } from "./messages.ts";
+import {
+ type Channel,
+ ignoreIfClosed,
+ makeChan,
+ makeChanStream,
+ makeReadableStream,
+ makeWebSocket,
+} from "./channel.ts";
+import type {
+ ClientMessage,
+ ClientState,
+ ErrorMessage,
+ RegisteredMessage,
+ RequestDataEndMessage,
+ RequestDataMessage,
+ RequestStartMessage,
+ ServerMessage,
+ ServerMessageHandler,
+ WSMessage,
+} from "./messages.ts";
import { ensureChunked } from "./server.ts";
/**
@@ -7,64 +25,77 @@ import { ensureChunked } from "./server.ts";
* @param {ClientState} state - The client state.
*/
const registered: ServerMessageHandler = (state) => {
- state.live = true;
-}
+ state.live = true;
+};
/**
* Handler for the 'error' server message.
* @param {ClientState} state - The client state.
*/
const error: ServerMessageHandler = (state) => {
- state.live = false;
-}
+ state.live = false;
+};
/**
* Handler for the 'request-start' server message.
* @param {ClientState} state - The client state.
* @param {RequestStartMessage} message - The message data.
*/
-const onRequestStart: ServerMessageHandler = async (state, message) => {
- if (message.headers["upgrade"] === "websocket") {
- await handleWebSocket(message, state);
- return;
- }
- if (!message.hasBody) {
- doFetch(message, state, state.ch.out).catch(ignoreIfClosed);
- } else {
- const bodyData = makeChan();
- state.requestBody[message.id] = bodyData;
- doFetch({ ...message, body: makeReadableStream(bodyData) }, state, state.ch.out).catch(ignoreIfClosed).finally(() => {
- delete state.requestBody[message.id];
- });
- }
-}
+const onRequestStart: ServerMessageHandler = async (
+ state,
+ message,
+) => {
+ if (message.headers["upgrade"] === "websocket") {
+ await handleWebSocket(message, state);
+ return;
+ }
+ if (!message.hasBody) {
+ doFetch(message, state, state.ch.out).catch(ignoreIfClosed);
+ } else {
+ const bodyData = makeChan();
+ state.requestBody[message.id] = bodyData;
+ doFetch(
+ { ...message, body: makeReadableStream(bodyData) },
+ state,
+ state.ch.out,
+ ).catch(ignoreIfClosed).finally(() => {
+ delete state.requestBody[message.id];
+ });
+ }
+};
/**
* Handler for the 'request-data' server message.
* @param {ClientState} state - The client state.
* @param {RequestDataMessage} message - The message data.
*/
-const onRequestData: ServerMessageHandler = async (state, message) => {
- const reqBody = state.requestBody[message.id];
- if (!reqBody) {
- console.info("[req-data] req not found", message.id);
- return;
- }
- await reqBody.send?.(ensureChunked(message.chunk));
-}
+const onRequestData: ServerMessageHandler = async (
+ state,
+ message,
+) => {
+ const reqBody = state.requestBody[message.id];
+ if (!reqBody) {
+ console.info("[req-data] req not found", message.id);
+ return;
+ }
+ await reqBody.send?.(ensureChunked(message.chunk));
+};
/**
* Handler for the 'request-data-end' server message.
* @param {ClientState} state - The client state.
* @param {RequestDataEndMessage} message - The message data.
*/
-const onRequestDataEnd: ServerMessageHandler = (state, message) => {
- const reqBody = state.requestBody[message.id];
- if (!reqBody) {
- return;
- }
- reqBody.close();
-}
+const onRequestDataEnd: ServerMessageHandler = (
+ state,
+ message,
+) => {
+ const reqBody = state.requestBody[message.id];
+ if (!reqBody) {
+ return;
+ }
+ reqBody.close();
+};
/**
* Handler for the 'ws-message' server message.
@@ -72,26 +103,30 @@ const onRequestDataEnd: ServerMessageHandler = (state, me
* @param {WSMessage} message - The message data.
*/
const onWsMessage: ServerMessageHandler = async (state, message) => {
- await state.wsMessages?.[message.id]?.send?.(message.data);
-}
+ await state.wsMessages?.[message.id]?.send?.(message.data);
+};
/**
* Handler for the 'ws-closed' server message.
* @param {ClientState} state - The client state.
* @param {RegisteredMessage} message - The message data.
*/
-const onWsClosed: ServerMessageHandler = (state, message) => {
- const messageChan = state.wsMessages[message.id];
- delete state.wsMessages[message.id];
- messageChan?.close();
-}
+const onWsClosed: ServerMessageHandler = (
+ state,
+ message,
+) => {
+ const messageChan = state.wsMessages[message.id];
+ delete state.wsMessages[message.id];
+ messageChan?.close();
+};
/**
* Handlers for various server message types.
* @type {Record>}
*/
// deno-lint-ignore no-explicit-any
-const handlersByType: Record> = {
+const handlersByType: Record> =
+ {
registered,
error,
"request-start": onRequestStart,
@@ -99,58 +134,61 @@ const handlersByType: Record> =
"request-end": onRequestDataEnd,
"ws-closed": onWsClosed,
"ws-message": onWsMessage,
-}
+ };
/**
* Handles WebSocket connections.
* @param {RequestStartMessage} message - The WebSocket request message.
* @param {ClientState} state - The client state.
*/
-async function handleWebSocket(message: RequestStartMessage, state: ClientState) {
- const ws = new WebSocket(new URL(message.url, state.localAddr));
- try {
- const wsCh = await makeWebSocket(ws, false);
- await state.ch.out.send({
- type: "ws-opened",
+async function handleWebSocket(
+ message: RequestStartMessage,
+ state: ClientState,
+) {
+ const ws = new WebSocket(new URL(message.url, state.localAddr));
+ try {
+ const wsCh = await makeWebSocket(ws, false);
+ await state.ch.out.send({
+ type: "ws-opened",
+ id: message.id,
+ });
+ state.wsMessages[message.id] = wsCh.out;
+ (async () => {
+ try {
+ for await (const data of wsCh.in.recv(state.ch.out.signal)) {
+ await state.ch.out.send({
+ type: "ws-message",
+ data,
id: message.id,
+ });
+ }
+ if (state.ch.out.signal.aborted) {
+ return;
+ }
+ await state.ch.out.send({
+ type: "ws-closed",
+ id: message.id,
});
- state.wsMessages[message.id] = wsCh.out;
- (async () => {
- try {
- for await (const data of wsCh.in.recv(state.ch.out.signal)) {
- await state.ch.out.send({
- type: "ws-message",
- data,
- id: message.id,
- });
- }
- if (state.ch.out.signal.aborted) {
- return;
- }
- await state.ch.out.send({
- type: "ws-closed",
- id: message.id,
- });
- } catch (error) {
- if (state.ch.out.signal.aborted) {
- return;
- }
- await state.ch.out.send({
- type: "ws-closed",
- id: message.id,
- }).catch(ignoreIfClosed);
- throw error;
- } finally {
- delete state.wsMessages[message.id];
- }
- })();
- } catch (err) {
+ } catch (error) {
+ if (state.ch.out.signal.aborted) {
+ return;
+ }
await state.ch.out.send({
- type: "data-end",
- error: err,
- id: message.id
+ type: "ws-closed",
+ id: message.id,
}).catch(ignoreIfClosed);
- }
+ throw error;
+ } finally {
+ delete state.wsMessages[message.id];
+ }
+ })();
+ } catch (err) {
+ await state.ch.out.send({
+ type: "data-end",
+ error: err,
+ id: message.id,
+ }).catch(ignoreIfClosed);
+ }
}
/**
@@ -159,46 +197,50 @@ async function handleWebSocket(message: RequestStartMessage, state: ClientState)
* @param {ClientState} state - The client state.
* @param {Channel} clientCh - The client channel.
*/
-async function doFetch(request: RequestStartMessage & { body?: ReadableStream; }, state: ClientState, clientCh: Channel) {
- // Read from the stream
- const signal = clientCh.signal;
- try {
- const response = await fetch(new URL(request.url, state.localAddr), {
- ...state.client ? { client: state.client } : {},
- method: request.method,
- headers: request.headers,
- body: request.body,
- signal,
- });
- await clientCh.send({
- type: "response-start",
- id: request.id,
- statusCode: response.status,
- statusMessage: response.statusText,
- headers: Object.fromEntries(response.headers.entries()),
- })
- const body = response?.body;
- const stream = body ? makeChanStream(body) : undefined;
- for await (const chunk of stream?.recv(signal) ?? []) {
- await clientCh.send({
- type: "data",
- id: request.id,
- chunk,
- });
- }
- if (signal.aborted) {
- return;
- }
- await clientCh.send({
- type: "data-end",
- id: request.id,
- });
- } catch (err) {
- if (signal.aborted) {
- return;
- }
- throw err;
+async function doFetch(
+ request: RequestStartMessage & { body?: ReadableStream },
+ state: ClientState,
+ clientCh: Channel,
+) {
+ // Read from the stream
+ const signal = clientCh.signal;
+ try {
+ const response = await fetch(new URL(request.url, state.localAddr), {
+ ...state.client ? { client: state.client } : {},
+ method: request.method,
+ headers: request.headers,
+ body: request.body,
+ signal,
+ });
+ await clientCh.send({
+ type: "response-start",
+ id: request.id,
+ statusCode: response.status,
+ statusMessage: response.statusText,
+ headers: Object.fromEntries(response.headers.entries()),
+ });
+ const body = response?.body;
+ const stream = body ? makeChanStream(body) : undefined;
+ for await (const chunk of stream?.recv(signal) ?? []) {
+ await clientCh.send({
+ type: "data",
+ id: request.id,
+ chunk,
+ });
}
+ if (signal.aborted) {
+ return;
+ }
+ await clientCh.send({
+ type: "data-end",
+ id: request.id,
+ });
+ } catch (err) {
+ if (signal.aborted) {
+ return;
+ }
+ throw err;
+ }
}
/**
@@ -206,6 +248,9 @@ async function doFetch(request: RequestStartMessage & { body?: ReadableStream; }
* @param {ClientState} state - The client state.
* @param {ServerMessage} message - The server message.
*/
-export const handleServerMessage: ServerMessageHandler = async (state, message) => {
- await handlersByType?.[message.type]?.(state, message);
-}
+export const handleServerMessage: ServerMessageHandler = async (
+ state,
+ message,
+) => {
+ await handlersByType?.[message.type]?.(state, message);
+};
diff --git a/handlers.server.ts b/handlers.server.ts
index 2941824..148e225 100644
--- a/handlers.server.ts
+++ b/handlers.server.ts
@@ -1,5 +1,14 @@
import { makeReadableStream, makeWebSocket } from "./channel.ts";
-import type { ClientMessage, ClientMessageHandler, DataEndMessage, DataMessage, RegisterMessage, ResponseStartMessage, WSConnectionClosed, WSMessage } from "./messages.ts";
+import type {
+ ClientMessage,
+ ClientMessageHandler,
+ DataEndMessage,
+ DataMessage,
+ RegisterMessage,
+ ResponseStartMessage,
+ WSConnectionClosed,
+ WSMessage,
+} from "./messages.ts";
import { ensureChunked } from "./server.ts";
/**
@@ -13,35 +22,42 @@ const NULL_BODIES = [101, 204, 205, 304];
* @param {ClientState} state - The client state.
* @param {ResponseStartMessage} message - The message data.
*/
-const onResponseStart: ClientMessageHandler = (state, message) => {
- const request = state.ongoingRequests[message.id];
- if (!request) {
- console.error(
- new Date(),
- "Didn't find response object, probably dead?",
- );
- return;
- }
- const headers = new Headers();
- Object.entries(message.headers).forEach(([key, value]: [string, string]) => {
- headers.set(key, value);
- });
- const shouldBeNullBody = NULL_BODIES.includes(message.statusCode);
- const stream = !shouldBeNullBody && request.dataChan ? makeReadableStream(request.dataChan) : undefined;
- const resp = new Response(stream, {
- status: message.statusCode,
- statusText: message.statusMessage,
- headers,
- });
+const onResponseStart: ClientMessageHandler = (
+ state,
+ message,
+) => {
+ const request = state.ongoingRequests[message.id];
+ if (!request) {
+ console.error(
+ new Date(),
+ "Didn't find response object, probably dead?",
+ );
+ return;
+ }
+ const headers = new Headers();
+ Object.entries(message.headers).forEach(([key, value]: [string, string]) => {
+ headers.set(key, value);
+ });
+ const shouldBeNullBody = NULL_BODIES.includes(message.statusCode);
+ const stream = !shouldBeNullBody && request.dataChan
+ ? makeReadableStream(request.dataChan)
+ : undefined;
+ const resp = new Response(stream, {
+ status: message.statusCode,
+ statusText: message.statusMessage,
+ headers,
+ });
- request.requestObject?.signal?.addEventListener?.("abort", () => {
- if (message.id in state.ongoingRequests) {
- delete state.ongoingRequests[message.id];
- request.responseObject.reject(new DOMException("Connection closed", "AbortError"));
- }
- });
- request.responseObject.resolve(resp);
-}
+ request.requestObject?.signal?.addEventListener?.("abort", () => {
+ if (message.id in state.ongoingRequests) {
+ delete state.ongoingRequests[message.id];
+ request.responseObject.reject(
+ new DOMException("Connection closed", "AbortError"),
+ );
+ }
+ });
+ request.responseObject.resolve(resp);
+};
/**
* Handler for the 'data' client message.
@@ -49,21 +65,21 @@ const onResponseStart: ClientMessageHandler = (state, mess
* @param {DataMessage} message - The message data.
*/
const data: ClientMessageHandler = async (state, message) => {
- const request = state.ongoingRequests[message.id];
- if (!request) {
- console.error(
- new Date(),
- "Didn't find response object, unable to send data",
- message.id,
- );
- return;
- }
- try {
- await request.dataChan?.send(ensureChunked(message.chunk));
- } catch (_err) {
- console.log("Request was aborted", _err);
- }
-}
+ const request = state.ongoingRequests[message.id];
+ if (!request) {
+ console.error(
+ new Date(),
+ "Didn't find response object, unable to send data",
+ message.id,
+ );
+ return;
+ }
+ try {
+ await request.dataChan?.send(ensureChunked(message.chunk));
+ } catch (_err) {
+ console.log("Request was aborted", _err);
+ }
+};
/**
* Handler for the 'data-end' client message.
@@ -71,35 +87,40 @@ const data: ClientMessageHandler = async (state, message) => {
* @param {DataEndMessage} message - The message data.
*/
const onDataEnd: ClientMessageHandler = (state, message) => {
- const request = state.ongoingRequests[message.id];
- if (!request) {
- console.error(
- new Date(),
- "Didn't find response object, unable to send data",
- );
- return;
- }
- if (message.error) {
- request.responseObject.reject(new DOMException("Connection closed", JSON.stringify(message.error)));
- return;
- }
- try {
- // Call ready again to ensure that all chunks are written
- // before closing the writer.
- request.dataChan?.close?.();
- } catch (_err) {
- console.log(_err);
- }
-}
+ const request = state.ongoingRequests[message.id];
+ if (!request) {
+ console.error(
+ new Date(),
+ "Didn't find response object, unable to send data",
+ );
+ return;
+ }
+ if (message.error) {
+ request.responseObject.reject(
+ new DOMException("Connection closed", JSON.stringify(message.error)),
+ );
+ return;
+ }
+ try {
+ // Call ready again to ensure that all chunks are written
+ // before closing the writer.
+ request.dataChan?.close?.();
+ } catch (_err) {
+ console.log(_err);
+ }
+};
/**
* Handler for the 'ws-closed' client message.
* @param {ClientState} state - The client state.
* @param {WSConnectionClosed} message - The message data.
*/
-const onWsClosed: ClientMessageHandler = (state, message) => {
- delete state.ongoingRequests[message.id];
-}
+const onWsClosed: ClientMessageHandler = (
+ state,
+ message,
+) => {
+ delete state.ongoingRequests[message.id];
+};
/**
* Handler for the 'ws-message' client message.
@@ -107,73 +128,93 @@ const onWsClosed: ClientMessageHandler = (state, message) =>
* @param {WSMessage} message - The message data.
*/
const onWsMessage: ClientMessageHandler = async (state, message) => {
- await state.ongoingRequests?.[message.id]?.socketChan?.send(message.data)
-}
+ await state.ongoingRequests?.[message.id]?.socketChan?.send(message.data);
+};
/**
* Handler for the 'ws-opened' client message.
* @param {ClientState} state - The client state.
* @param {DataEndMessage} message - The message data.
*/
-const onWsOpened: ClientMessageHandler = async (state, message) => {
- const request = state.ongoingRequests[message.id];
- if (!request) {
- return;
- }
- try {
- const { socket, response } = Deno.upgradeWebSocket(request.requestObject);
- request.responseObject.resolve(response);
- const socketChan = await makeWebSocket(socket, false);
- request.socketChan = socketChan.out;
- (async () => {
- const signal = state.ch.out.signal;
- try {
- for await (const msg of socketChan.in.recv(signal)) {
- await state.ch.out.send({ type: "ws-message", id: message.id, data: msg });
- }
- if (signal.aborted) {
- return;
- }
- await state.ch.out.send({ type: "ws-closed", id: message.id })
- } catch (error) {
- if (signal.aborted) {
- console.log("sending through a closed channel error", error, message);
- } else {
- console.error(`unexpected error when handling websocket message`, error, message);
- }
- } finally {
- try {
- socket.close();
- } catch (_err) {
- // ignore
- }
- }
- })()
- }
- catch (err) {
- console.error(new Date(), "Error upgrading websocket", err);
- delete state.ongoingRequests[message.id];
- }
-}
+const onWsOpened: ClientMessageHandler = async (
+ state,
+ message,
+) => {
+ const request = state.ongoingRequests[message.id];
+ if (!request) {
+ return;
+ }
+ try {
+ const { socket, response } = Deno.upgradeWebSocket(request.requestObject);
+ request.responseObject.resolve(response);
+ const socketChan = await makeWebSocket(
+ socket,
+ false,
+ );
+ request.socketChan = socketChan.out;
+ (async () => {
+ const signal = state.ch.out.signal;
+ try {
+ for await (const msg of socketChan.in.recv(signal)) {
+ await state.ch.out.send({
+ type: "ws-message",
+ id: message.id,
+ data: msg,
+ });
+ }
+ if (signal.aborted) {
+ return;
+ }
+ await state.ch.out.send({ type: "ws-closed", id: message.id });
+ } catch (error) {
+ if (signal.aborted) {
+ console.log("sending through a closed channel error", error, message);
+ } else {
+ console.error(
+ `unexpected error when handling websocket message`,
+ error,
+ message,
+ );
+ }
+ } finally {
+ try {
+ socket.close();
+ } catch (_err) {
+ // ignore
+ }
+ }
+ })();
+ } catch (err) {
+ console.error(new Date(), "Error upgrading websocket", err);
+ delete state.ongoingRequests[message.id];
+ }
+};
/**
* Handler for the 'register' client message.
* @param {ClientState} state - The client state.
* @param {RegisterMessage} message - The message data.
*/
-const register: ClientMessageHandler = async (state, message) => {
- if (state.apiKeys.includes(message.apiKey)) {
- state.controller.link(message.domain);
- await state.ch.out.send({ type: "registered", domain: message.domain, id: message.id })
- } else {
- console.error(
- new Date(),
- "Given API key is wrong/not recognised, stopping connection",
- message,
- );
- await state.ch.out.send({ type: "error", message: "Invalid API key" })
- state.socket.close();
- }
-}
+const register: ClientMessageHandler = async (
+ state,
+ message,
+) => {
+ if (state.apiKeys.includes(message.apiKey)) {
+ state.controller.link(message.domain);
+ await state.ch.out.send({
+ type: "registered",
+ domain: message.domain,
+ id: message.id,
+ });
+ } else {
+ console.error(
+ new Date(),
+ "Given API key is wrong/not recognised, stopping connection",
+ message,
+ );
+ await state.ch.out.send({ type: "error", message: "Invalid API key" });
+ state.socket.close();
+ }
+};
/**
* A record mapping client message types to their respective handlers.
@@ -181,7 +222,8 @@ const register: ClientMessageHandler = async (state, message) =
* @ignore
*/
// deno-lint-ignore no-explicit-any
-const handlersByType: Record> = {
+const handlersByType: Record> =
+ {
"response-start": onResponseStart,
data,
"data-end": onDataEnd,
@@ -189,17 +231,24 @@ const handlersByType: Record> =
"ws-message": onWsMessage,
"ws-opened": onWsOpened,
register,
-}
+ };
/**
* Handles client messages received from the server.
* @param {ClientState} state - The client state.
* @param {ClientMessage} message - The message received from the server.
*/
-export const handleClientMessage: ClientMessageHandler = async (state, message) => {
- console.info(new Date(), message.type, "id" in message ? message.id : "");
- await handlersByType?.[message.type]?.(state, message)?.catch?.(err => {
- console.error("unexpected error happening when handling message", message, err);
- delete state.ongoingRequests[message.id];
- });
-}
+export const handleClientMessage: ClientMessageHandler = async (
+ state,
+ message,
+) => {
+ console.info(new Date(), message.type, "id" in message ? message.id : "");
+ await handlersByType?.[message.type]?.(state, message)?.catch?.((err) => {
+ console.error(
+ "unexpected error happening when handling message",
+ message,
+ err,
+ );
+ delete state.ongoingRequests[message.id];
+ });
+};
diff --git a/messages.ts b/messages.ts
index 5df6f0f..0a7c8cc 100644
--- a/messages.ts
+++ b/messages.ts
@@ -1,100 +1,121 @@
import type { Channel, DuplexChannel } from "./channel.ts";
export interface RequestObject {
- id: string;
- requestObject: Request;
- responseObject: ReturnType>;
- dataChan?: Channel;
- socketChan?: Channel
+ id: string;
+ requestObject: Request;
+ responseObject: ReturnType>;
+ dataChan?: Channel;
+ socketChan?: Channel;
}
export interface RegisterMessage {
- id: string;
- type: "register";
- apiKey: string;
- domain: string;
+ id: string;
+ type: "register";
+ apiKey: string;
+ domain: string;
}
export interface ResponseStartMessage {
- type: "response-start";
- id: string;
- statusCode: number;
- statusMessage: string;
- headers: Record;
+ type: "response-start";
+ id: string;
+ statusCode: number;
+ statusMessage: string;
+ headers: Record;
}
export interface DataMessage {
- type: "data";
- id: string;
- chunk: Uint8Array;
+ type: "data";
+ id: string;
+ chunk: Uint8Array;
}
export interface DataEndMessage {
- type: "data-end";
- id: string;
- error?: unknown;
+ type: "data-end";
+ id: string;
+ error?: unknown;
}
export interface WSConnectionOpened {
- type: "ws-opened"
- id: string;
+ type: "ws-opened";
+ id: string;
}
export interface WSMessage {
- type: "ws-message"
- id: string;
- data: ArrayBuffer;
+ type: "ws-message";
+ id: string;
+ data: ArrayBuffer;
}
export interface WSConnectionClosed {
- type: "ws-closed"
- id: string;
-}
-export type ClientMessage = WSMessage | WSConnectionClosed | WSConnectionOpened | RegisterMessage | ResponseStartMessage | DataMessage | DataEndMessage;
+ type: "ws-closed";
+ id: string;
+}
+export type ClientMessage =
+ | WSMessage
+ | WSConnectionClosed
+ | WSConnectionOpened
+ | RegisterMessage
+ | ResponseStartMessage
+ | DataMessage
+ | DataEndMessage;
export interface RequestStartMessage {
- type: "request-start";
- domain: string;
- id: string;
- method: string;
- url: string;
- headers: Record;
- hasBody?: boolean;
+ type: "request-start";
+ domain: string;
+ id: string;
+ method: string;
+ url: string;
+ headers: Record;
+ hasBody?: boolean;
}
export interface RequestDataEndMessage {
- type: "request-end"
- id: string;
+ type: "request-end";
+ id: string;
}
export interface RequestDataMessage {
- type: "request-data";
- id: string;
- chunk: Uint8Array
+ type: "request-data";
+ id: string;
+ chunk: Uint8Array;
}
export interface RegisteredMessage {
- type: "registered";
- id: string;
- domain: string;
+ type: "registered";
+ id: string;
+ domain: string;
}
export interface ErrorMessage {
- type: "error";
- message: string;
-}
-export type ServerMessage = WSMessage | WSConnectionClosed | RequestStartMessage | RequestDataEndMessage | RequestDataMessage | RegisteredMessage | ErrorMessage;
+ type: "error";
+ message: string;
+}
+export type ServerMessage =
+ | WSMessage
+ | WSConnectionClosed
+ | RequestStartMessage
+ | RequestDataEndMessage
+ | RequestDataMessage
+ | RegisteredMessage
+ | ErrorMessage;
export interface ClientState {
- ch: DuplexChannel;
- requestBody: Record>;
- wsMessages: Record>;
- live: boolean;
- localAddr: string;
- client?: Deno.HttpClient
+ ch: DuplexChannel;
+ requestBody: Record>;
+ wsMessages: Record>;
+ live: boolean;
+ localAddr: string;
+ client?: Deno.HttpClient;
}
export interface ClientHostController {
- link: (host: string) => void;
+ link: (host: string) => void;
}
export interface ServerConnectionState {
- clientId: string;
- socket: WebSocket;
- ch: DuplexChannel;
- controller: ClientHostController;
- ongoingRequests: Record;
- apiKeys: string[];
-}
-export type ServerMessageHandler = (state: ClientState, message: TServerMessage) => Promise | void;
-export type ClientMessageHandler = (state: ServerConnectionState, message: TClientMessage) => Promise | void;
+ clientId: string;
+ socket: WebSocket;
+ ch: DuplexChannel;
+ controller: ClientHostController;
+ ongoingRequests: Record;
+ apiKeys: string[];
+}
+export type ServerMessageHandler<
+ TServerMessage extends ServerMessage = ServerMessage,
+> = (state: ClientState, message: TServerMessage) => Promise | void;
+export type ClientMessageHandler<
+ TClientMessage extends ClientMessage = ClientMessage,
+> = (
+ state: ServerConnectionState,
+ message: TClientMessage,
+) => Promise | void;
diff --git a/mod.ts b/mod.ts
index 7c36bcd..5486fa6 100644
--- a/mod.ts
+++ b/mod.ts
@@ -2,4 +2,3 @@ export { connect } from "./client.ts";
export type { ConnectOptions } from "./client.ts";
export { serve, serveHandler } from "./server.ts";
export type { HandlerOptions } from "./server.ts";
-
diff --git a/notify.ts b/notify.ts
index a425365..412a181 100644
--- a/notify.ts
+++ b/notify.ts
@@ -3,7 +3,7 @@
* source from: https://github.com/lambdalisue/deno-async/blob/c86ef00a3056b2436b5e90f01bf52c1cbb83b1c8/notify.ts
*/
export interface WaitOptions {
- signal?: AbortSignal;
+ signal?: AbortSignal;
}
/**
@@ -26,68 +26,68 @@ export interface WaitOptions {
* ```
*/
export class Notify {
- #waiters: {
- promise: Promise;
- resolve: () => void;
- reject: (reason?: unknown) => void;
- }[] = [];
+ #waiters: {
+ promise: Promise;
+ resolve: () => void;
+ reject: (reason?: unknown) => void;
+ }[] = [];
- /**
- * Returns the number of waiters that are waiting for notification.
- */
- get waiters(): number {
- return this.#waiters.length;
- }
+ /**
+ * Returns the number of waiters that are waiting for notification.
+ */
+ get waiters(): number {
+ return this.#waiters.length;
+ }
- /**
- * Notifies `n` waiters that are waiting for notification. Resolves each of the notified waiters.
- * If there are fewer than `n` waiters, all waiters are notified.
- */
- notify(n = 1): void {
- const head = this.#waiters.slice(0, n);
- const tail = this.#waiters.slice(n);
- for (const waiter of head) {
- waiter.resolve();
- }
- this.#waiters = tail;
+ /**
+ * Notifies `n` waiters that are waiting for notification. Resolves each of the notified waiters.
+ * If there are fewer than `n` waiters, all waiters are notified.
+ */
+ notify(n = 1): void {
+ const head = this.#waiters.slice(0, n);
+ const tail = this.#waiters.slice(n);
+ for (const waiter of head) {
+ waiter.resolve();
}
+ this.#waiters = tail;
+ }
- /**
- * Notifies all waiters that are waiting for notification. Resolves each of the notified waiters.
- */
- notifyAll(): void {
- for (const waiter of this.#waiters) {
- waiter.resolve();
- }
- this.#waiters = [];
+ /**
+ * Notifies all waiters that are waiting for notification. Resolves each of the notified waiters.
+ */
+ notifyAll(): void {
+ for (const waiter of this.#waiters) {
+ waiter.resolve();
}
+ this.#waiters = [];
+ }
- /**
- * Asynchronously waits for notification. The caller's execution is suspended until
- * the `notify` method is called. The method returns a Promise that resolves when the caller is notified.
- * Optionally takes an AbortSignal to abort the waiting if the signal is aborted.
- *
- * @param options Optional parameters.
- * @param options.signal An optional AbortSignal to abort the waiting if the signal is aborted.
- * @throws {DOMException} If the signal is aborted.
- */
- async notified({ signal }: WaitOptions = {}): Promise {
- if (signal?.aborted) {
- throw new DOMException("Aborted", "AbortError");
- }
- const waiter = Promise.withResolvers();
- const abort = () => {
- removeItem(this.#waiters, waiter);
- waiter.reject(new DOMException("Aborted", "AbortError"));
- };
- signal?.addEventListener("abort", abort, { once: true });
- this.#waiters.push(waiter);
- await waiter.promise;
- signal?.removeEventListener("abort", abort);
+ /**
+ * Asynchronously waits for notification. The caller's execution is suspended until
+ * the `notify` method is called. The method returns a Promise that resolves when the caller is notified.
+ * Optionally takes an AbortSignal to abort the waiting if the signal is aborted.
+ *
+ * @param options Optional parameters.
+ * @param options.signal An optional AbortSignal to abort the waiting if the signal is aborted.
+ * @throws {DOMException} If the signal is aborted.
+ */
+ async notified({ signal }: WaitOptions = {}): Promise {
+ if (signal?.aborted) {
+ throw new DOMException("Aborted", "AbortError");
}
+ const waiter = Promise.withResolvers();
+ const abort = () => {
+ removeItem(this.#waiters, waiter);
+ waiter.reject(new DOMException("Aborted", "AbortError"));
+ };
+ signal?.addEventListener("abort", abort, { once: true });
+ this.#waiters.push(waiter);
+ await waiter.promise;
+ signal?.removeEventListener("abort", abort);
+ }
}
function removeItem(array: T[], item: T): void {
- const index = array.indexOf(item);
- array.splice(index, 1);
-}
\ No newline at end of file
+ const index = array.indexOf(item);
+ array.splice(index, 1);
+}
diff --git a/queue.ts b/queue.ts
index 3c5623d..550679a 100644
--- a/queue.ts
+++ b/queue.ts
@@ -24,49 +24,49 @@ import { Notify, type WaitOptions } from "./notify.ts";
* @template T The type of items in the queue.
*/
export class Queue {
- #notify = new Notify();
- #items: T[] = [];
+ #notify = new Notify();
+ #items: T[] = [];
- /**
- * Gets the number of items in the queue.
- */
- get size(): number {
- return this.#items.length;
- }
+ /**
+ * Gets the number of items in the queue.
+ */
+ get size(): number {
+ return this.#items.length;
+ }
- /**
- * Returns true if the queue is currently locked.
- */
- get locked(): boolean {
- return this.#notify.waiters > 0;
- }
+ /**
+ * Returns true if the queue is currently locked.
+ */
+ get locked(): boolean {
+ return this.#notify.waiters > 0;
+ }
- /**
- * Adds an item to the end of the queue and notifies any waiting consumers.
- *
- * @param {T} value The item to add to the queue.
- */
- push(value: T): void {
- this.#items.push(value);
- this.#notify.notify();
- }
+ /**
+ * Adds an item to the end of the queue and notifies any waiting consumers.
+ *
+ * @param {T} value The item to add to the queue.
+ */
+ push(value: T): void {
+ this.#items.push(value);
+ this.#notify.notify();
+ }
- /**
- * Removes the next item from the queue, optionally waiting if the queue is currently empty.
- *
- * @param {WaitOptions} [options] Optional parameters to pass to the wait operation.
- * @param {AbortSignal} [options.signal] An optional AbortSignal used to abort the wait operation if the signal is aborted.
- * @returns {Promise} A promise that resolves to the next item in the queue.
- * @throws {DOMException} Throws a DOMException with "Aborted" and "AbortError" codes if the wait operation was aborted.
- */
- async pop({ signal }: WaitOptions = {}): Promise {
- while (!signal?.aborted) {
- const value = this.#items.shift();
- if (value) {
- return value;
- }
- await this.#notify.notified({ signal });
- }
- throw new DOMException("Aborted", "AbortError");
+ /**
+ * Removes the next item from the queue, optionally waiting if the queue is currently empty.
+ *
+ * @param {WaitOptions} [options] Optional parameters to pass to the wait operation.
+ * @param {AbortSignal} [options.signal] An optional AbortSignal used to abort the wait operation if the signal is aborted.
+ * @returns {Promise} A promise that resolves to the next item in the queue.
+ * @throws {DOMException} Throws a DOMException with "Aborted" and "AbortError" codes if the wait operation was aborted.
+ */
+ async pop({ signal }: WaitOptions = {}): Promise {
+ while (!signal?.aborted) {
+ const value = this.#items.shift();
+ if (value) {
+ return value;
+ }
+ await this.#notify.notified({ signal });
}
-}
\ No newline at end of file
+ throw new DOMException("Aborted", "AbortError");
+ }
+}
diff --git a/server.ts b/server.ts
index a71b9ef..9a20acf 100644
--- a/server.ts
+++ b/server.ts
@@ -1,6 +1,10 @@
import { makeChan, makeChanStream, makeWebSocket } from "./channel.ts";
import { handleClientMessage } from "./handlers.server.ts";
-import type { ClientMessage, ServerConnectionState, ServerMessage } from "./messages.ts";
+import type {
+ ClientMessage,
+ ServerConnectionState,
+ ServerMessage,
+} from "./messages.ts";
/**
* Ensures that the given chunk is in the form of a Uint8Array.
@@ -8,13 +12,16 @@ import type { ClientMessage, ServerConnectionState, ServerMessage } from "./mess
* @param {Uint8Array | Record} chunk - The input chunk, which can be either a Uint8Array or an object.
* @returns {Uint8Array} The chunk converted into a Uint8Array.
*/
-export const ensureChunked = (chunk: Uint8Array | Record): Uint8Array => {
+export const ensureChunked = (
+ chunk: Uint8Array | Record,
+): Uint8Array => {
if (Array.isArray(chunk)) {
return chunk as Uint8Array;
}
- return new Uint8Array(Array.from({ ...chunk, length: Object.keys(chunk).length }))
-}
-
+ return new Uint8Array(
+ Array.from({ ...chunk, length: Object.keys(chunk).length }),
+ );
+};
const serverStates: Record = {};
const hostToClientId: Record = {};
@@ -35,7 +42,7 @@ export interface ServeOptions extends HandlerOptions {
* @property {string} connectPath - A path for connecting to the server.
*/
export interface HandlerOptions {
- apiKeys: string[]
+ apiKeys: string[];
connectPath?: string;
}
@@ -45,12 +52,12 @@ export interface HandlerOptions {
* @returns {Deno.HttpServer} An instance of Deno HTTP server.
*/
export const serve = (options: ServeOptions): Deno.HttpServer => {
- const port = (options?.port ?? 8000);
+ const port = options?.port ?? 8000;
return Deno.serve({
handler: serveHandler(options),
port,
- })
-}
+ });
+};
/**
* Creates a handler function for serving requests, with support for WebSocket connections
@@ -61,7 +68,9 @@ export const serve = (options: ServeOptions): Deno.HttpServer => {
* @param {string} [options.connectPath] - The path for WebSocket connection upgrades.
* @returns {(request: Request) => Response | Promise} - The request handler function.
*/
-export const serveHandler = (options: HandlerOptions): (request: Request) => Response | Promise => {
+export const serveHandler = (
+ options: HandlerOptions,
+): (request: Request) => Response | Promise => {
const apiKeys = options.apiKeys; // array of api keys (random strings)
const connectPath = options?.connectPath ?? "/_connect";
@@ -81,11 +90,11 @@ export const serveHandler = (options: HandlerOptions): (request: Request) => Res
link: (host) => {
hosts.push(host);
hostToClientId[host] = clientId;
- }
+ },
},
ongoingRequests: {},
apiKeys,
- }
+ };
serverStates[state.clientId] = state;
try {
for await (const message of ch.in.recv(req.signal)) {
@@ -99,15 +108,17 @@ export const serveHandler = (options: HandlerOptions): (request: Request) => Res
delete hostToClientId[host];
}
}
- })()
+ })();
return response;
-
}
const host = req.headers.get("host");
if (host && host in hostToClientId) {
const serverState = serverStates[hostToClientId[host]];
if (!serverState) {
- return new Response("No registration for domain and/or remote service not available", { status: 503 });
+ return new Response(
+ "No registration for domain and/or remote service not available",
+ { status: 503 },
+ );
}
const { ch, ongoingRequests } = serverState;
const messageId = crypto.randomUUID();
@@ -133,7 +144,7 @@ export const serveHandler = (options: HandlerOptions): (request: Request) => Res
requestObject: req,
responseObject,
dataChan: makeChan(),
- }
+ };
try {
const signal = ch.out.signal;
await ch.out.send(requestForward);
@@ -155,19 +166,37 @@ export const serveHandler = (options: HandlerOptions): (request: Request) => Res
id: messageId,
});
} catch (err) {
- responseObject.resolve(new Response("Error sending request to remote client", { status: 503 }));
+ responseObject.resolve(
+ new Response("Error sending request to remote client", {
+ status: 503,
+ }),
+ );
if (signal.aborted) {
return;
}
- console.log(`unexpected error when sending request`, err, req, messageId);
+ console.log(
+ `unexpected error when sending request`,
+ err,
+ req,
+ messageId,
+ );
}
- })()
+ })();
return responseObject.promise;
} catch (err) {
- console.error(new Date(), "Error sending request to remote client", err);
- return new Response("Error sending request to remote client", { status: 503 });
+ console.error(
+ new Date(),
+ "Error sending request to remote client",
+ err,
+ );
+ return new Response("Error sending request to remote client", {
+ status: 503,
+ });
}
}
- return new Response("No registration for domain and/or remote service not available", { status: 503 });
- }
-}
\ No newline at end of file
+ return new Response(
+ "No registration for domain and/or remote service not available",
+ { status: 503 },
+ );
+ };
+};
diff --git a/test.ts b/test.ts
index 8694596..72a007b 100644
--- a/test.ts
+++ b/test.ts
@@ -8,33 +8,33 @@ const _localServer = Deno.serve({
const { socket, response } = Deno.upgradeWebSocket(req);
socket.onclose = () => {
console.log("CLOSED");
- }
+ };
socket.onopen = () => {
console.log("OPEN");
socket.send(JSON.stringify({ ping: true }));
- }
+ };
socket.onmessage = (msg) => {
console.log("MESSAGE RECEIVED", msg);
- }
+ };
return response;
}
const cp = new Headers(req.headers);
cp.set("x-server-reply", "true");
return new Response(JSON.stringify({ message: "HELLO WORLD" }), {
headers: {
- 'content-type': "application/json",
- }, status: 200
+ "content-type": "application/json",
+ },
+ status: 200,
});
},
port: LOCAL_PORT,
});
-
const KEY = "c309424a-2dc4-46fe-bfc7-a7c10df59477";
const _tunnelServer = serve({
apiKeys: [KEY],
- port: 8001
+ port: 8001,
});
const domain = "localhost:8001";
@@ -45,7 +45,6 @@ await connect({
apiKey: KEY,
});
-
const client = Deno.createHttpClient({
allowHost: true,
});
@@ -54,19 +53,18 @@ const resp = await fetch("http://localhost:8001", {
method: "POST",
headers: {
"x-client-request": "true",
- "host": domain
+ "host": domain,
},
client,
body: "Hello World",
});
console.log("TEXT", await resp.text(), resp.headers);
-
const ws = new WebSocket("ws://localhost:8001/connect-ws");
ws.onmessage = (msg) => {
console.log("MESSAGE CLIENT RECEIVED", msg);
-}
+};
ws.onopen = () => {
- ws.send(JSON.stringify({ pong: true }))
+ ws.send(JSON.stringify({ pong: true }));
ws.close();
-}
\ No newline at end of file
+};