diff --git a/deno.json b/deno.json index d90ec5f..a81f405 100644 --- a/deno.json +++ b/deno.json @@ -1,6 +1,6 @@ { "name": "@deco/warp", - "version": "0.1.6", + "version": "0.1.7", "exports": "./mod.ts", "tasks": { "check": "deno fmt && deno lint && deno check mod.ts" diff --git a/handlers.server.ts b/handlers.server.ts index b882b51..2941824 100644 --- a/handlers.server.ts +++ b/handlers.server.ts @@ -162,7 +162,7 @@ const onWsOpened: ClientMessageHandler = async (state, message) */ const register: ClientMessageHandler = async (state, message) => { if (state.apiKeys.includes(message.apiKey)) { - state.domainsToConnections[message.domain] = state.ch; + state.controller.link(message.domain); await state.ch.out.send({ type: "registered", domain: message.domain, id: message.id }) } else { console.error( @@ -197,7 +197,7 @@ const handlersByType: Record> = * @param {ClientMessage} message - The message received from the server. */ export const handleClientMessage: ClientMessageHandler = async (state, message) => { - console.info(new Date(), `[server]`, message.type, "id" in message ? message.id : ""); + console.info(new Date(), message.type, "id" in message ? message.id : ""); await handlersByType?.[message.type]?.(state, message)?.catch?.(err => { console.error("unexpected error happening when handling message", message, err); delete state.ongoingRequests[message.id]; diff --git a/messages.ts b/messages.ts index 16f1e27..5df6f0f 100644 --- a/messages.ts +++ b/messages.ts @@ -84,12 +84,17 @@ export interface ClientState { localAddr: string; client?: Deno.HttpClient } -export interface ServerState { + +export interface ClientHostController { + link: (host: string) => void; +} +export interface ServerConnectionState { + clientId: string; socket: WebSocket; ch: DuplexChannel; - domainsToConnections: Record>; + controller: ClientHostController; ongoingRequests: Record; apiKeys: string[]; } export type ServerMessageHandler = (state: ClientState, message: TServerMessage) => Promise | void; -export type ClientMessageHandler = (state: ServerState, message: TClientMessage) => Promise | void; +export type ClientMessageHandler = (state: ServerConnectionState, message: TClientMessage) => Promise | void; diff --git a/server.ts b/server.ts index c1e743f..a71b9ef 100644 --- a/server.ts +++ b/server.ts @@ -1,6 +1,6 @@ -import { type DuplexChannel, makeChan, makeChanStream, makeWebSocket } from "./channel.ts"; +import { makeChan, makeChanStream, makeWebSocket } from "./channel.ts"; import { handleClientMessage } from "./handlers.server.ts"; -import type { ClientMessage, RequestObject, ServerMessage, ServerState } from "./messages.ts"; +import type { ClientMessage, ServerConnectionState, ServerMessage } from "./messages.ts"; /** * Ensures that the given chunk is in the form of a Uint8Array. @@ -16,8 +16,8 @@ export const ensureChunked = (chunk: Uint8Array | Record> = {}; -const ongoingRequests: Record = {}; +const serverStates: Record = {}; +const hostToClientId: Record = {}; /** * Represents options for configuring the server. @@ -71,23 +71,45 @@ export const serveHandler = (options: HandlerOptions): (request: Request) => Res const { socket, response } = Deno.upgradeWebSocket(req); (async () => { const ch = await makeWebSocket(socket); - const state: ServerState = { + const clientId = crypto.randomUUID(); + const hosts: string[] = []; + const state: ServerConnectionState = { + clientId, socket, ch, - domainsToConnections, - ongoingRequests, + controller: { + link: (host) => { + hosts.push(host); + hostToClientId[host] = clientId; + } + }, + ongoingRequests: {}, apiKeys, } - for await (const message of ch.in.recv()) { - await handleClientMessage(state, message); + serverStates[state.clientId] = state; + try { + for await (const message of ch.in.recv(req.signal)) { + await handleClientMessage(state, message); + } + } catch (_err) { + // ignore + } finally { + delete serverStates[state.clientId]; + for (const host of hosts) { + delete hostToClientId[host]; + } } })() return response; } const host = req.headers.get("host"); - if (host && host in domainsToConnections) { - const ch = domainsToConnections[host]; + if (host && host in hostToClientId) { + const serverState = serverStates[hostToClientId[host]]; + if (!serverState) { + return new Response("No registration for domain and/or remote service not available", { status: 503 }); + } + const { ch, ongoingRequests } = serverState; const messageId = crypto.randomUUID(); const hasBody = !!req.body; const url = new URL(req.url);