Skip to content

Commit

Permalink
feat: connection provider and connection provider manager (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill authored May 8, 2024
1 parent bda20dc commit 77ea4d0
Show file tree
Hide file tree
Showing 18 changed files with 521 additions and 48 deletions.
9 changes: 7 additions & 2 deletions common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { ConnectionUrlParser } from "./utils/connection_url_parser";
import { HostListProvider } from "./host_list_provider/host_list_provider";
import { PluginManager } from "./plugin_manager";
import { EventEmitter } from "stream";
import { DriverConnectionProvider } from "./driver_connection_provider";

export abstract class AwsClient extends EventEmitter {
protected pluginManager: PluginManager;
Expand All @@ -48,9 +49,13 @@ export abstract class AwsClient extends EventEmitter {
this._dialect = dialect;
this._properties = new Map<string, any>(Object.entries(config));

const defaultConnProvider = new DriverConnectionProvider();
const effectiveConnProvider = null;
// TODO: check for configuration profile to update the effectiveConnProvider

const container = new PluginServiceManagerContainer();
this.pluginService = new PluginService(container, this);
this.pluginManager = new PluginManager(container, this.properties);
this.pluginService = new PluginService(container, this, this._properties);
this.pluginManager = new PluginManager(container, this._properties, defaultConnProvider, effectiveConnProvider);

// TODO: properly set up host info
const host: string = this._properties.get("host");
Expand Down
12 changes: 9 additions & 3 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { DefaultPlugin } from "./plugins/default_plugin";
import { ExecuteTimePluginFactory } from "./plugins/execute_time_plugin";
import { ConnectTimePluginFactory } from "./plugins/connect_time_plugin";
import { AwsSecretsManagerPluginFactory } from "./authentication/aws_secrets_manager_plugin";
import { ConnectionProvider } from "./connection_provider";

export class PluginFactoryInfo {}

Expand All @@ -42,10 +43,15 @@ export class ConnectionPluginChainBuilder {
["failover", FailoverPluginFactory]
]);

getPlugins(pluginService: PluginService, props: Map<string, any>): ConnectionPlugin[] {
getPlugins(
pluginService: PluginService,
props: Map<string, any>,
defaultConnProvider: ConnectionProvider,
effectiveConnProvider: ConnectionProvider | null
): ConnectionPlugin[] {
const plugins: ConnectionPlugin[] = [];
let pluginCodes: string = props.get(WrapperProperties.PLUGINS.name);
if (pluginCodes === null || pluginCodes === undefined) {
if (pluginCodes == null) {
pluginCodes = ConnectionPluginChainBuilder.DEFAULT_PLUGINS;
}

Expand All @@ -70,7 +76,7 @@ export class ConnectionPluginChainBuilder {
});
}

plugins.push(new DefaultPlugin(pluginService));
plugins.push(new DefaultPlugin(pluginService, defaultConnProvider, effectiveConnProvider));

return plugins;
}
Expand Down
10 changes: 8 additions & 2 deletions common/lib/connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
limitations under the License.
*/

import { HostRole } from "./host_role";
import { HostInfo } from "./host_info";
import { PluginService } from "./plugin_service";

export interface ConnectionProvider {
connect<T>(connectFunc: () => Promise<T>): Promise<T>;
execute<T>(methodName: string, methodFunc: () => Promise<T>): Promise<T>;
connect<T>(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>, connectFunc: () => Promise<T>): Promise<T>;
acceptsUrl(hostInfo: HostInfo, props: Map<string, any>): boolean;
acceptsStrategy(role: HostRole, strategy: string): boolean;
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo;
}
55 changes: 55 additions & 0 deletions common/lib/connection_provider_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,58 @@
See the License for the specific language governing permissions and
limitations under the License.
*/

import { ConnectionProvider } from "./connection_provider";
import { HostRole } from "./host_role";
import { HostInfo } from "./host_info";
import { AwsWrapperError } from "./utils/errors";

export class ConnectionProviderManager {
private static connProvider: ConnectionProvider | null = null;
private readonly defaultProvider: ConnectionProvider;

constructor(defaultProvider: ConnectionProvider) {
this.defaultProvider = defaultProvider;
}

static setConnectionProvider(connProvider: ConnectionProvider) {
ConnectionProviderManager.connProvider = connProvider;
}

getConnectionProvider(hostInfo: HostInfo, props: Map<string, any>) {
if (ConnectionProviderManager.connProvider?.acceptsUrl(hostInfo, props)) {
return ConnectionProviderManager.connProvider;
}

return this.defaultProvider;
}

acceptsStrategy(role: HostRole, strategy: string) {
return ConnectionProviderManager.connProvider?.acceptsStrategy(role, strategy) || this.defaultProvider.acceptsStrategy(role, strategy);
}

getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props: Map<string, any>) {
let host;
if (ConnectionProviderManager.connProvider?.acceptsStrategy(role, strategy)) {
try {
host = ConnectionProviderManager.connProvider.getHostInfoByStrategy(hosts, role, strategy, props);
} catch (error) {
if (error instanceof AwsWrapperError && error.message.includes("Unsupported host selection strategy")) {
// Ignore and try with the default provider.
} else {
throw error;
}
}
}

if (!host) {
host = this.defaultProvider.getHostInfoByStrategy(hosts, role, strategy, props);
}

return host;
}

static resetProvider() {
this.connProvider = null;
}
}
113 changes: 113 additions & 0 deletions common/lib/driver_connection_provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { ConnectionProvider } from "./connection_provider";
import { HostRole } from "./host_role";
import { HostInfo } from "./host_info";
import { HostSelector } from "./host_selector";
import { RandomHostSelector } from "./random_host_selector";
import { AwsWrapperError } from "./utils/errors";
import { WrapperProperties } from "./wrapper_property";
import { Messages } from "./utils/messages";
import { RdsUtils } from "./utils/rds_utils";
import { HostInfoBuilder } from "./host_info_builder";
import { promisify } from "util";
import { lookup } from "dns";
import { PluginService } from "./plugin_service";
import { logger } from "../logutils";
import { maskProperties } from "./utils/utils";

export class DriverConnectionProvider implements ConnectionProvider {
private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([[RandomHostSelector.STRATEGY_NAME, new RandomHostSelector()]]);
private readonly rdsUtils: RdsUtils = new RdsUtils();

acceptsStrategy(role: HostRole, strategy: string): boolean {
return DriverConnectionProvider.acceptedStrategies.has(strategy);
}

acceptsUrl(hostInfo: HostInfo, props: Map<string, any>): boolean {
return true;
}

async connect<T>(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>, connectFunc: () => Promise<T>): Promise<T> {
let result;
try {
result = await connectFunc();
} catch (e) {
await pluginService.tryClosingTargetClient();

if (!WrapperProperties.ENABLE_GREEN_NODE_REPLACEMENT.get(props)) {
throw e;
}

if (!JSON.stringify(e).includes("Error: getaddrinfo ENOTFOUND")) {
throw e;
}

if (!this.rdsUtils.isRdsDns(hostInfo.host) || !this.rdsUtils.isGreenInstance(hostInfo.host)) {
throw e;
}

// check DNS for green host name
let resolvedAddress;
try {
resolvedAddress = await promisify(lookup)(hostInfo.host, {});
} catch (tmp) {
// do nothing
}

if (resolvedAddress) {
// Green node DNS exists
throw e;
}

// Green node DNS doesn't exist. Try to replace it with corresponding node name and connect again.
const originalHost: string = hostInfo.host;
const fixedHost: string = this.rdsUtils.removeGreenInstancePrefix(hostInfo.host);
props.set(WrapperProperties.HOST.name, fixedHost);
const connectionHostInfo = new HostInfoBuilder({
hostAvailabilityStrategy: hostInfo.hostAvailabilityStrategy
})
.copyFrom(hostInfo)
.withHost(fixedHost)
.build();

logger.info(
"Connecting to " +
fixedHost +
" after correcting the hostname from " +
originalHost +
"\nwith properties: \n" +
JSON.stringify(maskProperties(props))
);

const newTargetClient = pluginService.createTargetClient(props);
const fixedConnFunc = pluginService.getConnectFunc(newTargetClient);
result = await fixedConnFunc();
pluginService.setCurrentClient(newTargetClient, connectionHostInfo);
}

return result;
}

getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
const acceptedStrategy = DriverConnectionProvider.acceptedStrategies.get(strategy);
if (!acceptedStrategy) {
throw new AwsWrapperError(Messages.get("ConnectionProvider.unsupportedHostSpecSelectorStrategy", strategy, "DriverConnectionProvider")); // TODO
}
return acceptedStrategy.getHost(hosts, role, props);
}
}
12 changes: 12 additions & 0 deletions common/lib/host_info_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ export class HostInfoBuilder {
return this;
}

copyFrom(hostInfo: HostInfo): HostInfoBuilder {
this.host = hostInfo.host;
this.hostId = hostInfo.hostId ?? "";
this.port = hostInfo.port;
this.availability = hostInfo.availability;
this.role = hostInfo.role;
this.weight = hostInfo.weight;
this.lastUpdateTime = hostInfo.lastUpdateTime;
this.hostAvailabilityStrategy = hostInfo.hostAvailabilityStrategy;
return this;
}

build() {
if (!this.host) {
throw new AwsWrapperError("host parameter must be set");
Expand Down
6 changes: 3 additions & 3 deletions common/lib/host_role.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

export enum HostRole {
UNKNOWN,
WRITER,
READER
UNKNOWN = "unknown",
WRITER = "writer",
READER = "reader"
}
22 changes: 22 additions & 0 deletions common/lib/host_selector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { HostInfo } from "./host_info";
import { HostRole } from "./host_role";

export interface HostSelector {
getHost(hosts: HostInfo[], role: HostRole, props?: Map<string, any>): HostInfo;
}
15 changes: 13 additions & 2 deletions common/lib/plugin_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { HostListProviderService } from "./host_list_provider_service";
import { AwsClient } from "./aws_client";
import { PluginService } from "./plugin_service";
import { HostRole } from "./host_role";
import { ConnectionProvider } from "./connection_provider";

type PluginFunc<T> = (plugin: ConnectionPlugin, targetFunc: () => Promise<T>) => Promise<T>;

Expand Down Expand Up @@ -69,12 +70,22 @@ export class PluginManager {
private pluginServiceManagerContainer: PluginServiceManagerContainer;
private props: Map<string, any>;

constructor(pluginServiceManagerContainer: PluginServiceManagerContainer, props: Map<string, any>) {
constructor(
pluginServiceManagerContainer: PluginServiceManagerContainer,
props: Map<string, any>,
defaultConnProvider: ConnectionProvider,
effectiveConnProvider: ConnectionProvider | null
) {
this.pluginServiceManagerContainer = pluginServiceManagerContainer;
this.pluginServiceManagerContainer.pluginManager = this;
this.props = props;
if (this.pluginServiceManagerContainer.pluginService != null) {
this._plugins = new ConnectionPluginChainBuilder().getPlugins(this.pluginServiceManagerContainer.pluginService, props);
this._plugins = new ConnectionPluginChainBuilder().getPlugins(
this.pluginServiceManagerContainer.pluginService,
props,
defaultConnProvider,
effectiveConnProvider
);
}

// TODO: proper parsing logic
Expand Down
8 changes: 7 additions & 1 deletion common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,21 @@ export class PluginService implements ErrorHandler, HostListProviderService {
private _initialConnectionHostInfo?: HostInfo;
private _isInTransaction: boolean = false;
private pluginServiceManagerContainer: PluginServiceManagerContainer;
private _props: Map<string, any>;
protected hosts: HostInfo[] = [];
protected static readonly hostAvailabilityExpiringCache: CacheMap<string, HostAvailability> = new CacheMap<string, HostAvailability>();

constructor(container: PluginServiceManagerContainer, client: AwsClient) {
constructor(container: PluginServiceManagerContainer, client: AwsClient, props: Map<string, any>) {
this._currentClient = client;
this.pluginServiceManagerContainer = container;
this._props = props;
container.pluginService = this;
}

get props(): Map<string, any> {
return this._props;
}

isInTransaction(): boolean {
return this._isInTransaction;
}
Expand Down
Loading

0 comments on commit 77ea4d0

Please sign in to comment.