Skip to content

Commit b4a951c

Browse files
committed
refactor: implement ClientWrapper
1 parent cd94f24 commit b4a951c

28 files changed

+289
-195
lines changed

common/lib/aws_pool_client.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
export interface AwsPoolClient {
1818
connect(): Promise<any>;
1919

20-
end(poolClient: any): Promise<any>;
20+
end(): Promise<any>;
2121

2222
releaseResources(): Promise<void>;
2323

common/lib/client_wrapper.ts

+8
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,12 @@ export interface ClientWrapper {
2020
readonly client: any;
2121
readonly hostInfo: HostInfo;
2222
readonly properties: Map<string, any>;
23+
24+
query(sql: any): Promise<any>;
25+
26+
end(): Promise<void>;
27+
28+
rollback(): Promise<void>;
29+
30+
abort(): Promise<void>;
2331
}

common/lib/connection_provider.ts

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import { ClientWrapper } from "./client_wrapper";
2121

2222
export interface ConnectionProvider {
2323
connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper>;
24-
end(pluginService: PluginService, clientWrapper: ClientWrapper | undefined): Promise<void>;
2524
acceptsUrl(hostInfo: HostInfo, props: Map<string, any>): boolean;
2625
acceptsStrategy(role: HostRole, strategy: string): boolean;
2726
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo;

common/lib/driver_connection_provider.ts

+3-20
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
5555

5656
const driverDialect: DriverDialect = pluginService.getDriverDialect();
5757
try {
58-
const targetClient: any = await driverDialect.connect(props);
58+
const targetClient: any = await driverDialect.connect(hostInfo, props);
5959
connectionHostInfo = new HostInfoBuilder({
6060
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
6161
})
@@ -92,12 +92,6 @@ export class DriverConnectionProvider implements ConnectionProvider {
9292
const originalHost: string = hostInfo.host;
9393
const fixedHost: string = this.rdsUtils.removeGreenInstancePrefix(hostInfo.host);
9494
resultProps.set(WrapperProperties.HOST.name, fixedHost);
95-
connectionHostInfo = new HostInfoBuilder({
96-
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
97-
})
98-
.copyFrom(hostInfo)
99-
.withHost(fixedHost)
100-
.build();
10195

10296
logger.info(
10397
"Connecting to " +
@@ -108,21 +102,10 @@ export class DriverConnectionProvider implements ConnectionProvider {
108102
JSON.stringify(Object.fromEntries(maskProperties(resultProps)))
109103
);
110104

111-
resultTargetClient = driverDialect.connect(resultProps);
105+
resultTargetClient = driverDialect.connect(hostInfo, resultProps);
112106
}
113107

114-
return {
115-
client: resultTargetClient,
116-
hostInfo: connectionHostInfo,
117-
properties: resultProps
118-
};
119-
}
120-
121-
async end(pluginService: PluginService, clientWrapper: ClientWrapper | undefined): Promise<void> {
122-
if (clientWrapper === undefined) {
123-
return;
124-
}
125-
return await pluginService.getDriverDialect().end(clientWrapper);
108+
return resultTargetClient;
126109
}
127110

128111
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {

common/lib/driver_dialect/driver_dialect.ts

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
import { ClientWrapper } from "../client_wrapper";
1818
import { AwsPoolConfig } from "../aws_pool_config";
1919
import { AwsPoolClient } from "../aws_pool_client";
20+
import { HostInfo } from "../host_info";
2021

2122
export interface DriverDialect {
2223
getDialectName(): string;
23-
abort(targetClient: ClientWrapper): Promise<void>;
24-
rollback(targetClient: ClientWrapper): Promise<any>;
25-
connect(props: Map<string, any>): Promise<any>;
26-
end(targetClient: ClientWrapper | undefined): Promise<void>;
24+
25+
connect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper>;
26+
2727
preparePoolClientProperties(props: Map<string, any>, poolConfig: AwsPoolConfig | undefined): any;
28+
2829
getAwsPoolClient(props: any): AwsPoolClient;
2930
}

common/lib/internal_pooled_connection_provider.ts

+10-26
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { logger } from "../logutils";
3737
import { RoundRobinHostSelector } from "./round_robin_host_selector";
3838
import { AwsPoolClient } from "./aws_pool_client";
3939
import { AwsPoolConfig } from "./aws_pool_config";
40+
import { PoolClientWrapper } from "./pool_client_wrapper";
4041

4142
export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
4243
private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
@@ -108,28 +109,16 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
108109
const preparedConfig = dialect.preparePoolClientProperties(props, this._poolConfig);
109110

110111
this.internalPool = this.databasePools.computeIfAbsent(
111-
new PoolKey(hostInfo.url, this.getPoolKey(hostInfo, props)),
112+
new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, props)),
112113
() => dialect.getAwsPoolClient(preparedConfig),
113114
this.poolExpirationCheckNanos
114115
);
115116

116-
const poolClient = await this.getPoolConnection();
117-
118-
return {
119-
client: poolClient,
120-
hostInfo: connectionHostInfo,
121-
properties: props
122-
};
123-
}
124-
125-
async end(pluginService: PluginService, clientWrapper: ClientWrapper | undefined): Promise<void> {
126-
if (this.internalPool && clientWrapper) {
127-
return this.internalPool.end(clientWrapper.client);
128-
}
117+
return await this.getPoolConnection(hostInfo, props);
129118
}
130119

131-
async getPoolConnection() {
132-
return this.internalPool!.connect();
120+
async getPoolConnection(hostInfo: HostInfo, props: Map<string, string>) {
121+
return new PoolClientWrapper(await this.internalPool!.connect(), hostInfo, props);
133122
}
134123

135124
public async releaseResources() {
@@ -172,17 +161,12 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
172161
}
173162

174163
logConnections() {
175-
for (const [key, val] of this.databasePools.entries) {
176-
logger.debug(
177-
"Internal Pooled Connection: \t\n[ " +
178-
key.getPoolKeyString() +
179-
"\n\t {\n\t\t\t" +
180-
JSON.stringify(val.item.constructor.name) +
181-
"\t(expirationTimeNs: " +
182-
val.expirationTimeNs +
183-
")\n\t }\n]"
184-
);
164+
if (this.databasePools.size === 0) {
165+
return;
185166
}
167+
168+
const l = Array.from(this.databasePools.entries).map(([v, k]) => [v.getPoolKeyString(), k.item.constructor.name]);
169+
logger.debug(`Internal Pooled Connection: [${JSON.stringify(l)}]`);
186170
}
187171

188172
setDatabasePools(connectionPools: SlidingExpirationCache<PoolKey, any>): void {

common/lib/mysql_client_wrapper.ts

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { ClientWrapper } from "./client_wrapper";
18+
import { HostInfo } from "./host_info";
19+
import { ClientUtils } from "./utils/client_utils";
20+
21+
/*
22+
This is an internal wrapper class for the target community driver client created by the MySQL2DriverDialect.
23+
*/
24+
export class MySQLClientWrapper implements ClientWrapper {
25+
readonly client: any;
26+
readonly hostInfo: HostInfo;
27+
readonly properties: Map<string, string>;
28+
29+
/**
30+
* Creates a wrapper for the target community driver client.
31+
*
32+
* @param targetClient The community driver client created for an instance.
33+
* @param hostInfo Host information for the connected instance.
34+
* @param properties Connection properties for the target client.
35+
*/
36+
constructor(targetClient: any, hostInfo: HostInfo, properties: Map<string, any>) {
37+
this.client = targetClient;
38+
this.hostInfo = hostInfo;
39+
this.properties = properties;
40+
}
41+
42+
query(sql: any): Promise<any> {
43+
return this.client?.query(sql);
44+
}
45+
46+
end(): Promise<void> {
47+
return this.client?.end();
48+
}
49+
50+
rollback(): Promise<void> {
51+
return this.client?.rollback();
52+
}
53+
54+
async abort(): Promise<void> {
55+
try {
56+
return await ClientUtils.queryWithTimeout(this.client?.destroy(), this.properties);
57+
} catch (error: any) {
58+
// ignore
59+
}
60+
}
61+
}

common/lib/pg_client_wrapper.ts

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { ClientWrapper } from "./client_wrapper";
18+
import { HostInfo } from "./host_info";
19+
20+
/*
21+
This an internal wrapper class for a target community driver client created by the NodePostgresPgDriverDialect.
22+
*/
23+
export class PgClientWrapper implements ClientWrapper {
24+
readonly client: any;
25+
readonly hostInfo: HostInfo;
26+
readonly properties: Map<string, string>;
27+
28+
/**
29+
* Creates a wrapper for the target community driver client.
30+
*
31+
* @param targetClient The community driver client created for an instance.
32+
* @param hostInfo Host information for the connected instance.
33+
* @param properties Connection properties for the target client.
34+
*/
35+
constructor(targetClient: any, hostInfo: HostInfo, properties: Map<string, any>) {
36+
this.client = targetClient;
37+
this.hostInfo = hostInfo;
38+
this.properties = properties;
39+
}
40+
41+
query(sql: any): Promise<any> {
42+
return this.client?.query(sql);
43+
}
44+
45+
end(): Promise<void> {
46+
return this.client?.end();
47+
}
48+
49+
rollback(): Promise<void> {
50+
return this.client?.rollback();
51+
}
52+
53+
async abort(): Promise<void> {
54+
try {
55+
return await this.end();
56+
} catch (error: any) {
57+
// Ignore
58+
}
59+
}
60+
}

common/lib/plugin_service.ts

+4-8
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
356356

357357
if (oldClient && (isInTransaction || WrapperProperties.ROLLBACK_ON_SWITCH.get(this.props))) {
358358
try {
359-
await this.getDriverDialect().rollback(oldClient);
359+
await oldClient.rollback();
360360
} catch (error: any) {
361361
// Ignore.
362362
}
@@ -400,13 +400,13 @@ export class PluginService implements ErrorHandler, HostListProviderService {
400400

401401
async abortCurrentClient(): Promise<void> {
402402
if (this._currentClient.targetClient) {
403-
await this.getDriverDialect().abort(this._currentClient.targetClient);
403+
await this._currentClient.targetClient.abort();
404404
}
405405
}
406406

407-
async abortTargetClient(targetClient: ClientWrapper | undefined): Promise<void> {
407+
async abortTargetClient(targetClient: ClientWrapper | undefined | null): Promise<void> {
408408
if (targetClient) {
409-
await this.getDriverDialect().abort(targetClient);
409+
await targetClient.abort();
410410
}
411411
}
412412

@@ -483,10 +483,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
483483
return this._hostListProvider?.getHostRole(client, this.dialect);
484484
}
485485

486-
async rollback(targetClient: ClientWrapper) {
487-
return await this.getDriverDialect().rollback(targetClient);
488-
}
489-
490486
getTelemetryFactory(): TelemetryFactory {
491487
return this.pluginServiceManagerContainer.pluginManager!.getTelemetryFactory();
492488
}

common/lib/plugins/failover/writer_failover_handler.ts

+5-11
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,7 @@ class ReconnectToWriterHandlerTask {
225225

226226
try {
227227
while (latestTopology.length === 0 && Date.now() < this.endTime && !this.failoverCompleted) {
228-
if (this.currentClient) {
229-
await this.pluginService.abortTargetClient(this.currentClient);
230-
}
228+
await this.pluginService.abortTargetClient(this.currentClient);
231229

232230
try {
233231
const props = new Map(this.initialConnectionProps);
@@ -275,7 +273,7 @@ class ReconnectToWriterHandlerTask {
275273
this.failoverCompleted = true;
276274

277275
// Task B was returned.
278-
if (selectedTask && selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK && this.currentClient) {
276+
if (selectedTask && selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK) {
279277
await this.pluginService.abortTargetClient(this.currentClient);
280278
}
281279
}
@@ -428,7 +426,7 @@ class WaitForNewWriterHandlerTask {
428426
const props = new Map(this.initialConnectionProps);
429427
props.set(WrapperProperties.HOST.name, writerCandidate.host);
430428

431-
let targetClient;
429+
let targetClient = null;
432430
try {
433431
targetClient = await this.pluginService.forceConnect(writerCandidate, props);
434432
this.pluginService.setAvailability(writerCandidate.allAliases, HostAvailability.AVAILABLE);
@@ -438,9 +436,7 @@ class WaitForNewWriterHandlerTask {
438436
return true;
439437
} catch (error) {
440438
this.pluginService.setAvailability(writerCandidate.allAliases, HostAvailability.NOT_AVAILABLE);
441-
if (targetClient) {
442-
await this.pluginService.abortTargetClient(targetClient);
443-
}
439+
await this.pluginService.abortTargetClient(targetClient);
444440
return false;
445441
}
446442
}
@@ -483,8 +479,6 @@ class WaitForNewWriterHandlerTask {
483479
}
484480

485481
async callCloseClient(targetClient: ClientWrapper | null) {
486-
if (targetClient) {
487-
await this.pluginService.abortTargetClient(targetClient);
488-
}
482+
await this.pluginService.abortTargetClient(targetClient);
489483
}
490484
}

common/lib/plugins/stale_dns/stale_dns_helper.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ export class StaleDnsHelper {
124124
logger.debug(Messages.get("StaleDnsHelper.staleDnsDetected", this.writerHostInfo.host));
125125
this.staleDNSDetectedCounter.inc();
126126

127-
let targetClient;
127+
let targetClient = null;
128128
try {
129129
const newProps = new Map<string, any>(props);
130130
newProps.set(WrapperProperties.HOST.name, this.writerHostInfo.host);

0 commit comments

Comments
 (0)