From 7110f37426a21a7d2c11bfec06a6f9f6971eccf8 Mon Sep 17 00:00:00 2001 From: Peter Mooney <111302225+pmooney-socraticworks@users.noreply.github.com> Date: Sat, 6 Apr 2024 00:13:23 -0700 Subject: [PATCH] Resolve bug preventing `affectedRows` count for mutation queries (#72) * Ensure update queries return affectedRows count * Fix tests that fail outside of GMT timezone * Run prettier * Add pgdata-test to .gitignore * Ensure `affectedRows` is present in the last result set - Relevant for `.exec` queries containing one or more mutation queries - Prevent `rows` from SELECT queries bleeding over into previous result sets. --- .gitignore | 3 +- packages/pglite/src/initdb.ts | 2 +- packages/pglite/src/interface.ts | 10 ++--- packages/pglite/src/parse.ts | 58 +++++++++++++++++------------ packages/pglite/src/pglite.ts | 41 +++++++++++++++----- packages/pglite/src/types.ts | 10 +++-- packages/pglite/src/worker/index.ts | 8 +++- packages/pglite/tests/basic.test.js | 55 ++++++++++++++------------- packages/pglite/tests/types.test.js | 10 +++-- 9 files changed, 122 insertions(+), 75 deletions(-) diff --git a/.gitignore b/.gitignore index 6acc570d..cbc68900 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules dist -.DS_Store \ No newline at end of file +.DS_Store +pgdata-test \ No newline at end of file diff --git a/packages/pglite/src/initdb.ts b/packages/pglite/src/initdb.ts index 3896e7c0..5f17ffe1 100644 --- a/packages/pglite/src/initdb.ts +++ b/packages/pglite/src/initdb.ts @@ -61,7 +61,7 @@ export async function initDb(dataDir?: string, debug?: DebugLevel) { locateFile: await makeLocateFile(), ...(debugMode ? { print: console.info, printErr: console.error } - : { print: () => { }, printErr: () => { } }), + : { print: () => {}, printErr: () => {} }), arguments: [ "--boot", "-x1", diff --git a/packages/pglite/src/interface.ts b/packages/pglite/src/interface.ts index 581f209b..ab3719fa 100644 --- a/packages/pglite/src/interface.ts +++ b/packages/pglite/src/interface.ts @@ -8,7 +8,7 @@ export type RowMode = "array" | "object"; export interface ParserOptions { [pgType: number]: (value: string) => any; -}; +} export interface QueryOptions { rowMode?: RowMode; @@ -31,14 +31,14 @@ export interface PGliteInterface { query( query: string, params?: any[], - options?: QueryOptions + options?: QueryOptions, ): Promise>; exec(query: string, options?: QueryOptions): Promise>; transaction( - callback: (tx: Transaction) => Promise + callback: (tx: Transaction) => Promise, ): Promise; execProtocol( - message: Uint8Array + message: Uint8Array, ): Promise>; } @@ -54,7 +54,7 @@ export interface Transaction { query( query: string, params?: any[], - options?: QueryOptions + options?: QueryOptions, ): Promise>; exec(query: string, options?: QueryOptions): Promise>; rollback(): Promise; diff --git a/packages/pglite/src/parse.ts b/packages/pglite/src/parse.ts index 90a84411..bcc99c1a 100644 --- a/packages/pglite/src/parse.ts +++ b/packages/pglite/src/parse.ts @@ -13,21 +13,25 @@ import { parseType } from "./types.js"; */ export function parseResults( messages: Array, - options?: QueryOptions + options?: QueryOptions, ): Array { const resultSets: Results[] = []; - let currentResultSet: Results | null = null; + let currentResultSet: Results = { rows: [], fields: [] }; + let affectedRows = 0; - for (const msg of messages) { + const filteredMessages = messages.filter( + (msg) => + msg instanceof RowDescriptionMessage || + msg instanceof DataRowMessage || + msg instanceof CommandCompleteMessage, + ); + + filteredMessages.forEach((msg, index) => { if (msg instanceof RowDescriptionMessage) { - currentResultSet = { - rows: [], - fields: msg.fields.map((field) => ({ - name: field.name, - dataTypeID: field.dataTypeID, - })), - }; - resultSets.push(currentResultSet); + currentResultSet.fields = msg.fields.map((field) => ({ + name: field.name, + dataTypeID: field.dataTypeID, + })); } else if (msg instanceof DataRowMessage && currentResultSet) { if (options?.rowMode === "array") { currentResultSet.rows.push( @@ -35,11 +39,12 @@ export function parseResults( parseType( field, currentResultSet!.fields[i].dataTypeID, - options?.parsers - ) - ) + options?.parsers, + ), + ), ); - } else { // rowMode === "object" + } else { + // rowMode === "object" currentResultSet.rows.push( Object.fromEntries( msg.fields.map((field, i) => [ @@ -47,16 +52,22 @@ export function parseResults( parseType( field, currentResultSet!.fields[i].dataTypeID, - options?.parsers + options?.parsers, ), - ]) - ) + ]), + ), ); } - } else if (msg instanceof CommandCompleteMessage && currentResultSet) { - currentResultSet.affectedRows = affectedRows(msg); + } else if (msg instanceof CommandCompleteMessage) { + affectedRows += retrieveRowCount(msg); + + if (index === filteredMessages.length - 1) + resultSets.push({ ...currentResultSet, affectedRows }); + else resultSets.push(currentResultSet); + + currentResultSet = { rows: [], fields: [] }; } - } + }); if (resultSets.length === 0) { resultSets.push({ @@ -68,13 +79,14 @@ export function parseResults( return resultSets; } -function affectedRows(msg: CommandCompleteMessage): number { +function retrieveRowCount(msg: CommandCompleteMessage): number { const parts = msg.text.split(" "); switch (parts[0]) { case "INSERT": + return parseInt(parts[2], 10); case "UPDATE": case "DELETE": - return parseInt(parts[1]); + return parseInt(parts[1], 10); default: return 0; } diff --git a/packages/pglite/src/pglite.ts b/packages/pglite/src/pglite.ts index c6cd203f..c019045e 100644 --- a/packages/pglite/src/pglite.ts +++ b/packages/pglite/src/pglite.ts @@ -210,7 +210,11 @@ export class PGlite implements PGliteInterface { * @param params Optional parameters for the query * @returns The result of the query */ - async query(query: string, params?: any[], options?: QueryOptions): Promise> { + async query( + query: string, + params?: any[], + options?: QueryOptions, + ): Promise> { await this.#checkReady(); // We wrap the public query method in the transaction mutex to ensure that // only one query can be executed at a time and not concurrently with a @@ -243,7 +247,11 @@ export class PGlite implements PGliteInterface { * @param params Optional parameters for the query * @returns The result of the query */ - async #runQuery(query: string, params?: any[], options?: QueryOptions): Promise> { + async #runQuery( + query: string, + params?: any[], + options?: QueryOptions, + ): Promise> { return await this.#queryMutex.runExclusive(async () => { // We need to parse, bind and execute a query with parameters if (this.debug > 1) { @@ -257,12 +265,12 @@ export class PGlite implements PGliteInterface { serialize.parse({ text: query, types: parsedParams.map(([, type]) => type), - }) + }), )), ...(await this.execProtocol( serialize.bind({ values: parsedParams.map(([val]) => val), - }) + }), )), ...(await this.execProtocol(serialize.describe({ type: "P" }))), ...(await this.execProtocol(serialize.execute({}))), @@ -270,7 +278,10 @@ export class PGlite implements PGliteInterface { } finally { await this.execProtocol(serialize.sync()); } - return parseResults(results.map(([msg]) => msg), options)[0] as Results; + return parseResults( + results.map(([msg]) => msg), + options, + )[0] as Results; }); } @@ -281,7 +292,10 @@ export class PGlite implements PGliteInterface { * @param params Optional parameters for the query * @returns The result of the query */ - async #runExec(query: string, options?: QueryOptions): Promise> { + async #runExec( + query: string, + options?: QueryOptions, + ): Promise> { return await this.#queryMutex.runExclusive(async () => { // No params so we can just send the query if (this.debug > 1) { @@ -293,7 +307,10 @@ export class PGlite implements PGliteInterface { } finally { await this.execProtocol(serialize.sync()); } - return parseResults(results.map(([msg]) => msg), options) as Array; + return parseResults( + results.map(([msg]) => msg), + options, + ) as Array; }); } @@ -303,7 +320,7 @@ export class PGlite implements PGliteInterface { * @returns The result of the transaction */ async transaction( - callback: (tx: Transaction) => Promise + callback: (tx: Transaction) => Promise, ): Promise { await this.#checkReady(); return await this.#transactionMutex.runExclusive(async () => { @@ -319,7 +336,11 @@ export class PGlite implements PGliteInterface { try { const tx: Transaction = { - query: async (query: string, params?: any[], options?: QueryOptions) => { + query: async ( + query: string, + params?: any[], + options?: QueryOptions, + ) => { checkClosed(); return await this.#runQuery(query, params, options); }, @@ -376,7 +397,7 @@ export class PGlite implements PGliteInterface { * @returns The result of the query */ async execProtocol( - message: Uint8Array + message: Uint8Array, ): Promise> { return await this.#executeMutex.runExclusive(async () => { if (this.#resultAccumulator.length > 0) { diff --git a/packages/pglite/src/types.ts b/packages/pglite/src/types.ts index 4ffd9120..6260e64b 100644 --- a/packages/pglite/src/types.ts +++ b/packages/pglite/src/types.ts @@ -281,7 +281,7 @@ export function parseArray(value: string, parser?: (s: string) => any) { export function parseType( x: string, type: number, - parsers?: ParserOptions + parsers?: ParserOptions, ): any { if (x === null) { return null; @@ -303,7 +303,7 @@ function typeHandlers(types: TypeHandlers) { serializers[k] = theSerializer; if (types[k].js) { types[k].js.forEach((Type: any) => - serializerInstanceof.push([Type, theSerializer]) + serializerInstanceof.push([Type, theSerializer]), ); } if (parse) { @@ -317,11 +317,13 @@ function typeHandlers(types: TypeHandlers) { return { parsers, serializers, serializerInstanceof }; }, { - parsers: {} as { [key: number | string]: (x: string, typeId?: number) => any }, + parsers: {} as { + [key: number | string]: (x: string, typeId?: number) => any; + }, serializers: {} as { [key: number | string]: Serializer; }, serializerInstanceof: [] as Array<[any, Serializer]>, - } + }, ); } diff --git a/packages/pglite/src/worker/index.ts b/packages/pglite/src/worker/index.ts index e0510efa..2a8b309f 100644 --- a/packages/pglite/src/worker/index.ts +++ b/packages/pglite/src/worker/index.ts @@ -54,7 +54,11 @@ export class PGliteWorker implements PGliteInterface { this.#closed = true; } - async query(query: string, params?: any[], options?: QueryOptions): Promise> { + async query( + query: string, + params?: any[], + options?: QueryOptions, + ): Promise> { return this.#worker.query(query, params, options) as Promise>; } @@ -68,7 +72,7 @@ export class PGliteWorker implements PGliteInterface { } async execProtocol( - message: Uint8Array + message: Uint8Array, ): Promise> { return this.#worker.execProtocol(message); } diff --git a/packages/pglite/tests/basic.test.js b/packages/pglite/tests/basic.test.js index cfe3de3d..db59c044 100644 --- a/packages/pglite/tests/basic.test.js +++ b/packages/pglite/tests/basic.test.js @@ -9,31 +9,27 @@ test("basic exec", async (t) => { name TEXT ); `); - await db.exec("INSERT INTO test (name) VALUES ('test');"); - const res = await db.exec(` + + const multiStatementResult = await db.exec(` + INSERT INTO test (name) VALUES ('test'); + UPDATE test SET name = 'test2'; SELECT * FROM test; - `); +`); - t.deepEqual(res, [ + t.deepEqual(multiStatementResult, [ { - rows: [ - { - id: 1, - name: "test", - }, - ], - fields: [ - { - name: "id", - dataTypeID: 23, - }, - { - name: "name", - dataTypeID: 25, - }, - ], - affectedRows: 0, + rows: [], + fields: [], + }, + { + rows: [], + fields: [], }, + { + rows: [ { id: 1, name: 'test2' } ], + fields: [ { name: 'id', dataTypeID: 23 }, { name: 'name', dataTypeID: 25 } ], + affectedRows: 2 + } ]); }); @@ -46,11 +42,11 @@ test("basic query", async (t) => { ); `); await db.query("INSERT INTO test (name) VALUES ('test');"); - const res = await db.query(` + const selectResult = await db.query(` SELECT * FROM test; `); - t.deepEqual(res, { + t.deepEqual(selectResult, { rows: [ { id: 1, @@ -69,6 +65,13 @@ test("basic query", async (t) => { ], affectedRows: 0, }); + + const updateResult = await db.query("UPDATE test SET name = 'test2';"); + t.deepEqual(updateResult, { + rows: [], + fields: [], + affectedRows: 1 + }) }); test("basic types", async (t) => { @@ -116,7 +119,7 @@ test("basic types", async (t) => { SELECT * FROM test; `); - t.deepEqual(res, { + t.like(res, { rows: [ { id: 1, @@ -126,7 +129,6 @@ test("basic types", async (t) => { bigint: 9223372036854775807n, bool: true, date: new Date("2021-01-01T00:00:00.000Z"), - timestamp: new Date("2021-01-01T12:00:00.000Z"), json: { test: "test" }, blob: Uint8Array.from([1, 2, 3]), array_text: ["test1", "test2", "test,3"], @@ -190,6 +192,9 @@ test("basic types", async (t) => { ], affectedRows: 0, }); + + // standardize timestamp comparison to UTC milliseconds to ensure predictable test runs on machines in different timezones. + t.deepEqual(res.rows[0].timestamp.getUTCMilliseconds(), new Date("2021-01-01T12:00:00.000Z").getUTCMilliseconds()) }); test("basic params", async (t) => { diff --git a/packages/pglite/tests/types.test.js b/packages/pglite/tests/types.test.js index b6b721d6..5a3f395f 100644 --- a/packages/pglite/tests/types.test.js +++ b/packages/pglite/tests/types.test.js @@ -55,16 +55,18 @@ test("parse date 1082", (t) => { }); test("parse timestamp 1114", (t) => { + // standardize timestamp comparison to UTC milliseconds to ensure predictable test runs on machines in different timezones. t.deepEqual( - types.parseType("2021-01-01T12:00:00", 1114), - new Date("2021-01-01T12:00:00.000Z") + types.parseType("2021-01-01T12:00:00", 1114).getUTCMilliseconds(), + new Date("2021-01-01T12:00:00.000Z").getUTCMilliseconds() ); }); test("parse timestamptz 1184", (t) => { + // standardize timestamp comparison to UTC milliseconds to ensure predictable test runs on machines in different timezones. t.deepEqual( - types.parseType("2021-01-01T12:00:00", 1184), - new Date("2021-01-01T12:00:00.000Z") + types.parseType("2021-01-01T12:00:00", 1184).getUTCMilliseconds(), + new Date("2021-01-01T12:00:00.000Z").getUTCMilliseconds() ); });