Skip to content

Commit

Permalink
New method execProtocolRaw() (#127)
Browse files Browse the repository at this point in the history
* feat: execProtocolRaw

* chore: update disclaimer on execProtocolRaw

* fix: syncToFs race condition and remove mutex
  • Loading branch information
gregnr authored Jul 27, 2024
1 parent 5cd0a42 commit 86cb410
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 59 deletions.
4 changes: 4 additions & 0 deletions packages/pglite/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ export type PGliteInterface = {
transaction<T>(
callback: (tx: Transaction) => Promise<T>,
): Promise<T | undefined>;
execProtocolRaw(
message: Uint8Array,
options?: ExecProtocolOptions,
): Promise<Uint8Array>;
execProtocol(
message: Uint8Array,
options?: ExecProtocolOptions,
Expand Down
135 changes: 76 additions & 59 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,77 +562,94 @@ export class PGlite implements PGliteInterface {
}

/**
* Execute a postgres wire protocol message
* Execute a postgres wire protocol message directly without wrapping the response.
* Only use if `execProtocol()` doesn't suite your needs.
*
* **Warning:** This bypasses PGlite's protocol wrappers that manage error/notice messages,
* transactions, and notification listeners. Only use if you need to bypass these wrappers and
* don't intend to use the above features.
*
* @param message The postgres wire protocol message to execute
* @returns The result of the query
* @returns The direct message data response produced by Postgres
*/
async execProtocol(
async execProtocolRaw(
message: Uint8Array,
{ syncToFs = true }: ExecProtocolOptions = {},
): Promise<Array<[BackendMessage, Uint8Array]>> {
return await this.#executeMutex.runExclusive(async () => {
const msg_len = message.length;
const mod = this.mod!;
) {
const msg_len = message.length;
const mod = this.mod!;

// >0 set buffer content type to wire protocol
// set buffer size so answer will be at size+0x2 pointer addr
mod._interactive_write(msg_len);
// >0 set buffer content type to wire protocol
// set buffer size so answer will be at size+0x2 pointer addr
mod._interactive_write(msg_len);

// copy whole buffer at addr 0x1
mod.HEAPU8.set(message, 1);
// copy whole buffer at addr 0x1
mod.HEAPU8.set(message, 1);

// execute the message
mod._interactive_one();
// execute the message
mod._interactive_one();

if (syncToFs) {
await this.#syncToFs();
}
// Read responses from the buffer
const msg_start = msg_len + 2;
const msg_end = msg_start + mod._interactive_read();
const data = mod.HEAPU8.subarray(msg_start, msg_end);

const results: Array<[BackendMessage, Uint8Array]> = [];

// Read responses from the buffer
const msg_start = msg_len + 2;
const msg_end = msg_start + mod._interactive_read();
const data = mod.HEAPU8.subarray(msg_start, msg_end);

this.#parser.parse(Buffer.from(data), (msg) => {
if (msg instanceof DatabaseError) {
this.#parser = new Parser(); // Reset the parser
throw msg;
// TODO: Do we want to wrap the error in a custom error?
} else if (msg instanceof NoticeMessage && this.debug > 0) {
// Notice messages are warnings, we should log them
console.warn(msg);
} else if (msg instanceof CommandCompleteMessage) {
// Keep track of the transaction state
switch (msg.text) {
case "BEGIN":
this.#inTransaction = true;
break;
case "COMMIT":
case "ROLLBACK":
this.#inTransaction = false;
break;
}
} else if (msg instanceof NotificationResponseMessage) {
// We've received a notification, call the listeners
const listeners = this.#notifyListeners.get(msg.channel);
if (listeners) {
listeners.forEach((cb) => {
// We use queueMicrotask so that the callback is called after any
// synchronous code has finished running.
queueMicrotask(() => cb(msg.payload));
});
}
this.#globalNotifyListeners.forEach((cb) => {
queueMicrotask(() => cb(msg.channel, msg.payload));
if (syncToFs) {
await this.#syncToFs();
}

return data;
}

/**
* Execute a postgres wire protocol message
* @param message The postgres wire protocol message to execute
* @returns The result of the query
*/
async execProtocol(
message: Uint8Array,
{ syncToFs = true }: ExecProtocolOptions = {},
): Promise<Array<[BackendMessage, Uint8Array]>> {
const data = await this.execProtocolRaw(message, { syncToFs });
const results: Array<[BackendMessage, Uint8Array]> = [];

this.#parser.parse(Buffer.from(data), (msg) => {
if (msg instanceof DatabaseError) {
this.#parser = new Parser(); // Reset the parser
throw msg;
// TODO: Do we want to wrap the error in a custom error?
} else if (msg instanceof NoticeMessage && this.debug > 0) {
// Notice messages are warnings, we should log them
console.warn(msg);
} else if (msg instanceof CommandCompleteMessage) {
// Keep track of the transaction state
switch (msg.text) {
case "BEGIN":
this.#inTransaction = true;
break;
case "COMMIT":
case "ROLLBACK":
this.#inTransaction = false;
break;
}
} else if (msg instanceof NotificationResponseMessage) {
// We've received a notification, call the listeners
const listeners = this.#notifyListeners.get(msg.channel);
if (listeners) {
listeners.forEach((cb) => {
// We use queueMicrotask so that the callback is called after any
// synchronous code has finished running.
queueMicrotask(() => cb(msg.payload));
});
}
results.push([msg, data]);
});

return results;
this.#globalNotifyListeners.forEach((cb) => {
queueMicrotask(() => cb(msg.channel, msg.payload));
});
}
results.push([msg, data]);
});

return results;
}

async #execProtocolNoSync(
Expand Down
4 changes: 4 additions & 0 deletions packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ export class PGliteWorker implements PGliteInterface {
return this.#worker.transaction(callbackProxy);
}

async execProtocolRaw(message: Uint8Array): Promise<Uint8Array> {
return this.#worker.execProtocolRaw(message);
}

async execProtocol(
message: Uint8Array,
): Promise<Array<[BackendMessage, Uint8Array]>> {
Expand Down
3 changes: 3 additions & 0 deletions packages/pglite/src/worker/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const worker = {
return callback(Comlink.proxy(tx));
});
},
async execProtocolRaw(message: Uint8Array) {
return await db.execProtocolRaw(message);
},
async execProtocol(message: Uint8Array) {
return await db.execProtocol(message);
},
Expand Down

2 comments on commit 86cb410

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.