Skip to content

Commit f42012d

Browse files
authored
feat: impl PoolWaitTimeoutError (#7)
Throw error when get connection wait time great than poolWaitTimeout. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced connection management in the RDSClient with configurable timeout options. - Introduced a custom error handling mechanism for connection timeout scenarios. - New property added to RDSClientOptions for specifying pool wait timeout. - Unique identifier for RDSTransaction instances to improve tracking. - **Bug Fixes** - Improved error handling for connection timeouts, providing clearer feedback to users. - **Tests** - Added a suite of tests to validate connection pooling behavior under timeout conditions. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 194062e commit f42012d

7 files changed

+177
-9
lines changed

README.md

+7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ const db = new RDSClient({
4848
// connectionStorage: new AsyncLocalStorage(),
4949
// If create multiple RDSClient instances with the same connectionStorage, use this key to distinguish between the instances
5050
// connectionStorageKey: 'datasource',
51+
52+
// The timeout for connecting to the MySQL server. (Default: 500 milliseconds)
53+
// connectTimeout: 500,
54+
55+
// The timeout for waiting for a connection from the connection pool. (Default: 500 milliseconds)
56+
// So max timeout for get a connection is (connectTimeout + poolWaitTimeout)
57+
// poolWaitTimeout: 500,
5158
});
5259
```
5360

src/client.ts

+50-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { AsyncLocalStorage } from 'node:async_hooks';
22
import { promisify } from 'node:util';
3+
import { setTimeout } from 'node:timers/promises';
34
import mysql, { Pool } from 'mysql2';
45
import type { PoolOptions } from 'mysql2';
56
import type { PoolConnectionPromisify, RDSClientOptions, TransactionContext, TransactionScope } from './types';
@@ -10,12 +11,17 @@ import literals from './literals';
1011
import channels from './channels';
1112
import type { ConnectionMessage, ConnectionEnqueueMessage } from './channels';
1213
import { RDSPoolConfig } from './PoolConfig';
14+
import { PoolWaitTimeoutError } from './util/PoolWaitTimeout';
15+
1316
export * from './types';
1417

1518
interface PoolPromisify extends Omit<Pool, 'query'> {
1619
query(sql: string): Promise<any>;
20+
1721
getConnection(): Promise<PoolConnectionPromisify>;
22+
1823
end(): Promise<void>;
24+
1925
_acquiringConnections: any[];
2026
_allConnections: any[];
2127
_freeConnections: any[];
@@ -30,21 +36,37 @@ export interface QueryOptions {
3036
}
3137

3238
export class RDSClient extends Operator {
33-
static get literals() { return literals; }
34-
static get escape() { return mysql.escape; }
35-
static get escapeId() { return mysql.escapeId; }
36-
static get format() { return mysql.format; }
37-
static get raw() { return mysql.raw; }
39+
static get literals() {
40+
return literals;
41+
}
42+
43+
static get escape() {
44+
return mysql.escape;
45+
}
46+
47+
static get escapeId() {
48+
return mysql.escapeId;
49+
}
50+
51+
static get format() {
52+
return mysql.format;
53+
}
54+
55+
static get raw() {
56+
return mysql.raw;
57+
}
3858

3959
static #DEFAULT_STORAGE_KEY = Symbol('RDSClient#storage#default');
4060
static #TRANSACTION_NEST_COUNT = Symbol('RDSClient#transaction#nestCount');
4161

4262
#pool: PoolPromisify;
4363
#connectionStorage: AsyncLocalStorage<TransactionContext>;
4464
#connectionStorageKey: string | symbol;
65+
#poolWaitTimeout: number;
4566

4667
constructor(options: RDSClientOptions) {
4768
super();
69+
options.connectTimeout = options.connectTimeout ?? 500;
4870
const { connectionStorage, connectionStorageKey, ...mysqlOptions } = options;
4971
// get connection options from getConnectionConfig method every time
5072
if (mysqlOptions.getConnectionConfig) {
@@ -61,6 +83,7 @@ export class RDSClient extends Operator {
6183
});
6284
this.#connectionStorage = connectionStorage || new AsyncLocalStorage();
6385
this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY;
86+
this.#poolWaitTimeout = options.poolWaitTimeout ?? 500;
6487
// https://github.com/mysqljs/mysql#pool-events
6588
this.#pool.on('connection', (connection: PoolConnectionPromisify) => {
6689
channels.connectionNew.publish({
@@ -129,9 +152,30 @@ export class RDSClient extends Operator {
129152
};
130153
}
131154

155+
async waitPoolConnection(abortSignal: AbortSignal) {
156+
const now = performance.now();
157+
await setTimeout(this.#poolWaitTimeout, undefined, { signal: abortSignal });
158+
return performance.now() - now;
159+
}
160+
161+
async getConnectionWithTimeout() {
162+
const connPromise = this.#pool.getConnection();
163+
const timeoutAbortController = new AbortController();
164+
const timeoutPromise = this.waitPoolConnection(timeoutAbortController.signal);
165+
const connOrTimeout = await Promise.race([ connPromise, timeoutPromise ]);
166+
if (typeof connOrTimeout === 'number') {
167+
connPromise.then(conn => {
168+
conn.release();
169+
});
170+
throw new PoolWaitTimeoutError(`get connection timeout after ${connOrTimeout}ms`);
171+
}
172+
timeoutAbortController.abort();
173+
return connPromise;
174+
}
175+
132176
async getConnection() {
133177
try {
134-
const _conn = await this.#pool.getConnection();
178+
const _conn = await this.getConnectionWithTimeout();
135179
const conn = new RDSConnection(_conn);
136180
if (this.beforeQueryHandlers.length > 0) {
137181
for (const handler of this.beforeQueryHandlers) {

src/connection.ts

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import assert from 'node:assert';
12
import { promisify } from 'node:util';
23
import { Operator } from './operator';
34
import type { PoolConnectionPromisify } from './types';
@@ -6,8 +7,11 @@ const kWrapToRDS = Symbol('kWrapToRDS');
67

78
export class RDSConnection extends Operator {
89
conn: PoolConnectionPromisify;
10+
#released: boolean;
11+
912
constructor(conn: PoolConnectionPromisify) {
1013
super(conn);
14+
this.#released = false;
1115
this.conn = conn;
1216
if (!this.conn[kWrapToRDS]) {
1317
[
@@ -23,6 +27,8 @@ export class RDSConnection extends Operator {
2327
}
2428

2529
release() {
30+
assert(!this.#released, 'connection was released');
31+
this.#released = true;
2632
return this.conn.release();
2733
}
2834

src/transaction.ts

+4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
import type { RDSConnection } from './connection';
22
import { Operator } from './operator';
33

4+
let id = 0;
45
export class RDSTransaction extends Operator {
56
isCommit = false;
67
isRollback = false;
78
conn: RDSConnection | null;
9+
id: number;
10+
811
constructor(conn: RDSConnection) {
912
super(conn.conn);
13+
this.id = id++;
1014
this.conn = conn;
1115
}
1216

src/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export interface RDSClientOptions extends PoolOptions {
88
connectionStorageKey?: string;
99
connectionStorage?: AsyncLocalStorage<Record<PropertyKey, RDSTransaction>>;
1010
getConnectionConfig?: GetConnectionConfig;
11+
poolWaitTimeout?: number;
1112
}
1213

1314
export interface PoolConnectionPromisify extends Omit<PoolConnection, 'query'> {

src/util/PoolWaitTimeout.ts

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export class PoolWaitTimeoutError extends Error {
2+
constructor(...args) {
3+
super(...args);
4+
this.name = 'PoolWaitTimeoutError';
5+
}
6+
}

test/client.test.ts

+103-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { AsyncLocalStorage } from 'node:async_hooks';
22
import { strict as assert } from 'node:assert';
33
import fs from 'node:fs/promises';
4+
import { setTimeout } from 'node:timers/promises';
45
import path from 'node:path';
56
import mm from 'mm';
67
import { RDSTransaction } from '../src/transaction';
@@ -298,8 +299,9 @@ describe('test/client.test.ts', () => {
298299
// recovered after unlock.
299300
await conn.query('select * from `myrds-test-user` limit 1;');
300301
} catch (err) {
301-
conn.release();
302302
throw err;
303+
} finally {
304+
conn.release();
303305
}
304306
});
305307

@@ -353,7 +355,8 @@ describe('test/client.test.ts', () => {
353355
});
354356

355357
it('should throw rollback error with cause error when rollback failed', async () => {
356-
mm(RDSTransaction.prototype, 'rollback', async () => {
358+
mm(RDSTransaction.prototype, 'rollback', async function(this: RDSTransaction) {
359+
this.conn!.release();
357360
throw new Error('fake rollback error');
358361
});
359362
await assert.rejects(
@@ -501,7 +504,9 @@ describe('test/client.test.ts', () => {
501504
});
502505
};
503506

504-
const [ p1Res, p2Res ] = await Promise.all([ p1(), p2().catch(err => err) ]);
507+
const [ p1Res, p2Res ] = await Promise.all([ p1(), p2().catch(err => {
508+
return err;
509+
}) ]);
505510
assert.strictEqual(p1Res, true);
506511
assert.strictEqual(p2Res.code, 'ER_PARSE_ERROR');
507512
const rows = await db.query('select * from ?? where email=? order by id',
@@ -680,6 +685,7 @@ describe('test/client.test.ts', () => {
680685
});
681686
return db;
682687
});
688+
conn.release();
683689
assert(connQuerySql);
684690
assert(!transactionQuerySql);
685691
});
@@ -1493,4 +1499,98 @@ describe('test/client.test.ts', () => {
14931499
assert.equal(counter2After, 4);
14941500
});
14951501
});
1502+
1503+
describe('PoolWaitTimeout', () => {
1504+
async function longQuery(timeout?: number) {
1505+
await db.beginTransactionScope(async conn => {
1506+
await setTimeout(timeout ?? 1000);
1507+
await conn.query('SELECT 1+1');
1508+
});
1509+
}
1510+
1511+
it('should throw error if pool wait timeout', async () => {
1512+
const tasks: Array<Promise<void>> = [];
1513+
for (let i = 0; i < 10; i++) {
1514+
tasks.push(longQuery());
1515+
}
1516+
const tasksPromise = Promise.all(tasks);
1517+
await assert.rejects(async () => {
1518+
await longQuery();
1519+
}, /get connection timeout after/);
1520+
await tasksPromise;
1521+
});
1522+
1523+
it('should release conn to pool', async () => {
1524+
const tasks: Array<Promise<void>> = [];
1525+
const timeoutTasks: Array<Promise<void>> = [];
1526+
// 1. fill the pool
1527+
for (let i = 0; i < 10; i++) {
1528+
tasks.push(longQuery());
1529+
}
1530+
// 2. add more conn and wait for timeout
1531+
for (let i = 0; i < 10; i++) {
1532+
timeoutTasks.push(longQuery());
1533+
}
1534+
const [ succeedTasks, failedTasks ] = await Promise.all([
1535+
Promise.allSettled(tasks),
1536+
Promise.allSettled(timeoutTasks),
1537+
]);
1538+
const succeedCount = succeedTasks.filter(t => t.status === 'fulfilled').length;
1539+
assert.equal(succeedCount, 10);
1540+
1541+
const failedCount = failedTasks.filter(t => t.status === 'rejected').length;
1542+
assert.equal(failedCount, 10);
1543+
1544+
// 3. after pool empty, create new tasks
1545+
const retryTasks: Array<Promise<void>> = [];
1546+
for (let i = 0; i < 10; i++) {
1547+
retryTasks.push(longQuery());
1548+
}
1549+
await Promise.all(retryTasks);
1550+
});
1551+
1552+
it('should not wait too long', async () => {
1553+
const tasks: Array<Promise<void>> = [];
1554+
const timeoutTasks: Array<Promise<void>> = [];
1555+
const fastTasks: Array<Promise<void>> = [];
1556+
const start = performance.now();
1557+
// 1. fill the pool
1558+
for (let i = 0; i < 10; i++) {
1559+
tasks.push(longQuery());
1560+
}
1561+
const tasksPromise = Promise.allSettled(tasks);
1562+
// 2. add more conn and wait for timeout
1563+
for (let i = 0; i < 10; i++) {
1564+
timeoutTasks.push(longQuery());
1565+
}
1566+
const timeoutTasksPromise = Promise.allSettled(timeoutTasks);
1567+
await setTimeout(600);
1568+
// 3. add fast query
1569+
for (let i = 0; i < 10; i++) {
1570+
fastTasks.push(longQuery(1));
1571+
}
1572+
const fastTasksPromise = Promise.allSettled(fastTasks);
1573+
const [ succeedTasks, failedTasks, fastTaskResults ] = await Promise.all([
1574+
tasksPromise,
1575+
timeoutTasksPromise,
1576+
fastTasksPromise,
1577+
]);
1578+
const duration = performance.now() - start;
1579+
const succeedCount = succeedTasks.filter(t => t.status === 'fulfilled').length;
1580+
assert.equal(succeedCount, 10);
1581+
1582+
const failedCount = failedTasks.filter(t => t.status === 'rejected').length;
1583+
assert.equal(failedCount, 10);
1584+
1585+
const faskTaskSucceedCount = fastTaskResults.filter(t => t.status === 'fulfilled').length;
1586+
assert.equal(faskTaskSucceedCount, 10);
1587+
1588+
// - 10 long queries cost 1000ms
1589+
// - 10 timeout queries should be timeout in long query execution so not cost time
1590+
// - 10 fast queries wait long query to finish, cost 1ms
1591+
// 1000ms + 0ms + 1ms < 1100ms
1592+
assert(duration < 1100);
1593+
});
1594+
1595+
});
14961596
});

0 commit comments

Comments
 (0)