Skip to content

Commit

Permalink
Fixes memory leak on server connection
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed May 30, 2024
1 parent 0d89e38 commit ba17ff5
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 17 deletions.
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@deco/warp",
"version": "0.1.6",
"version": "0.1.7",
"exports": "./mod.ts",
"tasks": {
"check": "deno fmt && deno lint && deno check mod.ts"
Expand Down
4 changes: 2 additions & 2 deletions handlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ const onWsOpened: ClientMessageHandler<DataEndMessage> = async (state, message)
*/
const register: ClientMessageHandler<RegisterMessage> = async (state, message) => {
if (state.apiKeys.includes(message.apiKey)) {
state.domainsToConnections[message.domain] = state.ch;
state.controller.link(message.domain);
await state.ch.out.send({ type: "registered", domain: message.domain, id: message.id })
} else {
console.error(
Expand Down Expand Up @@ -197,7 +197,7 @@ const handlersByType: Record<ClientMessage["type"], ClientMessageHandler<any>> =
* @param {ClientMessage} message - The message received from the server.
*/
export const handleClientMessage: ClientMessageHandler = async (state, message) => {
console.info(new Date(), `[server]`, message.type, "id" in message ? message.id : "");
console.info(new Date(), message.type, "id" in message ? message.id : "");
await handlersByType?.[message.type]?.(state, message)?.catch?.(err => {
console.error("unexpected error happening when handling message", message, err);
delete state.ongoingRequests[message.id];
Expand Down
11 changes: 8 additions & 3 deletions messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,17 @@ export interface ClientState {
localAddr: string;
client?: Deno.HttpClient
}
export interface ServerState {

export interface ClientHostController {
link: (host: string) => void;
}
export interface ServerConnectionState {
clientId: string;
socket: WebSocket;
ch: DuplexChannel<ServerMessage, ClientMessage>;
domainsToConnections: Record<string, DuplexChannel<ServerMessage, ClientMessage>>;
controller: ClientHostController;
ongoingRequests: Record<string, RequestObject>;
apiKeys: string[];
}
export type ServerMessageHandler<TServerMessage extends ServerMessage = ServerMessage> = (state: ClientState, message: TServerMessage) => Promise<void> | void;
export type ClientMessageHandler<TClientMessage extends ClientMessage = ClientMessage> = (state: ServerState, message: TClientMessage) => Promise<void> | void;
export type ClientMessageHandler<TClientMessage extends ClientMessage = ClientMessage> = (state: ServerConnectionState, message: TClientMessage) => Promise<void> | void;
44 changes: 33 additions & 11 deletions server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type DuplexChannel, makeChan, makeChanStream, makeWebSocket } from "./channel.ts";
import { makeChan, makeChanStream, makeWebSocket } from "./channel.ts";
import { handleClientMessage } from "./handlers.server.ts";
import type { ClientMessage, RequestObject, ServerMessage, ServerState } from "./messages.ts";
import type { ClientMessage, ServerConnectionState, ServerMessage } from "./messages.ts";

/**
* Ensures that the given chunk is in the form of a Uint8Array.
Expand All @@ -16,8 +16,8 @@ export const ensureChunked = (chunk: Uint8Array | Record<string, Uint8Array[numb
}


const domainsToConnections: Record<string, DuplexChannel<ServerMessage, ClientMessage>> = {};
const ongoingRequests: Record<string, RequestObject> = {};
const serverStates: Record<string, ServerConnectionState> = {};
const hostToClientId: Record<string, string> = {};

/**
* Represents options for configuring the server.
Expand Down Expand Up @@ -71,23 +71,45 @@ export const serveHandler = (options: HandlerOptions): (request: Request) => Res
const { socket, response } = Deno.upgradeWebSocket(req);
(async () => {
const ch = await makeWebSocket<ServerMessage, ClientMessage>(socket);
const state: ServerState = {
const clientId = crypto.randomUUID();
const hosts: string[] = [];
const state: ServerConnectionState = {
clientId,
socket,
ch,
domainsToConnections,
ongoingRequests,
controller: {
link: (host) => {
hosts.push(host);
hostToClientId[host] = clientId;
}
},
ongoingRequests: {},
apiKeys,
}
for await (const message of ch.in.recv()) {
await handleClientMessage(state, message);
serverStates[state.clientId] = state;
try {
for await (const message of ch.in.recv(req.signal)) {
await handleClientMessage(state, message);
}
} catch (_err) {
// ignore
} finally {
delete serverStates[state.clientId];
for (const host of hosts) {
delete hostToClientId[host];
}
}
})()
return response;

}
const host = req.headers.get("host");
if (host && host in domainsToConnections) {
const ch = domainsToConnections[host];
if (host && host in hostToClientId) {
const serverState = serverStates[hostToClientId[host]];
if (!serverState) {
return new Response("No registration for domain and/or remote service not available", { status: 503 });
}
const { ch, ongoingRequests } = serverState;
const messageId = crypto.randomUUID();
const hasBody = !!req.body;
const url = new URL(req.url);
Expand Down

0 comments on commit ba17ff5

Please sign in to comment.