-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Marcos Candeia <[email protected]>
- Loading branch information
Showing
12 changed files
with
810 additions
and
643 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,149 +1,163 @@ | ||
import { Queue } from "./queue.ts"; | ||
|
||
export interface Channel<T> { | ||
closed: Promise<void>; | ||
signal: AbortSignal; | ||
close(): void; | ||
send(value: T): Promise<void>; | ||
recv(signal?: AbortSignal): AsyncIterableIterator<T>; | ||
closed: Promise<void>; | ||
signal: AbortSignal; | ||
close(): void; | ||
send(value: T): Promise<void>; | ||
recv(signal?: AbortSignal): AsyncIterableIterator<T>; | ||
} | ||
|
||
export const link = (...signals: AbortSignal[]): AbortSignal => { | ||
const ctrl = new AbortController(); | ||
for (const signal of signals) { | ||
signal.onabort = (evt) => { | ||
if (!ctrl.signal.aborted) { | ||
ctrl.abort(evt); | ||
} | ||
} | ||
} | ||
return ctrl.signal; | ||
} | ||
const ctrl = new AbortController(); | ||
for (const signal of signals) { | ||
signal.onabort = (evt) => { | ||
if (!ctrl.signal.aborted) { | ||
ctrl.abort(evt); | ||
} | ||
}; | ||
} | ||
return ctrl.signal; | ||
}; | ||
|
||
export class ClosedChannelError extends Error { | ||
constructor() { | ||
super("Channel is closed"); | ||
} | ||
constructor() { | ||
super("Channel is closed"); | ||
} | ||
} | ||
export const ifClosedChannel = (cb: () => Promise<void> | void) => (err: unknown) => { | ||
export const ifClosedChannel = | ||
(cb: () => Promise<void> | void) => (err: unknown) => { | ||
if (err instanceof ClosedChannelError) { | ||
return cb(); | ||
return cb(); | ||
} | ||
throw err; | ||
} | ||
}; | ||
|
||
export const ignoreIfClosed = ifClosedChannel(() => { }) | ||
export const ignoreIfClosed = ifClosedChannel(() => {}); | ||
export const makeChan = <T>(): Channel<T> => { | ||
const queue: Queue<{ value: T, resolve: () => void }> = new Queue(); | ||
const ctrl = new AbortController(); | ||
const abortPromise = Promise.withResolvers<void>(); | ||
ctrl.signal.onabort = () => { | ||
abortPromise.resolve(); | ||
} | ||
const queue: Queue<{ value: T; resolve: () => void }> = new Queue(); | ||
const ctrl = new AbortController(); | ||
const abortPromise = Promise.withResolvers<void>(); | ||
ctrl.signal.onabort = () => { | ||
abortPromise.resolve(); | ||
}; | ||
|
||
const send = (value: T): Promise<void> => { | ||
return new Promise((resolve, reject) => { | ||
if (ctrl.signal.aborted) reject(new ClosedChannelError()); | ||
queue.push({ value, resolve }); | ||
}); | ||
}; | ||
const send = (value: T): Promise<void> => { | ||
return new Promise((resolve, reject) => { | ||
if (ctrl.signal.aborted) reject(new ClosedChannelError()); | ||
queue.push({ value, resolve }); | ||
}); | ||
}; | ||
|
||
const close = () => { | ||
ctrl.abort(); | ||
}; | ||
const close = () => { | ||
ctrl.abort(); | ||
}; | ||
|
||
const recv = async function* (signal?: AbortSignal): AsyncIterableIterator<T> { | ||
const linked = signal ? link(ctrl.signal, signal) : ctrl.signal; | ||
while (true) { | ||
if (linked.aborted) { | ||
return; | ||
} | ||
try { | ||
const next = await queue.pop({ signal: linked }); | ||
next.resolve(); | ||
yield next.value; | ||
} catch (_err) { | ||
if (linked.aborted) { | ||
return; | ||
} | ||
throw _err; | ||
} | ||
const recv = async function* ( | ||
signal?: AbortSignal, | ||
): AsyncIterableIterator<T> { | ||
const linked = signal ? link(ctrl.signal, signal) : ctrl.signal; | ||
while (true) { | ||
if (linked.aborted) { | ||
return; | ||
} | ||
try { | ||
const next = await queue.pop({ signal: linked }); | ||
next.resolve(); | ||
yield next.value; | ||
} catch (_err) { | ||
if (linked.aborted) { | ||
return; | ||
} | ||
}; | ||
throw _err; | ||
} | ||
} | ||
}; | ||
|
||
return { send, recv, close, signal: ctrl.signal, closed: abortPromise.promise }; | ||
return { | ||
send, | ||
recv, | ||
close, | ||
signal: ctrl.signal, | ||
closed: abortPromise.promise, | ||
}; | ||
}; | ||
|
||
export interface DuplexChannel<TSend, TReceive> { | ||
in: Channel<TReceive> | ||
out: Channel<TSend> | ||
in: Channel<TReceive>; | ||
out: Channel<TSend>; | ||
} | ||
|
||
export const makeWebSocket = <TSend, TReceive>(socket: WebSocket, parse: boolean = true): Promise<DuplexChannel<TSend, TReceive>> => { | ||
const sendChan = makeChan<TSend>(); | ||
const recvChan = makeChan<TReceive>(); | ||
const ch = Promise.withResolvers<DuplexChannel<TSend, TReceive>>(); | ||
socket.onclose = () => { | ||
sendChan.close(); | ||
recvChan.close(); | ||
export const makeWebSocket = <TSend, TReceive>( | ||
socket: WebSocket, | ||
parse: boolean = true, | ||
): Promise<DuplexChannel<TSend, TReceive>> => { | ||
const sendChan = makeChan<TSend>(); | ||
const recvChan = makeChan<TReceive>(); | ||
const ch = Promise.withResolvers<DuplexChannel<TSend, TReceive>>(); | ||
socket.onclose = () => { | ||
sendChan.close(); | ||
recvChan.close(); | ||
}; | ||
socket.onerror = (err) => { | ||
socket.close(); | ||
ch.reject(err); | ||
}; | ||
socket.onmessage = async (msg) => { | ||
let eventData = msg.data; | ||
const target = msg?.target; | ||
if ( | ||
target && "binaryType" in target && | ||
target.binaryType === "blob" && typeof eventData === "object" && | ||
"text" in eventData | ||
) { | ||
eventData = await eventData.text(); | ||
} | ||
socket.onerror = (err) => { | ||
socket.close(); | ||
ch.reject(err); | ||
const message = parse ? JSON.parse(eventData) : eventData; | ||
await recvChan.send(message); | ||
}; | ||
socket.onopen = async () => { | ||
ch.resolve({ in: recvChan, out: sendChan }); | ||
for await (const message of sendChan.recv()) { | ||
try { | ||
socket.send(parse ? JSON.stringify(message) : message as ArrayBuffer); | ||
} catch (_err) { | ||
console.error("error sending message through socket", message); | ||
} | ||
} | ||
socket.onmessage = async (msg) => { | ||
let eventData = msg.data; | ||
const target = msg?.target; | ||
if ( | ||
target && "binaryType" in target && | ||
target.binaryType === "blob" && typeof eventData === "object" && | ||
"text" in eventData | ||
) { | ||
eventData = await eventData.text(); | ||
} | ||
const message = parse ? JSON.parse(eventData) : eventData; | ||
await recvChan.send(message); | ||
} | ||
socket.onopen = async () => { | ||
ch.resolve({ in: recvChan, out: sendChan }); | ||
for await (const message of sendChan.recv()) { | ||
try { | ||
socket.send(parse ? JSON.stringify(message) : message as ArrayBuffer); | ||
} catch (_err) { | ||
console.error("error sending message through socket", message); | ||
} | ||
} | ||
socket.close(); | ||
} | ||
return ch.promise; | ||
} | ||
socket.close(); | ||
}; | ||
return ch.promise; | ||
}; | ||
|
||
export const makeReadableStream = (ch: Channel<Uint8Array>): ReadableStream<Uint8Array> => { | ||
return new ReadableStream({ | ||
async start(controller) { | ||
for await (const content of ch.recv()) { | ||
controller.enqueue(content); | ||
} | ||
controller.close(); | ||
}, | ||
cancel() { | ||
ch.close(); | ||
}, | ||
}) | ||
} | ||
export const makeReadableStream = ( | ||
ch: Channel<Uint8Array>, | ||
): ReadableStream<Uint8Array> => { | ||
return new ReadableStream({ | ||
async start(controller) { | ||
for await (const content of ch.recv()) { | ||
controller.enqueue(content); | ||
} | ||
controller.close(); | ||
}, | ||
cancel() { | ||
ch.close(); | ||
}, | ||
}); | ||
}; | ||
export const makeChanStream = (stream: ReadableStream): Channel<Uint8Array> => { | ||
const chan = makeChan<Uint8Array>(); | ||
const chan = makeChan<Uint8Array>(); | ||
|
||
// Consume the transformed stream to trigger the pipeline | ||
const reader = stream.getReader(); | ||
const processStream = async () => { | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) break; | ||
await chan.send(value); | ||
} | ||
chan.close(); | ||
}; | ||
processStream().catch(console.error); | ||
return chan; | ||
// Consume the transformed stream to trigger the pipeline | ||
const reader = stream.getReader(); | ||
const processStream = async () => { | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) break; | ||
await chan.send(value); | ||
} | ||
chan.close(); | ||
}; | ||
processStream().catch(console.error); | ||
return chan; | ||
}; |
Oops, something went wrong.