Skip to content

Commit

Permalink
Resolve bug preventing affectedRows count for mutation queries (#72)
Browse files Browse the repository at this point in the history
* Ensure update queries return affectedRows count

* Fix tests that fail outside of GMT timezone

* Run prettier

* Add pgdata-test to .gitignore

* Ensure `affectedRows` is present in the last result set

- Relevant for `.exec` queries containing one or more mutation queries
- Prevent `rows` from SELECT queries bleeding over into previous result sets.
  • Loading branch information
pmooney-socraticworks authored Apr 6, 2024
1 parent 68a1f7d commit 7110f37
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 75 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
dist
.DS_Store
.DS_Store
pgdata-test
2 changes: 1 addition & 1 deletion packages/pglite/src/initdb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export async function initDb(dataDir?: string, debug?: DebugLevel) {
locateFile: await makeLocateFile(),
...(debugMode
? { print: console.info, printErr: console.error }
: { print: () => { }, printErr: () => { } }),
: { print: () => {}, printErr: () => {} }),
arguments: [
"--boot",
"-x1",
Expand Down
10 changes: 5 additions & 5 deletions packages/pglite/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export type RowMode = "array" | "object";

export interface ParserOptions {
[pgType: number]: (value: string) => any;
};
}

export interface QueryOptions {
rowMode?: RowMode;
Expand All @@ -31,14 +31,14 @@ export interface PGliteInterface {
query<T>(
query: string,
params?: any[],
options?: QueryOptions
options?: QueryOptions,
): Promise<Results<T>>;
exec(query: string, options?: QueryOptions): Promise<Array<Results>>;
transaction<T>(
callback: (tx: Transaction) => Promise<T>
callback: (tx: Transaction) => Promise<T>,
): Promise<T | undefined>;
execProtocol(
message: Uint8Array
message: Uint8Array,
): Promise<Array<[BackendMessage, Uint8Array]>>;
}

Expand All @@ -54,7 +54,7 @@ export interface Transaction {
query<T>(
query: string,
params?: any[],
options?: QueryOptions
options?: QueryOptions,
): Promise<Results<T>>;
exec(query: string, options?: QueryOptions): Promise<Array<Results>>;
rollback(): Promise<void>;
Expand Down
58 changes: 35 additions & 23 deletions packages/pglite/src/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,61 @@ import { parseType } from "./types.js";
*/
export function parseResults(
messages: Array<BackendMessage>,
options?: QueryOptions
options?: QueryOptions,
): Array<Results> {
const resultSets: Results[] = [];
let currentResultSet: Results | null = null;
let currentResultSet: Results = { rows: [], fields: [] };
let affectedRows = 0;

for (const msg of messages) {
const filteredMessages = messages.filter(
(msg) =>
msg instanceof RowDescriptionMessage ||
msg instanceof DataRowMessage ||
msg instanceof CommandCompleteMessage,
);

filteredMessages.forEach((msg, index) => {
if (msg instanceof RowDescriptionMessage) {
currentResultSet = {
rows: [],
fields: msg.fields.map((field) => ({
name: field.name,
dataTypeID: field.dataTypeID,
})),
};
resultSets.push(currentResultSet);
currentResultSet.fields = msg.fields.map((field) => ({
name: field.name,
dataTypeID: field.dataTypeID,
}));
} else if (msg instanceof DataRowMessage && currentResultSet) {
if (options?.rowMode === "array") {
currentResultSet.rows.push(
msg.fields.map((field, i) =>
parseType(
field,
currentResultSet!.fields[i].dataTypeID,
options?.parsers
)
)
options?.parsers,
),
),
);
} else { // rowMode === "object"
} else {
// rowMode === "object"
currentResultSet.rows.push(
Object.fromEntries(
msg.fields.map((field, i) => [
currentResultSet!.fields[i].name,
parseType(
field,
currentResultSet!.fields[i].dataTypeID,
options?.parsers
options?.parsers,
),
])
)
]),
),
);
}
} else if (msg instanceof CommandCompleteMessage && currentResultSet) {
currentResultSet.affectedRows = affectedRows(msg);
} else if (msg instanceof CommandCompleteMessage) {
affectedRows += retrieveRowCount(msg);

if (index === filteredMessages.length - 1)
resultSets.push({ ...currentResultSet, affectedRows });
else resultSets.push(currentResultSet);

currentResultSet = { rows: [], fields: [] };
}
}
});

if (resultSets.length === 0) {
resultSets.push({
Expand All @@ -68,13 +79,14 @@ export function parseResults(
return resultSets;
}

function affectedRows(msg: CommandCompleteMessage): number {
function retrieveRowCount(msg: CommandCompleteMessage): number {
const parts = msg.text.split(" ");
switch (parts[0]) {
case "INSERT":
return parseInt(parts[2], 10);
case "UPDATE":
case "DELETE":
return parseInt(parts[1]);
return parseInt(parts[1], 10);
default:
return 0;
}
Expand Down
41 changes: 31 additions & 10 deletions packages/pglite/src/pglite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,11 @@ export class PGlite implements PGliteInterface {
* @param params Optional parameters for the query
* @returns The result of the query
*/
async query<T>(query: string, params?: any[], options?: QueryOptions): Promise<Results<T>> {
async query<T>(
query: string,
params?: any[],
options?: QueryOptions,
): Promise<Results<T>> {
await this.#checkReady();
// We wrap the public query method in the transaction mutex to ensure that
// only one query can be executed at a time and not concurrently with a
Expand Down Expand Up @@ -243,7 +247,11 @@ export class PGlite implements PGliteInterface {
* @param params Optional parameters for the query
* @returns The result of the query
*/
async #runQuery<T>(query: string, params?: any[], options?: QueryOptions): Promise<Results<T>> {
async #runQuery<T>(
query: string,
params?: any[],
options?: QueryOptions,
): Promise<Results<T>> {
return await this.#queryMutex.runExclusive(async () => {
// We need to parse, bind and execute a query with parameters
if (this.debug > 1) {
Expand All @@ -257,20 +265,23 @@ export class PGlite implements PGliteInterface {
serialize.parse({
text: query,
types: parsedParams.map(([, type]) => type),
})
}),
)),
...(await this.execProtocol(
serialize.bind({
values: parsedParams.map(([val]) => val),
})
}),
)),
...(await this.execProtocol(serialize.describe({ type: "P" }))),
...(await this.execProtocol(serialize.execute({}))),
];
} finally {
await this.execProtocol(serialize.sync());
}
return parseResults(results.map(([msg]) => msg), options)[0] as Results<T>;
return parseResults(
results.map(([msg]) => msg),
options,
)[0] as Results<T>;
});
}

Expand All @@ -281,7 +292,10 @@ export class PGlite implements PGliteInterface {
* @param params Optional parameters for the query
* @returns The result of the query
*/
async #runExec(query: string, options?: QueryOptions): Promise<Array<Results>> {
async #runExec(
query: string,
options?: QueryOptions,
): Promise<Array<Results>> {
return await this.#queryMutex.runExclusive(async () => {
// No params so we can just send the query
if (this.debug > 1) {
Expand All @@ -293,7 +307,10 @@ export class PGlite implements PGliteInterface {
} finally {
await this.execProtocol(serialize.sync());
}
return parseResults(results.map(([msg]) => msg), options) as Array<Results>;
return parseResults(
results.map(([msg]) => msg),
options,
) as Array<Results>;
});
}

Expand All @@ -303,7 +320,7 @@ export class PGlite implements PGliteInterface {
* @returns The result of the transaction
*/
async transaction<T>(
callback: (tx: Transaction) => Promise<T>
callback: (tx: Transaction) => Promise<T>,
): Promise<T | undefined> {
await this.#checkReady();
return await this.#transactionMutex.runExclusive(async () => {
Expand All @@ -319,7 +336,11 @@ export class PGlite implements PGliteInterface {

try {
const tx: Transaction = {
query: async (query: string, params?: any[], options?: QueryOptions) => {
query: async (
query: string,
params?: any[],
options?: QueryOptions,
) => {
checkClosed();
return await this.#runQuery(query, params, options);
},
Expand Down Expand Up @@ -376,7 +397,7 @@ export class PGlite implements PGliteInterface {
* @returns The result of the query
*/
async execProtocol(
message: Uint8Array
message: Uint8Array,
): Promise<Array<[BackendMessage, Uint8Array]>> {
return await this.#executeMutex.runExclusive(async () => {
if (this.#resultAccumulator.length > 0) {
Expand Down
10 changes: 6 additions & 4 deletions packages/pglite/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ export function parseArray(value: string, parser?: (s: string) => any) {
export function parseType(
x: string,
type: number,
parsers?: ParserOptions
parsers?: ParserOptions,
): any {
if (x === null) {
return null;
Expand All @@ -303,7 +303,7 @@ function typeHandlers(types: TypeHandlers) {
serializers[k] = theSerializer;
if (types[k].js) {
types[k].js.forEach((Type: any) =>
serializerInstanceof.push([Type, theSerializer])
serializerInstanceof.push([Type, theSerializer]),
);
}
if (parse) {
Expand All @@ -317,11 +317,13 @@ function typeHandlers(types: TypeHandlers) {
return { parsers, serializers, serializerInstanceof };
},
{
parsers: {} as { [key: number | string]: (x: string, typeId?: number) => any },
parsers: {} as {
[key: number | string]: (x: string, typeId?: number) => any;
},
serializers: {} as {
[key: number | string]: Serializer;
},
serializerInstanceof: [] as Array<[any, Serializer]>,
}
},
);
}
8 changes: 6 additions & 2 deletions packages/pglite/src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ export class PGliteWorker implements PGliteInterface {
this.#closed = true;
}

async query<T>(query: string, params?: any[], options?: QueryOptions): Promise<Results<T>> {
async query<T>(
query: string,
params?: any[],
options?: QueryOptions,
): Promise<Results<T>> {
return this.#worker.query(query, params, options) as Promise<Results<T>>;
}

Expand All @@ -68,7 +72,7 @@ export class PGliteWorker implements PGliteInterface {
}

async execProtocol(
message: Uint8Array
message: Uint8Array,
): Promise<Array<[BackendMessage, Uint8Array]>> {
return this.#worker.execProtocol(message);
}
Expand Down
Loading

0 comments on commit 7110f37

Please sign in to comment.