From 86cb41033bc8611702c6f0b85ba07ee21fb5f2e0 Mon Sep 17 00:00:00 2001 From: Greg Richardson Date: Sat, 27 Jul 2024 02:33:30 -0500 Subject: [PATCH] New method `execProtocolRaw()` (#127) * feat: execProtocolRaw * chore: update disclaimer on execProtocolRaw * fix: syncToFs race condition and remove mutex --- packages/pglite/src/interface.ts | 4 + packages/pglite/src/pglite.ts | 135 +++++++++++++++----------- packages/pglite/src/worker/index.ts | 4 + packages/pglite/src/worker/process.ts | 3 + 4 files changed, 87 insertions(+), 59 deletions(-) diff --git a/packages/pglite/src/interface.ts b/packages/pglite/src/interface.ts index c2fa7976..36aa2fff 100644 --- a/packages/pglite/src/interface.ts +++ b/packages/pglite/src/interface.ts @@ -74,6 +74,10 @@ export type PGliteInterface = { transaction( callback: (tx: Transaction) => Promise, ): Promise; + execProtocolRaw( + message: Uint8Array, + options?: ExecProtocolOptions, + ): Promise; execProtocol( message: Uint8Array, options?: ExecProtocolOptions, diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts index 6614ab66..7df875e4 100644 --- a/packages/pglite/src/pglite.ts +++ b/packages/pglite/src/pglite.ts @@ -562,77 +562,94 @@ export class PGlite implements PGliteInterface { } /** - * Execute a postgres wire protocol message + * Execute a postgres wire protocol message directly without wrapping the response. + * Only use if `execProtocol()` doesn't suite your needs. + * + * **Warning:** This bypasses PGlite's protocol wrappers that manage error/notice messages, + * transactions, and notification listeners. Only use if you need to bypass these wrappers and + * don't intend to use the above features. + * * @param message The postgres wire protocol message to execute - * @returns The result of the query + * @returns The direct message data response produced by Postgres */ - async execProtocol( + async execProtocolRaw( message: Uint8Array, { syncToFs = true }: ExecProtocolOptions = {}, - ): Promise> { - return await this.#executeMutex.runExclusive(async () => { - const msg_len = message.length; - const mod = this.mod!; + ) { + const msg_len = message.length; + const mod = this.mod!; - // >0 set buffer content type to wire protocol - // set buffer size so answer will be at size+0x2 pointer addr - mod._interactive_write(msg_len); + // >0 set buffer content type to wire protocol + // set buffer size so answer will be at size+0x2 pointer addr + mod._interactive_write(msg_len); - // copy whole buffer at addr 0x1 - mod.HEAPU8.set(message, 1); + // copy whole buffer at addr 0x1 + mod.HEAPU8.set(message, 1); - // execute the message - mod._interactive_one(); + // execute the message + mod._interactive_one(); - if (syncToFs) { - await this.#syncToFs(); - } + // Read responses from the buffer + const msg_start = msg_len + 2; + const msg_end = msg_start + mod._interactive_read(); + const data = mod.HEAPU8.subarray(msg_start, msg_end); - const results: Array<[BackendMessage, Uint8Array]> = []; - - // Read responses from the buffer - const msg_start = msg_len + 2; - const msg_end = msg_start + mod._interactive_read(); - const data = mod.HEAPU8.subarray(msg_start, msg_end); - - this.#parser.parse(Buffer.from(data), (msg) => { - if (msg instanceof DatabaseError) { - this.#parser = new Parser(); // Reset the parser - throw msg; - // TODO: Do we want to wrap the error in a custom error? - } else if (msg instanceof NoticeMessage && this.debug > 0) { - // Notice messages are warnings, we should log them - console.warn(msg); - } else if (msg instanceof CommandCompleteMessage) { - // Keep track of the transaction state - switch (msg.text) { - case "BEGIN": - this.#inTransaction = true; - break; - case "COMMIT": - case "ROLLBACK": - this.#inTransaction = false; - break; - } - } else if (msg instanceof NotificationResponseMessage) { - // We've received a notification, call the listeners - const listeners = this.#notifyListeners.get(msg.channel); - if (listeners) { - listeners.forEach((cb) => { - // We use queueMicrotask so that the callback is called after any - // synchronous code has finished running. - queueMicrotask(() => cb(msg.payload)); - }); - } - this.#globalNotifyListeners.forEach((cb) => { - queueMicrotask(() => cb(msg.channel, msg.payload)); + if (syncToFs) { + await this.#syncToFs(); + } + + return data; + } + + /** + * Execute a postgres wire protocol message + * @param message The postgres wire protocol message to execute + * @returns The result of the query + */ + async execProtocol( + message: Uint8Array, + { syncToFs = true }: ExecProtocolOptions = {}, + ): Promise> { + const data = await this.execProtocolRaw(message, { syncToFs }); + const results: Array<[BackendMessage, Uint8Array]> = []; + + this.#parser.parse(Buffer.from(data), (msg) => { + if (msg instanceof DatabaseError) { + this.#parser = new Parser(); // Reset the parser + throw msg; + // TODO: Do we want to wrap the error in a custom error? + } else if (msg instanceof NoticeMessage && this.debug > 0) { + // Notice messages are warnings, we should log them + console.warn(msg); + } else if (msg instanceof CommandCompleteMessage) { + // Keep track of the transaction state + switch (msg.text) { + case "BEGIN": + this.#inTransaction = true; + break; + case "COMMIT": + case "ROLLBACK": + this.#inTransaction = false; + break; + } + } else if (msg instanceof NotificationResponseMessage) { + // We've received a notification, call the listeners + const listeners = this.#notifyListeners.get(msg.channel); + if (listeners) { + listeners.forEach((cb) => { + // We use queueMicrotask so that the callback is called after any + // synchronous code has finished running. + queueMicrotask(() => cb(msg.payload)); }); } - results.push([msg, data]); - }); - - return results; + this.#globalNotifyListeners.forEach((cb) => { + queueMicrotask(() => cb(msg.channel, msg.payload)); + }); + } + results.push([msg, data]); }); + + return results; } async #execProtocolNoSync( diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts index b69be732..4358a0d4 100644 --- a/packages/pglite/src/worker/index.ts +++ b/packages/pglite/src/worker/index.ts @@ -87,6 +87,10 @@ export class PGliteWorker implements PGliteInterface { return this.#worker.transaction(callbackProxy); } + async execProtocolRaw(message: Uint8Array): Promise { + return this.#worker.execProtocolRaw(message); + } + async execProtocol( message: Uint8Array, ): Promise> { diff --git a/packages/pglite/src/worker/process.ts b/packages/pglite/src/worker/process.ts index cdaf91e6..eac2df57 100644 --- a/packages/pglite/src/worker/process.ts +++ b/packages/pglite/src/worker/process.ts @@ -31,6 +31,9 @@ const worker = { return callback(Comlink.proxy(tx)); }); }, + async execProtocolRaw(message: Uint8Array) { + return await db.execProtocolRaw(message); + }, async execProtocol(message: Uint8Array) { return await db.execProtocol(message); },