Skip to content

Commit c7c630f

Browse files
committed
refactor: implement ClientWrapper
1 parent cd94f24 commit c7c630f

17 files changed

+176
-109
lines changed

common/lib/aws_pool_client.ts

-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
export interface AwsPoolClient {
1818
connect(): Promise<any>;
1919

20-
end(poolClient: any): Promise<any>;
21-
2220
releaseResources(): Promise<void>;
2321

2422
getIdleCount(): number;

common/lib/client_wrapper.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,9 @@ import { HostInfo } from "./host_info";
1919
export interface ClientWrapper {
2020
readonly client: any;
2121
readonly hostInfo: HostInfo;
22-
readonly properties: Map<string, any>;
22+
readonly properties: Map<string, string>;
23+
24+
end(): Promise<void>;
25+
rollback(): Promise<void>;
26+
destroy(): Promise<void>;
2327
}

common/lib/connection_provider.ts

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import { ClientWrapper } from "./client_wrapper";
2121

2222
export interface ConnectionProvider {
2323
connect(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<ClientWrapper>;
24-
end(pluginService: PluginService, clientWrapper: ClientWrapper | undefined): Promise<void>;
2524
acceptsUrl(hostInfo: HostInfo, props: Map<string, any>): boolean;
2625
acceptsStrategy(role: HostRole, strategy: string): boolean;
2726
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo;

common/lib/driver_connection_provider.ts

+3-20
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
5555

5656
const driverDialect: DriverDialect = pluginService.getDriverDialect();
5757
try {
58-
const targetClient: any = await driverDialect.connect(props);
58+
const targetClient: any = await driverDialect.connect(hostInfo, props);
5959
connectionHostInfo = new HostInfoBuilder({
6060
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
6161
})
@@ -92,12 +92,6 @@ export class DriverConnectionProvider implements ConnectionProvider {
9292
const originalHost: string = hostInfo.host;
9393
const fixedHost: string = this.rdsUtils.removeGreenInstancePrefix(hostInfo.host);
9494
resultProps.set(WrapperProperties.HOST.name, fixedHost);
95-
connectionHostInfo = new HostInfoBuilder({
96-
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
97-
})
98-
.copyFrom(hostInfo)
99-
.withHost(fixedHost)
100-
.build();
10195

10296
logger.info(
10397
"Connecting to " +
@@ -108,21 +102,10 @@ export class DriverConnectionProvider implements ConnectionProvider {
108102
JSON.stringify(Object.fromEntries(maskProperties(resultProps)))
109103
);
110104

111-
resultTargetClient = driverDialect.connect(resultProps);
105+
resultTargetClient = driverDialect.connect(hostInfo, resultProps);
112106
}
113107

114-
return {
115-
client: resultTargetClient,
116-
hostInfo: connectionHostInfo,
117-
properties: resultProps
118-
};
119-
}
120-
121-
async end(pluginService: PluginService, clientWrapper: ClientWrapper | undefined): Promise<void> {
122-
if (clientWrapper === undefined) {
123-
return;
124-
}
125-
return await pluginService.getDriverDialect().end(clientWrapper);
108+
return resultTargetClient;
126109
}
127110

128111
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {

common/lib/driver_dialect/driver_dialect.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
import { ClientWrapper } from "../client_wrapper";
1818
import { AwsPoolConfig } from "../aws_pool_config";
1919
import { AwsPoolClient } from "../aws_pool_client";
20+
import { HostInfo } from "../host_info";
2021

2122
export interface DriverDialect {
2223
getDialectName(): string;
2324
abort(targetClient: ClientWrapper): Promise<void>;
24-
rollback(targetClient: ClientWrapper): Promise<any>;
25-
connect(props: Map<string, any>): Promise<any>;
26-
end(targetClient: ClientWrapper | undefined): Promise<void>;
25+
connect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper>;
2726
preparePoolClientProperties(props: Map<string, any>, poolConfig: AwsPoolConfig | undefined): any;
2827
getAwsPoolClient(props: any): AwsPoolClient;
2928
}

common/lib/internal_pooled_connection_provider.ts

+5-16
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { logger } from "../logutils";
3737
import { RoundRobinHostSelector } from "./round_robin_host_selector";
3838
import { AwsPoolClient } from "./aws_pool_client";
3939
import { AwsPoolConfig } from "./aws_pool_config";
40+
import { PoolClientWrapper } from "./pool_client_wrapper";
4041

4142
export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
4243
private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
@@ -108,28 +109,16 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
108109
const preparedConfig = dialect.preparePoolClientProperties(props, this._poolConfig);
109110

110111
this.internalPool = this.databasePools.computeIfAbsent(
111-
new PoolKey(hostInfo.url, this.getPoolKey(hostInfo, props)),
112+
new PoolKey(connectionHostInfo.url, this.getPoolKey(connectionHostInfo, props)),
112113
() => dialect.getAwsPoolClient(preparedConfig),
113114
this.poolExpirationCheckNanos
114115
);
115116

116-
const poolClient = await this.getPoolConnection();
117-
118-
return {
119-
client: poolClient,
120-
hostInfo: connectionHostInfo,
121-
properties: props
122-
};
123-
}
124-
125-
async end(pluginService: PluginService, clientWrapper: ClientWrapper | undefined): Promise<void> {
126-
if (this.internalPool && clientWrapper) {
127-
return this.internalPool.end(clientWrapper.client);
128-
}
117+
return await this.getPoolConnection(hostInfo, props);
129118
}
130119

131-
async getPoolConnection() {
132-
return this.internalPool!.connect();
120+
async getPoolConnection(hostInfo: HostInfo, props: Map<string, string>) {
121+
return new PoolClientWrapper(this.internalPool!.connect(), this.internalPool!, hostInfo, props);
133122
}
134123

135124
public async releaseResources() {

common/lib/mysql_client_wrapper.ts

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { ClientWrapper } from "./client_wrapper";
18+
import { HostInfo } from "./host_info";
19+
20+
export class MySQLClientWrapper implements ClientWrapper {
21+
readonly client: any;
22+
readonly hostInfo: HostInfo;
23+
readonly properties: Map<string, string>;
24+
25+
constructor(targetClient: any, hostInfo: HostInfo, properties: Map<string, string>) {
26+
this.client = targetClient;
27+
this.hostInfo = hostInfo;
28+
this.properties = properties;
29+
}
30+
31+
end(): Promise<void> {
32+
return this.client.end();
33+
}
34+
35+
destroy(): Promise<void> {
36+
return this.client.destroy();
37+
}
38+
39+
rollback(): Promise<void> {
40+
return this.client.rollback();
41+
}
42+
}

common/lib/pg_client_wrapper.ts

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { ClientWrapper } from "./client_wrapper";
18+
import { HostInfo } from "./host_info";
19+
20+
export class PgClientWrapper implements ClientWrapper {
21+
readonly client: any;
22+
readonly hostInfo: HostInfo;
23+
readonly properties: Map<string, string>;
24+
25+
constructor(targetClient: any, hostInfo: HostInfo, properties: Map<string, string>) {
26+
this.client = targetClient;
27+
this.hostInfo = hostInfo;
28+
this.properties = properties;
29+
}
30+
31+
end(): Promise<void> {
32+
return this.client.end();
33+
}
34+
35+
rollback(): Promise<void> {
36+
return this.client.rollback();
37+
}
38+
39+
destroy(): Promise<void> {
40+
return this.client.destroy();
41+
}
42+
}

common/lib/plugin_service.ts

+2-6
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
356356

357357
if (oldClient && (isInTransaction || WrapperProperties.ROLLBACK_ON_SWITCH.get(this.props))) {
358358
try {
359-
await this.getDriverDialect().rollback(oldClient);
359+
await oldClient.rollback();
360360
} catch (error: any) {
361361
// Ignore.
362362
}
@@ -404,7 +404,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
404404
}
405405
}
406406

407-
async abortTargetClient(targetClient: ClientWrapper | undefined): Promise<void> {
407+
async abortTargetClient(targetClient: ClientWrapper | undefined | null): Promise<void> {
408408
if (targetClient) {
409409
await this.getDriverDialect().abort(targetClient);
410410
}
@@ -483,10 +483,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
483483
return this._hostListProvider?.getHostRole(client, this.dialect);
484484
}
485485

486-
async rollback(targetClient: ClientWrapper) {
487-
return await this.getDriverDialect().rollback(targetClient);
488-
}
489-
490486
getTelemetryFactory(): TelemetryFactory {
491487
return this.pluginServiceManagerContainer.pluginManager!.getTelemetryFactory();
492488
}

common/lib/plugins/failover/writer_failover_handler.ts

+5-11
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,7 @@ class ReconnectToWriterHandlerTask {
225225

226226
try {
227227
while (latestTopology.length === 0 && Date.now() < this.endTime && !this.failoverCompleted) {
228-
if (this.currentClient) {
229-
await this.pluginService.abortTargetClient(this.currentClient);
230-
}
228+
await this.pluginService.abortTargetClient(this.currentClient);
231229

232230
try {
233231
const props = new Map(this.initialConnectionProps);
@@ -275,7 +273,7 @@ class ReconnectToWriterHandlerTask {
275273
this.failoverCompleted = true;
276274

277275
// Task B was returned.
278-
if (selectedTask && selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK && this.currentClient) {
276+
if (selectedTask && selectedTask === ClusterAwareWriterFailoverHandler.WAIT_NEW_WRITER_TASK) {
279277
await this.pluginService.abortTargetClient(this.currentClient);
280278
}
281279
}
@@ -428,7 +426,7 @@ class WaitForNewWriterHandlerTask {
428426
const props = new Map(this.initialConnectionProps);
429427
props.set(WrapperProperties.HOST.name, writerCandidate.host);
430428

431-
let targetClient;
429+
let targetClient = null;
432430
try {
433431
targetClient = await this.pluginService.forceConnect(writerCandidate, props);
434432
this.pluginService.setAvailability(writerCandidate.allAliases, HostAvailability.AVAILABLE);
@@ -438,9 +436,7 @@ class WaitForNewWriterHandlerTask {
438436
return true;
439437
} catch (error) {
440438
this.pluginService.setAvailability(writerCandidate.allAliases, HostAvailability.NOT_AVAILABLE);
441-
if (targetClient) {
442-
await this.pluginService.abortTargetClient(targetClient);
443-
}
439+
await this.pluginService.abortTargetClient(targetClient);
444440
return false;
445441
}
446442
}
@@ -483,8 +479,6 @@ class WaitForNewWriterHandlerTask {
483479
}
484480

485481
async callCloseClient(targetClient: ClientWrapper | null) {
486-
if (targetClient) {
487-
await this.pluginService.abortTargetClient(targetClient);
488-
}
482+
await this.pluginService.abortTargetClient(targetClient);
489483
}
490484
}

common/lib/pool_client_wrapper.ts

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License").
5+
You may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
import { AwsPoolClient } from "./aws_pool_client";
18+
import { ClientWrapper } from "./client_wrapper";
19+
import { HostInfo } from "./host_info";
20+
21+
export class PoolClientWrapper implements ClientWrapper {
22+
private readonly pool: AwsPoolClient;
23+
readonly client: any;
24+
readonly hostInfo: HostInfo;
25+
readonly properties: Map<string, string>;
26+
27+
constructor(targetClient: any, pool: AwsPoolClient, hostInfo: HostInfo, properties: Map<string, string>) {
28+
this.client = targetClient;
29+
this.pool = pool;
30+
this.hostInfo = hostInfo;
31+
this.properties = properties;
32+
}
33+
34+
end(): Promise<void> {
35+
return this.client.release();
36+
}
37+
38+
destroy(): Promise<void> {
39+
return this.client.destroy();
40+
}
41+
42+
rollback(): Promise<void> {
43+
return this.client.rollback();
44+
}
45+
}

mysql/lib/client.ts

+3-13
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import { Messages } from "../../common/lib/utils/messages";
3030
import { ClientWrapper } from "../../common/lib/client_wrapper";
3131
import { ClientUtils } from "../../common/lib/utils/client_utils";
3232
import { RdsMultiAZMySQLDatabaseDialect } from "./dialect/rds_multi_az_mysql_database_dialect";
33-
import { HostInfo } from "../../common/lib/host_info";
3433
import { TelemetryTraceLevel } from "../../common/lib/utils/telemetry/telemetry_trace_level";
3534
import { MySQL2DriverDialect } from "./dialect/mysql2_driver_dialect";
3635

@@ -220,37 +219,28 @@ export class AwsMySQLClient extends AwsClient {
220219
return;
221220
}
222221

223-
const hostInfo: HostInfo | null = this.pluginService.getCurrentHostInfo();
224222
const result = await this.pluginManager.execute(
225223
this.pluginService.getCurrentHostInfo(),
226224
this.properties,
227225
"end",
228226
() => {
229-
return ClientUtils.queryWithTimeout(
230-
this.pluginService
231-
.getConnectionProvider(hostInfo, this.properties)
232-
.end(this.pluginService, this.targetClient)
233-
.catch((error: any) => {
234-
// ignore
235-
}),
236-
this.properties
237-
);
227+
return ClientUtils.queryWithTimeout(this.targetClient?.end() ?? Promise.resolve(), this.properties);
238228
},
239229
null
240230
);
241231
await this.releaseResources();
242232
return result;
243233
}
244234

245-
async rollback(): Promise<Query> {
235+
async rollback(): Promise<any> {
246236
return this.pluginManager.execute(
247237
this.pluginService.getCurrentHostInfo(),
248238
this.properties,
249239
"rollback",
250240
async () => {
251241
if (this.targetClient) {
252242
this.pluginService.updateInTransaction("rollback");
253-
return await this.pluginService.getDriverDialect().rollback(this.targetClient);
243+
return await this.targetClient.rollback();
254244
}
255245
return null;
256246
},

0 commit comments

Comments
 (0)