Skip to content

Commit

Permalink
chore: separate database dialect and driver dialect
Browse files Browse the repository at this point in the history
  • Loading branch information
karenc-bq committed Oct 26, 2024
1 parent 737571e commit a6f20c3
Show file tree
Hide file tree
Showing 29 changed files with 299 additions and 207 deletions.
7 changes: 6 additions & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
"@typescript-eslint/no-explicit-any": "off",
"@typescript-eslint/no-unused-vars": "off",
"@typescript-eslint/ban-ts-comment": "off",
"prettier/prettier": 2,
"prettier/prettier": [
"error",
{
"endOfLine": "auto"
}
],
"header/header": [
1,
"block",
Expand Down
11 changes: 4 additions & 7 deletions common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { ClientWrapper } from "./client_wrapper";
import { ConnectionProviderManager } from "./connection_provider_manager";
import { DefaultTelemetryFactory } from "./utils/telemetry/default_telemetry_factory";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";

export abstract class AwsClient extends EventEmitter {
private _defaultPort: number = -1;
Expand All @@ -40,7 +41,6 @@ export abstract class AwsClient extends EventEmitter {
protected _schema: string = "";
protected _isolationLevel: number = 0;
protected _errorHandler: ErrorHandler;
protected _createClientFunc?: (config: any) => any;
protected _connectionUrlParser: ConnectionUrlParser;
readonly properties: Map<string, any>;
config: any;
Expand All @@ -51,7 +51,8 @@ export abstract class AwsClient extends EventEmitter {
errorHandler: ErrorHandler,
dbType: DatabaseType,
knownDialectsByCode: Map<string, DatabaseDialect>,
parser: ConnectionUrlParser
parser: ConnectionUrlParser,
driverDialect: DriverDialect
) {
super();
this.config = config;
Expand All @@ -62,7 +63,7 @@ export abstract class AwsClient extends EventEmitter {

this.telemetryFactory = new DefaultTelemetryFactory(this.properties);
const container = new PluginServiceManagerContainer();
this.pluginService = new PluginService(container, this, dbType, knownDialectsByCode, this.properties);
this.pluginService = new PluginService(container, this, dbType, knownDialectsByCode, this.properties, driverDialect);
this.pluginManager = new PluginManager(
container,
this.properties,
Expand Down Expand Up @@ -111,10 +112,6 @@ export abstract class AwsClient extends EventEmitter {
return this._connectionUrlParser;
}

getCreateClientFunc<Type>(): ((config: any) => Type) | undefined {
return this._createClientFunc;
}

abstract updateSessionStateReadOnly(readOnly: boolean): Promise<any | void>;

abstract setReadOnly(readOnly: boolean): Promise<any | void>;
Expand Down
6 changes: 0 additions & 6 deletions common/lib/database_dialect/database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ export interface DatabaseDialect {
getServerVersionQuery(): string;
getDialectUpdateCandidates(): string[];
isDialect(targetClient: ClientWrapper): Promise<boolean>;
getAwsPoolClient(props: any): AwsPoolClient;
getHostListProvider(props: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider;
tryClosingTargetClient(targetClient: ClientWrapper): Promise<void>;
rollback(targetClient: ClientWrapper): Promise<any>;
isClientValid(targetClient: ClientWrapper): Promise<boolean>;
getDatabaseType(): DatabaseType;
getDialectName(): string;
Expand All @@ -46,7 +43,4 @@ export interface DatabaseDialect {
doesStatementSetAutoCommit(statement: string): boolean | undefined;
doesStatementSetSchema(statement: string): string | undefined;
doesStatementSetCatalog(statement: string): string | undefined;
connect(targetClient: any): Promise<any>;
end(clientWrapper: ClientWrapper | undefined): Promise<void>;
preparePoolClientProperties(props: Map<string, any>, poolConfig: AwsPoolConfig | undefined): any;
}
12 changes: 5 additions & 7 deletions common/lib/driver_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import { logger } from "../logutils";
import { maskProperties } from "./utils/utils";
import { ClientWrapper } from "./client_wrapper";
import { RoundRobinHostSelector } from "./round_robin_host_selector";
import { DatabaseDialect } from "./database_dialect/database_dialect";
import { DriverDialect } from "./driver_dialect/driver_dialect";

export class DriverConnectionProvider implements ConnectionProvider {
private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
Expand All @@ -53,9 +53,9 @@ export class DriverConnectionProvider implements ConnectionProvider {
const resultProps = new Map(props);
let connectionHostInfo: HostInfo;

const driverDialect: DriverDialect = pluginService.getDriverDialect();
try {
const targetClient: any = await Promise.resolve(pluginService.createTargetClient(props));
await pluginService.getDialect().connect(targetClient);
const targetClient: any = await driverDialect.connect(props);
connectionHostInfo = new HostInfoBuilder({
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
})
Expand Down Expand Up @@ -108,9 +108,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
JSON.stringify(Object.fromEntries(maskProperties(resultProps)))
);

const newTargetClient = pluginService.createTargetClient(resultProps);
await pluginService.getDialect().connect(newTargetClient);
resultTargetClient = newTargetClient;
resultTargetClient = driverDialect.connect(resultProps);
}

return {
Expand All @@ -124,7 +122,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
if (clientWrapper === undefined) {
return;
}
return await pluginService.getDialect().end(clientWrapper);
return await pluginService.getDriverDialect().end(clientWrapper);
}

getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
Expand Down
29 changes: 29 additions & 0 deletions common/lib/driver_dialect/driver_dialect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { ClientWrapper } from "../client_wrapper";
import { AwsPoolConfig } from "../aws_pool_config";
import { AwsPoolClient } from "../aws_pool_client";

export interface DriverDialect {
getDialectName(): string;
abort(targetClient: ClientWrapper): Promise<void>;
rollback(targetClient: ClientWrapper): Promise<any>;
connect(props: Map<string, any>): Promise<any>;
end(targetClient: ClientWrapper | undefined): Promise<void>;
preparePoolClientProperties(props: Map<string, any>, poolConfig: AwsPoolConfig | undefined): any;
getAwsPoolClient(props: any): AwsPoolClient;
}
2 changes: 1 addition & 1 deletion common/lib/internal_pooled_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
}
}

const dialect = pluginService.getDialect();
const dialect = pluginService.getDriverDialect();
const preparedConfig = dialect.preparePoolClientProperties(props, this._poolConfig);

this.internalPool = this.databasePools.computeIfAbsent(
Expand Down
32 changes: 17 additions & 15 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { DatabaseDialectCodes } from "./database_dialect/database_dialect_codes"
import { getWriter } from "./utils/utils";
import { ConnectionProvider } from "./connection_provider";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";

export class PluginService implements ErrorHandler, HostListProviderService {
private readonly _currentClient: AwsClient;
Expand All @@ -55,6 +56,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
private dbDialectProvider: DatabaseDialectProvider;
private readonly initialHost: string;
private dialect: DatabaseDialect;
private readonly driverDialect: DriverDialect;
protected readonly sessionStateService: SessionStateService;
protected static readonly hostAvailabilityExpiringCache: CacheMap<string, HostAvailability> = new CacheMap<string, HostAvailability>();
readonly props: Map<string, any>;
Expand All @@ -64,12 +66,14 @@ export class PluginService implements ErrorHandler, HostListProviderService {
client: AwsClient,
dbType: DatabaseType,
knownDialectsByCode: Map<DatabaseDialectCodes, DatabaseDialect>,
props: Map<string, any>
props: Map<string, any>,
driverDialect: DriverDialect
) {
this._currentClient = client;
this.pluginServiceManagerContainer = container;
this.props = props;
this.dbDialectProvider = new DatabaseDialectManager(knownDialectsByCode, dbType, this.props);
this.driverDialect = driverDialect;
this.initialHost = props.get(WrapperProperties.HOST.name);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
container.pluginService = this;
Expand Down Expand Up @@ -158,6 +162,10 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return this.dialect;
}

getDriverDialect(): DriverDialect {
return this.driverDialect;
}

getHostInfoBuilder(): HostInfoBuilder {
return new HostInfoBuilder({ hostAvailabilityStrategy: new HostAvailabilityStrategyFactory().create(this.props) });
}
Expand Down Expand Up @@ -310,14 +318,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return provider.identifyConnection(targetClient, this.dialect);
}

createTargetClient(props: Map<string, any>): any {
const createClientFunc = this.getCurrentClient().getCreateClientFunc();
if (createClientFunc) {
return createClientFunc(props);
}
throw new AwsWrapperError("AwsClient is missing create target client function."); // This should not be reached
}

connect(hostInfo: HostInfo, props: Map<string, any>): Promise<ClientWrapper> {
return this.pluginServiceManagerContainer.pluginManager!.connect(hostInfo, props, false);
}
Expand Down Expand Up @@ -356,7 +356,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {

if (oldClient && (isInTransaction || WrapperProperties.ROLLBACK_ON_SWITCH.get(this.props))) {
try {
await this.getDialect().rollback(oldClient);
await this.getDriverDialect().rollback(oldClient);
} catch (error: any) {
// Ignore.
}
Expand All @@ -379,7 +379,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
}

try {
await this.tryClosingTargetClient(oldClient);
await this.abortTargetClient(oldClient);
} catch (error: any) {
// Ignore.
}
Expand All @@ -400,12 +400,14 @@ export class PluginService implements ErrorHandler, HostListProviderService {

async abortCurrentClient(): Promise<void> {
if (this._currentClient.targetClient) {
await this.getDialect().tryClosingTargetClient(this._currentClient.targetClient);
await this.getDriverDialect().abort(this._currentClient.targetClient);
}
}

async tryClosingTargetClient(targetClient: ClientWrapper): Promise<void> {
await this.getDialect().tryClosingTargetClient(targetClient);
async abortTargetClient(targetClient: ClientWrapper | undefined): Promise<void> {
if (targetClient) {
await this.getDriverDialect().abort(targetClient);
}
}

getSessionStateService() {
Expand Down Expand Up @@ -482,7 +484,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
}

async rollback(targetClient: ClientWrapper) {
return await this.getDialect().rollback(targetClient);
return await this.getDriverDialect().rollback(targetClient);
}

getTelemetryFactory(): TelemetryFactory {
Expand Down
12 changes: 6 additions & 6 deletions common/lib/plugins/aurora_initial_connection_strategy_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
if (writerCandidate) {
if (writerCandidate.role !== HostRole.WRITER) {
// Shouldn't be here. But let's try again.
await this.pluginService.tryClosingTargetClient(writerCandidateClient);
await this.pluginService.abortTargetClient(writerCandidateClient);
await sleep(retryDelayMs);
continue;
}
Expand All @@ -154,7 +154,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
// If the new connection resolves to a reader instance, this means the topology is outdated.
// Force refresh to update the topology.
await this.pluginService.forceRefreshHostList(writerCandidateClient);
await this.pluginService.tryClosingTargetClient(writerCandidateClient);
await this.pluginService.abortTargetClient(writerCandidateClient);
await sleep(retryDelayMs);
continue;
}
Expand All @@ -165,7 +165,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
}
return writerCandidateClient;
} catch (error: any) {
await this.pluginService.tryClosingTargetClient(writerCandidateClient);
await this.pluginService.abortTargetClient(writerCandidateClient);
if (this.pluginService.isLoginError(error) || !writerCandidate) {
throw error;
} else if (writerCandidate) {
Expand Down Expand Up @@ -211,7 +211,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
}
return readerCandidateClient;
}
await this.pluginService.tryClosingTargetClient(readerCandidateClient);
await this.pluginService.abortTargetClient(readerCandidateClient);
await sleep(retryDelayMs);
continue;
}
Expand Down Expand Up @@ -240,7 +240,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
}
return readerCandidateClient;
}
await this.pluginService.tryClosingTargetClient(readerCandidateClient);
await this.pluginService.abortTargetClient(readerCandidateClient);
await sleep(retryDelayMs);
continue;
}
Expand All @@ -250,7 +250,7 @@ export class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
}
return readerCandidateClient;
} catch (error: any) {
await this.pluginService.tryClosingTargetClient(readerCandidateClient);
await this.pluginService.abortTargetClient(readerCandidateClient);
if (this.pluginService.isLoginError(error) || !readerCandidate) {
throw error;
} else if (readerCandidate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class OpenedConnectionTracker {
if (!client) {
continue;
}
await this.pluginService.tryClosingTargetClient(client);
await this.pluginService.abortTargetClient(client);
}
}

Expand Down
4 changes: 2 additions & 2 deletions common/lib/plugins/default_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
private async connectInternal(hostInfo: HostInfo, props: Map<string, any>, connProvider: ConnectionProvider): Promise<ClientWrapper> {
const telemetryFactory = this.pluginService.getTelemetryFactory();
const telemetryContext = telemetryFactory.openTelemetryContext(
`${this.pluginService.getDialect().getDialectName()} - connect`,
`${this.pluginService.getDriverDialect().getDialectName()} - connect`,
TelemetryTraceLevel.NESTED
);

Expand All @@ -91,7 +91,7 @@ export class DefaultPlugin extends AbstractConnectionPlugin {

const telemetryFactory = this.pluginService.getTelemetryFactory();
const telemetryContext = telemetryFactory.openTelemetryContext(
`${this.pluginService.getDialect().getDialectName()} - ${methodName}`,
`${this.pluginService.getDriverDialect().getDialectName()} - ${methodName}`,
TelemetryTraceLevel.NESTED
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp

if (!targetClient || !isClientValid) {
if (targetClient) {
await this.pluginService.tryClosingTargetClient(targetClient);
await this.pluginService.abortTargetClient(targetClient);
}
// eslint-disable-next-line no-unsafe-finally
throw new AwsWrapperError(Messages.get("HostMonitoringConnectionPlugin.unavailableHost", monitoringHostInfo.host));
Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugins/efm/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ export class MonitorImpl implements Monitor {

async endMonitoringClient() {
if (this.monitoringClient) {
await this.pluginService.tryClosingTargetClient(this.monitoringClient);
await this.pluginService.abortTargetClient(this.monitoringClient);
this.monitoringClient = null;
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugins/efm/monitor_connection_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class MonitorConnectionContext {
}

try {
await this.pluginService.tryClosingTargetClient(this.clientToAbort);
await this.pluginService.abortTargetClient(this.clientToAbort);
this.telemetryAbortedConnectionCounter.inc();
} catch (error: any) {
// ignore
Expand Down
6 changes: 3 additions & 3 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
// Need to continue this loop and to make another try to connect to a reader.

try {
await this.pluginService.tryClosingTargetClient(result.client);
await this.pluginService.abortTargetClient(result.client);
} catch (error) {
// ignore
}
Expand Down Expand Up @@ -322,7 +322,7 @@ class ConnectionAttemptTask {
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, this.taskId);
return new ReaderFailoverResult(this.targetClient, this.newHost, true, undefined, this.taskId);
}
await this.pluginService.tryClosingTargetClient(this.targetClient);
await this.pluginService.abortTargetClient(this.targetClient);
return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
} catch (error) {
this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.NOT_AVAILABLE);
Expand All @@ -342,7 +342,7 @@ class ConnectionAttemptTask {

async performFinalCleanup() {
if (this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) !== this.taskId) {
await this.pluginService.tryClosingTargetClient(this.targetClient);
await this.pluginService.abortTargetClient(this.targetClient);
}
}
}
Loading

0 comments on commit a6f20c3

Please sign in to comment.