From e630ab24b82201e3e02c748eec11c3fb2bba4388 Mon Sep 17 00:00:00 2001 From: joyc-bq Date: Tue, 23 Apr 2024 10:46:36 -0700 Subject: [PATCH] feat: read-write splitting plugin --- common/lib/connection_plugin_chain_builder.ts | 2 + common/lib/driver_connection_provider.ts | 2 +- common/lib/plugin_service.ts | 9 + common/lib/plugins/read_write_splitting.ts | 395 ++++++++++++++++ common/lib/utils/locales/en.json | 34 +- common/lib/wrapper_property.ts | 6 + pg/lib/client.ts | 10 +- .../tests/read_write_splitting.test.ts | 422 ++++++++++++++++++ tests/unit/read_write_splitting.test.ts | 333 ++++++++++++++ 9 files changed, 1206 insertions(+), 7 deletions(-) create mode 100644 common/lib/plugins/read_write_splitting.ts create mode 100644 tests/integration/container/tests/read_write_splitting.test.ts create mode 100644 tests/unit/read_write_splitting.test.ts diff --git a/common/lib/connection_plugin_chain_builder.ts b/common/lib/connection_plugin_chain_builder.ts index f1dbf4c28..d75a626f6 100644 --- a/common/lib/connection_plugin_chain_builder.ts +++ b/common/lib/connection_plugin_chain_builder.ts @@ -26,6 +26,7 @@ import { ExecuteTimePluginFactory } from "./plugins/execute_time_plugin"; import { ConnectTimePluginFactory } from "./plugins/connect_time_plugin"; import { AwsSecretsManagerPluginFactory } from "./authentication/aws_secrets_manager_plugin"; import { ConnectionProvider } from "./connection_provider"; +import { ReadWriteSplittingPluginFactory } from "./plugins/read_write_splitting"; import { StaleDnsPluginFactory } from "./plugins/stale_dns/stale_dns_plugin"; import { FederatedAuthPluginFactory } from "./plugins/federated_auth/federated_auth_plugin"; @@ -43,6 +44,7 @@ export class ConnectionPluginChainBuilder { ["connectTime", ConnectTimePluginFactory], ["secretsManager", AwsSecretsManagerPluginFactory], ["failover", FailoverPluginFactory], + ["readWriteSplitting", ReadWriteSplittingPluginFactory], ["staleDns", StaleDnsPluginFactory], ["federatedAuth", FederatedAuthPluginFactory] ]); diff --git a/common/lib/driver_connection_provider.ts b/common/lib/driver_connection_provider.ts index 47f5112f7..4f3e38783 100644 --- a/common/lib/driver_connection_provider.ts +++ b/common/lib/driver_connection_provider.ts @@ -113,7 +113,7 @@ export class DriverConnectionProvider implements ConnectionProvider { getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map): HostInfo { const acceptedStrategy = DriverConnectionProvider.acceptedStrategies.get(strategy); if (!acceptedStrategy) { - throw new AwsWrapperError(Messages.get("ConnectionProvider.unsupportedHostInfoSelectorStrategy", strategy, "DriverConnectionProvider")); + throw new AwsWrapperError(Messages.get("ConnectionProvider.unsupportedHostSelectorStrategy", strategy, "DriverConnectionProvider")); } return acceptedStrategy.getHost(hosts, role, props); } diff --git a/common/lib/plugin_service.ts b/common/lib/plugin_service.ts index 4fd1ff3cd..b711a3811 100644 --- a/common/lib/plugin_service.ts +++ b/common/lib/plugin_service.ts @@ -102,6 +102,11 @@ export class PluginService implements ErrorHandler, HostListProviderService { this._initialConnectionHostInfo = initialConnectionHostInfo; } + getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo | undefined { + const pluginManager = this.pluginServiceManagerContainer.pluginManager; + return pluginManager?.getHostInfoByStrategy(role, strategy); + } + getCurrentHostInfo(): HostInfo | null { return this._currentHostInfo ? this._currentHostInfo : null; } @@ -131,6 +136,10 @@ export class PluginService implements ErrorHandler, HostListProviderService { return false; } + acceptsStrategy(role: HostRole, strategy: string): boolean { + return this.pluginServiceManagerContainer.pluginManager?.acceptsStrategy(role, strategy) ?? false; + } + async forceRefreshHostList(): Promise; async forceRefreshHostList(targetClient?: any): Promise; async forceRefreshHostList(targetClient?: any): Promise { diff --git a/common/lib/plugins/read_write_splitting.ts b/common/lib/plugins/read_write_splitting.ts new file mode 100644 index 000000000..e2103b152 --- /dev/null +++ b/common/lib/plugins/read_write_splitting.ts @@ -0,0 +1,395 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { AbstractConnectionPlugin } from "../abstract_connection_plugin"; +import { HostInfo } from "../host_info"; +import { uniqueId } from "lodash"; +import { PluginService } from "../plugin_service"; +import { HostListProviderService } from "../host_list_provider_service"; +import { OldConnectionSuggestionAction } from "../old_connection_suggestion_action"; +import { HostChangeOptions } from "../host_change_options"; +import { WrapperProperties } from "../wrapper_property"; +import { Messages } from "../utils/messages"; +import { logger } from "../../logutils"; +import { AwsWrapperError, FailoverError } from "../utils/errors"; +import { HostRole } from "../host_role"; +import { ConnectionPluginFactory } from "../plugin_factory"; +import { ConnectionPlugin } from "../connection_plugin"; +import { SqlMethodUtils } from "../utils/sql_method_utils"; + +export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin { + private static readonly subscribedMethods: Set = new Set(["initHostProvider", "connect", "notifyConnectionChanged", "query"]); + private readonly readerSelectorStrategy: string = ""; + private readonly id: string = uniqueId("_readWriteSplittingPlugin"); + + private _hostListProviderService: HostListProviderService | undefined; + private pluginService: PluginService; + private readonly _properties: Map; + private _readerHostInfo?: HostInfo = undefined; + private _inReadWriteSplit = false; + writerTargetClient: any | undefined; + readerTargetClient: any | undefined; + + constructor(pluginService: PluginService, properties: Map); + constructor( + pluginService: PluginService, + properties: Map, + hostListProviderService: HostListProviderService, + writerClient: any, + readerClient: any + ); + constructor( + pluginService: PluginService, + properties: Map, + hostListProviderService?: HostListProviderService, + writerClient?: any, + readerClient?: any + ) { + super(); + logger.debug(`TestPlugin constructor id: ${this.id}`); + this.pluginService = pluginService; + this._properties = properties; + this.readerSelectorStrategy = WrapperProperties.READER_HOST_SELECTOR_STRATEGY.get(properties); + this._hostListProviderService = hostListProviderService; + this.writerTargetClient = writerClient; + this.readerTargetClient = readerClient; + } + + override getSubscribedMethods(): Set { + return ReadWriteSplittingPlugin.subscribedMethods; + } + + override initHostProvider( + hostInfo: HostInfo, + props: Map, + hostListProviderService: HostListProviderService, + initHostProviderFunc: () => void + ) { + this._hostListProviderService = hostListProviderService; + initHostProviderFunc(); + } + + override notifyConnectionChanged(changes: Set): OldConnectionSuggestionAction { + try { + this.updateInternalClientInfo(); + } catch (e) { + // pass + } + if (this._inReadWriteSplit) { + return OldConnectionSuggestionAction.PRESERVE; + } + return OldConnectionSuggestionAction.NO_OPINION; + } + + updateInternalClientInfo(): void { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + const currentHost = this.pluginService.getCurrentHostInfo(); + if (currentHost === null || currentTargetClient === null) { + return; + } + + if (currentHost.role === HostRole.WRITER) { + this.setWriterClient(currentTargetClient, currentHost); + } else { + this.setReaderClient(currentTargetClient, currentHost); + } + } + + override async connect(hostInfo: HostInfo, props: Map, isInitialConnection: boolean, connectFunc: () => Promise): Promise { + if (!this.pluginService.acceptsStrategy(hostInfo.role, this.readerSelectorStrategy)) { + const message: string = Messages.get("ReadWriteSplittingPlugin.unsupportedHostSelectorStrategy", this.readerSelectorStrategy); + this.logAndThrowError(message); + } + return await this.connectInternal(hostInfo, props, isInitialConnection, connectFunc); + } + + forceConnect(hostInfo: HostInfo, props: Map, isInitialConnection: boolean, forceConnectFunc: () => Promise): Promise { + return this.connectInternal(hostInfo, props, isInitialConnection, forceConnectFunc); + } + + private async connectInternal( + hostInfo: HostInfo, + props: Map, + isInitialConnection: boolean, + connectFunc: () => Promise + ): Promise { + const result = await connectFunc(); + if (!isInitialConnection || this._hostListProviderService?.isStaticHostListProvider()) { + return result; + } + const currentRole = this.pluginService.getCurrentHostInfo()?.role; + + if (currentRole == HostRole.UNKNOWN) { + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorVerifyingInitialHostRole")); + } + const currentHost: HostInfo | null = this.pluginService.getInitialConnectionHostInfo(); + if (currentHost !== null) { + if (currentRole === currentHost.role) { + return result; + } + const updatedHost: HostInfo = Object.assign({}, currentHost); + updatedHost.role = currentRole ?? HostRole.UNKNOWN; + this._hostListProviderService?.setInitialConnectionHostInfo(updatedHost); + } + return result; + } + + override async execute(methodName: string, executeFunc: () => Promise, methodArgs: any): Promise { + const statement = methodArgs.sql ?? methodArgs; + const statements = SqlMethodUtils.parseMultiStatementQueries(statement); + + const updateReadOnly: boolean | undefined = SqlMethodUtils.doesSetReadOnly(statements, this.pluginService.getDialect()); + + if (updateReadOnly !== undefined) + try { + await this.switchClientIfRequired(updateReadOnly); + } catch (error) { + await this.closeIdleClients(); + throw error; + } + + try { + return await executeFunc(); + } catch (error) { + if (error instanceof FailoverError) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.failoverExceptionWhileExecutingCommand", methodName)); + await this.closeIdleClients(); + } else { + logger.debug(Messages.get("ReadWriteSplittingPlugin.exceptionWhileExecutingCommand", methodName)); + } + + throw error; + } + } + + setWriterClient(writerTargetClient: any, writerHostInfo: HostInfo): void { + this.writerTargetClient = writerTargetClient; + logger.debug(Messages.get("ReadWriteSplittingPlugin.setWriterClient", writerHostInfo.url)); + } + + setReaderClient(readerTargetClient: any, readerHost: HostInfo): void { + this.readerTargetClient = readerTargetClient; + this._readerHostInfo = readerHost; + logger.debug(Messages.get("ReadWriteSplittingPlugin.setReaderConnection", readerHost.url)); + } + + async getNewWriterClient(writerHost: HostInfo) { + const props = new Map(this._properties); + props.set(WrapperProperties.HOST.name, writerHost.host); + try { + const targetClient = await this.pluginService.connect(writerHost, props); + this.setWriterClient(targetClient, writerHost); + await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); + } catch (any) { + logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToWriter", writerHost.url)); + } + } + + async switchClientIfRequired(readOnly: boolean) { + const currentClient = this.pluginService.getCurrentClient(); + if (!(await currentClient.isValid())) { + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyOnClosedConnection")); + } + try { + await this.pluginService.refreshHostList(); + } catch { + // pass + } + + const hosts: HostInfo[] = this.pluginService.getHosts(); + if (hosts == null || hosts.length === 0) { + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.emptyHostList")); + } + + const currentHost = this.pluginService.getCurrentHostInfo(); + if (currentHost == null) { + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.unavailableHostInfo")); + } else if (readOnly) { + if (!this.pluginService.isInTransaction() && currentHost.role != HostRole.READER) { + try { + await this.switchToReaderTargetClient(hosts); + } catch (error) { + if (!(await currentClient.isValid())) { + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader")); + } + logger.warn("ReadWriteSplittingPlugin.fallbackToWriter", currentHost.url); + } + } + } else if (currentHost.role != HostRole.WRITER) { + if (this.pluginService.isInTransaction()) { + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction")); + } + try { + await this.switchToWriterTargetClient(hosts); + } catch { + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToWriter")); + } + } + } + + async switchCurrentTargetClientTo(newTargetClient: any, newClientHost: HostInfo | undefined) { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + + if (currentTargetClient === newTargetClient) { + return; + } + if (newClientHost) { + try { + await this.pluginService.setCurrentClient(newTargetClient, newClientHost); + logger.debug("ReadWriteSplittingPlugin.settingCurrentClient", newClientHost.url); + } catch (error) { + // pass + } + } + } + + async initializeReaderClient(hosts: HostInfo[]) { + if (hosts.length === 1) { + const writerHost = this.getWriter(hosts); + if (writerHost !== undefined) { + if (!(await this.isTargetClientUsable(this.writerTargetClient))) { + await this.getNewWriterClient(writerHost); + } + logger.warn(Messages.get("ReadWriteSplittingPlugin.noReadersFound", writerHost.url)); + } + } else { + await this.getNewReaderClient(); + } + } + + async getNewReaderClient() { + let targetClient = undefined; + let readerHost: HostInfo | undefined = undefined; + const connectAttempts = this.pluginService.getHosts().length; + + for (let i = 0; i < connectAttempts; i++) { + const host = this.pluginService.getHostInfoByStrategy(HostRole.READER, this.readerSelectorStrategy); + if (host) { + const props = new Map(this._properties); + props.set(WrapperProperties.HOST.name, host.host); + + try { + targetClient = await this.pluginService.connect(host, props); + readerHost = host; + break; + } catch (any) { + logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToReader", host.url)); + } + } + } + if (targetClient == undefined || readerHost === undefined) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.noReadersAvailable")); + return; + } else { + logger.debug(Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader", readerHost.url)); + this.setReaderClient(targetClient, readerHost); + await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); + } + } + + async switchToWriterTargetClient(hosts: HostInfo[]) { + const currentHost = this.pluginService.getCurrentHostInfo(); + const currentClient = this.pluginService.getCurrentClient(); + if (currentHost !== null && currentHost?.role === HostRole.WRITER && (await currentClient.isValid())) { + return; + } + this._inReadWriteSplit = true; + const writerHost = this.getWriter(hosts); + if (!writerHost) { + return; + } + if (!(await this.isTargetClientUsable(this.writerTargetClient))) { + await this.getNewWriterClient(writerHost); + } else if (this.writerTargetClient) { + await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); + } + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", writerHost.url)); + } + + async switchToReaderTargetClient(hosts: HostInfo[]) { + const currentHost = this.pluginService.getCurrentHostInfo(); + const currentClient = this.pluginService.getCurrentClient(); + if (currentHost !== null && currentHost?.role === HostRole.READER && currentClient) { + return; + } + + this._inReadWriteSplit = true; + if (!(await this.isTargetClientUsable(this.readerTargetClient))) { + await this.initializeReaderClient(hosts); + } else if (this.readerTargetClient != null && this._readerHostInfo != null) { + try { + await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this._readerHostInfo.url)); + } catch (error: any) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", this._readerHostInfo.url)); + await this.pluginService.tryClosingTargetClient(this.readerTargetClient); + this.readerTargetClient = undefined; + this._readerHostInfo = undefined; + await this.initializeReaderClient(hosts); + } + } + } + + async isTargetClientUsable(targetClient: any | undefined): Promise { + if (!targetClient) { + return Promise.resolve(false); + } + return await this.pluginService.isClientValid(targetClient); + } + + async closeTargetClientIfIdle(internalTargetClient: any | undefined) { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + try { + if (internalTargetClient != null && internalTargetClient != currentTargetClient && (await this.isTargetClientUsable(internalTargetClient))) { + await this.pluginService.tryClosingTargetClient(internalTargetClient); + } + } catch (error) { + // ignore + } finally { + if (internalTargetClient === this.writerTargetClient) { + this.writerTargetClient = undefined; + } + if (internalTargetClient === this.readerTargetClient) { + this.readerTargetClient = undefined; + this._readerHostInfo = undefined; + } + } + } + + async closeIdleClients() { + logger.debug(Messages.get("ReadWriteSplittingPlugin.closingInternalClients")); + await this.closeTargetClientIfIdle(this.readerTargetClient); + await this.closeTargetClientIfIdle(this.writerTargetClient); + } + + getWriter(hosts: HostInfo[]): HostInfo | undefined { + for (const host of hosts) { + if (host.role === HostRole.WRITER) return host; + } + this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.noWriterFound")); + } + + private logAndThrowError(message: string) { + logger.error(message); + throw new AwsWrapperError(message); + } +} + +export class ReadWriteSplittingPluginFactory implements ConnectionPluginFactory { + getInstance(pluginService: PluginService, properties: Map): ConnectionPlugin { + return new ReadWriteSplittingPlugin(pluginService, properties); + } +} diff --git a/common/lib/utils/locales/en.json b/common/lib/utils/locales/en.json index 1478368e4..7b4e19168 100644 --- a/common/lib/utils/locales/en.json +++ b/common/lib/utils/locales/en.json @@ -1,6 +1,7 @@ { "ConnectionPluginManager.unknownPluginCode": "Unknown plugin code: '%s'", "ConnectionPluginManager.failedToConnectWithNewTargetClient": "Failed to connect with a new target for host: '%s'", + "ConnectionProvider.unsupportedHostSelectorStrategy": "Unsupported host selection strategy '%s' specified for this connection provider '%s'. Please visit the documentation for all supported strategies.", "ConnectionProvider.unsupportedHostInfoSelectorStrategy": "Unsupported host selection strategy '%s' specified for this connection provider '%s'. Please visit the documentation for all supported strategies.", "DialectManager.unknownDialectCode": "Unknown dialect code: '%s'.", "DialectManager.getDialectError": "Was not able to get a database dialect.", @@ -58,8 +59,8 @@ "Failover.startWriterFailover": "Starting writer failover procedure.", "Failover.startReaderFailover": "Starting reader failover procedure.", "Failover.invalidNode": "Node is no longer available in the topology: %s", - "Failover.noOperationsAfterConnectionClosed": "No operations allowed after cliend ended.", - "Failover.transactionResolutionUnknownError": "Failover.transactionResolutionUnknownError", + "Failover.noOperationsAfterConnectionClosed": "No operations allowed after client ended.", + "Failover.transactionResolutionUnknownError": "Unknown transaction resolution error occurred during failover", "StaleDnsHelper.clusterEndpointDns": "Cluster endpoint resolves to '%s'.", "StaleDnsHelper.writerHostInfo": "Writer host: '%s'.", "StaleDnsHelper.writerInetAddress": "Writer host address: '%s'", @@ -68,6 +69,35 @@ "StaleDnsPlugin.requireDynamicProvider": "Dynamic host list provider is required.", "Client.methodNotSupported": "Method not supported.", "Client.invalidTransactionIsolationLevel": "An invalid transaction isolation level was provided: '%s'.", + "AuroraStaleDnsHelper.clusterEndpointDns": "Cluster endpoint resolves to '%s'.", + "AuroraStaleDnsHelper.writerHostSpec": "Writer host: '%s'.", + "AuroraStaleDnsHelper.writerInetAddress": "Writer host address: '%s'", + "AuroraStaleDnsHelper.staleDnsDetected": "Stale DNS data detected. Opening a connection to '%s'.", + "ReadWriteSplittingPlugin.setReadOnlyOnClosedClient": "setReadOnly cannot be called on a closed client.", + "ReadWriteSplittingPlugin.errorSwitchingToCachedReader": "An error occurred while trying to switch to a cached reader client: '%s'. The driver will attempt to establish a new reader client.", + "ReadWriteSplittingPlugin.errorSwitchingToReader": "An error occurred while trying to switch to a reader client.", + "ReadWriteSplittingPlugin.errorSwitchingToWriter": "An error occurred while trying to switch to a writer client.", + "ReadWriteSplittingPlugin.closingInternalClients": "Closing all internal clients except for the current one.", + "ReadWriteSplittingPlugin.setReaderClient": "Reader client set to '%s'", + "ReadWriteSplittingPlugin.setWriterClient": "Writer client set to '%s'", + "ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction": "setReadOnly(false) was called on a read-only client inside a transaction. Please complete the transaction before calling setReadOnly(false).", + "ReadWriteSplittingPlugin.fallbackToWriter": "Failed to switch to a reader; the current writer will be used as a fallback: '%s'", + "ReadWriteSplittingPlugin.switchedFromWriterToReader": "Switched from a writer to a reader host. New reader host: '%s'", + "ReadWriteSplittingPlugin.switchedFromReaderToWriter": "Switched from a reader to a writer host. New writer host: '%s'", + "ReadWriteSplittingPlugin.settingCurrentClient": "Setting the current client to '%s'", + "ReadWriteSplittingPlugin.noWriterFound": "No writer was found in the current host list.", + "ReadWriteSplittingPlugin.noReadersFound": "A reader instance was requested via setReadOnly, but there are no readers in the host list. The current writer will be used as a fallback: '%s'", + "ReadWriteSplittingPlugin.emptyHostList": "Host list is empty.", + "ReadWriteSplittingPlugin.exceptionWhileExecutingCommand": "Detected an exception while executing a command: '%s'", + "ReadWriteSplittingPlugin.failoverExceptionWhileExecutingCommand": "Detected a failover exception while executing a command: '%s'", + "ReadWriteSplittingPlugin.noReadersAvailable": "The plugin was unable to establish a reader client to any reader instance.", + "ReadWriteSplittingPlugin.successfullyConnectedToReader": "Successfully connected to a new reader host: '%s'", + "ReadWriteSplittingPlugin.failedToConnectToReader": "Failed to connect to reader host: '%s'", + "ReadWriteSplittingPlugin.unsupportedHostSelectorStrategy": "Unsupported host selection strategy '%s' specified in plugin configuration parameter 'readerHostSelectorStrategy'. Please visit the Read/Write Splitting Plugin documentation for all supported strategies.", + "ReadWriteSplittingPlugin.errorVerifyingInitialHostRole": "An error occurred while obtaining the connected host's role. This could occur if the client is broken or if you are not connected to an Aurora database.", + "ReadWriteSplittingPlugin.setReaderConnection": "ReadWriteSplittingPlugin.setReaderConnection", + "ReadWriteSplittingPlugin.setReadOnlyOnClosedConnection": "ReadWriteSplittingPlugin.setReadOnlyOnClosedConnection", + "ReadWriteSplittingPlugin.unavailableHostInfo": "ReadWriteSplittingPlugin.unavailableHostInfo", "AdfsCredentialsProviderFactory.failedLogin": "Failed login. Could not obtain SAML Assertion from ADFS SignOn Page POST response: \n '%s'", "AdfsCredentialsProviderFactory.getSamlAssertionFailed": "Failed to get SAML Assertion due to exception: '%s'", "AdfsCredentialsProviderFactory.invalidHttpsUrl": "Invalid HTTPS URL: '%s'", diff --git a/common/lib/wrapper_property.ts b/common/lib/wrapper_property.ts index 2ddeff7a6..74031bca8 100644 --- a/common/lib/wrapper_property.ts +++ b/common/lib/wrapper_property.ts @@ -163,6 +163,12 @@ export class WrapperProperties { "false" ); + static readonly READER_HOST_SELECTOR_STRATEGY = new WrapperProperty( + "readerHostSelectorStrategy", + "The strategy that should be used to select a new reader host.", + "random" + ); + static removeWrapperProperties(config: T): T { const copy = Object.assign({}, config); const persistingProperties = [ diff --git a/pg/lib/client.ts b/pg/lib/client.ts index 058f64faa..53bf727ad 100644 --- a/pg/lib/client.ts +++ b/pg/lib/client.ts @@ -85,12 +85,14 @@ export class AwsPGClient extends AwsClient { this.pluginService.getSessionStateService().setupPristineReadOnly(); this.pluginService.getSessionStateService().setReadOnly(readOnly); - - this._isReadOnly = readOnly; + let result; if (this.isReadOnly()) { - return await this.query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY"); + result = await this.query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY"); + } else { + result = await this.query("SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE"); } - return await this.query("SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE"); + this._isReadOnly = readOnly; + return result; } isReadOnly(): boolean { diff --git a/tests/integration/container/tests/read_write_splitting.test.ts b/tests/integration/container/tests/read_write_splitting.test.ts new file mode 100644 index 000000000..98466877a --- /dev/null +++ b/tests/integration/container/tests/read_write_splitting.test.ts @@ -0,0 +1,422 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { TestEnvironment } from "./utils/test_environment"; +import { DriverHelper } from "./utils/driver_helper"; +import { AuroraTestUtility } from "./utils/aurora_test_utility"; +import { AwsWrapperError, FailoverSuccessError } from "aws-wrapper-common-lib/lib/utils/errors"; +import { DatabaseEngine } from "./utils/database_engine"; +import { QueryResult } from "pg"; +import { ProxyHelper } from "./utils/proxy_helper"; + +let env: TestEnvironment; +let driver; +let initClientFunc: (props: any) => any; + +const auroraTestUtility = new AuroraTestUtility(); + +async function initDefaultConfig(host: string, port: number, connectToProxy: boolean): Promise { + const env = await TestEnvironment.getCurrent(); + + let config: any = { + user: env.databaseInfo.username, + host: host, + database: env.databaseInfo.default_db_name, + password: env.databaseInfo.password, + port: port, + plugins: "readWriteSplitting" + }; + + if (connectToProxy) { + config["clusterInstanceHostPattern"] = "?." + env.proxyDatabaseInfo.instanceEndpointSuffix; + } + config = DriverHelper.addDriverSpecificConfiguration(config, env.engine); + return config; +} + +async function initConfigWithFailover(host: string, port: number, connectToProxy: boolean): Promise { + const env = await TestEnvironment.getCurrent(); + + let config: any = { + user: env.databaseInfo.username, + host: host, + database: env.databaseInfo.default_db_name, + password: env.databaseInfo.password, + port: port, + plugins: "readWriteSplitting,failover", + failoverTimeoutMs: 400000, + failoverMode: "reader-or-writer" + }; + + if (connectToProxy) { + config["clusterInstanceHostPattern"] = "?." + env.proxyDatabaseInfo.instanceEndpointSuffix; + } + config = DriverHelper.addDriverSpecificConfiguration(config, env.engine); + return config; +} + +describe("aurora read write splitting", () => { + beforeAll(async () => { + env = await TestEnvironment.getCurrent(); + driver = DriverHelper.getDriverForDatabaseEngine(env.engine); + initClientFunc = DriverHelper.getClient(driver); + }); + + beforeEach(async () => { + await ProxyHelper.enableAllConnectivity(); + }); + + it("test connect to writer switch set read only", async () => { + const config = await initDefaultConfig(env.databaseInfo.clusterEndpoint, env.databaseInfo.clusterEndpointPort, false); + const client = initClientFunc(config); + + client.on("error", (error: any) => { + console.log(error); + }); + + await client.connect(); + const initialWriterId = await auroraTestUtility.queryInstanceId(client); + expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toBe(true); + + await client.setReadOnly(true); + const readerId = await auroraTestUtility.queryInstanceId(client); + expect(readerId).not.toBe(initialWriterId); + + await client.setReadOnly(true); + const currentId0 = await auroraTestUtility.queryInstanceId(client); + expect(currentId0).toBe(readerId); + + await client.setReadOnly(false); + const currentId1 = await auroraTestUtility.queryInstanceId(client); + expect(currentId1).toBe(initialWriterId); + + await client.setReadOnly(false); + const currentId2 = await auroraTestUtility.queryInstanceId(client); + expect(currentId2).toBe(initialWriterId); + + await client.setReadOnly(true); + const currentId3 = await auroraTestUtility.queryInstanceId(client); + expect(currentId3).toBe(readerId); + expect(await auroraTestUtility.isDbInstanceWriter(currentId3)).toBe(false); + + await client.end(); + }, 9000000); + + it("test set read only false in read only transaction", async () => { + const config = await initDefaultConfig(env.databaseInfo.clusterEndpoint, env.databaseInfo.clusterEndpointPort, false); + const client = initClientFunc(config); + + client.on("error", (error: any) => { + console.log(error); + }); + + await client.connect(); + const initialWriterId = await auroraTestUtility.queryInstanceId(client); + + await client.setReadOnly(true); + const initialReaderId = await auroraTestUtility.queryInstanceId(client); + expect(initialReaderId).not.toBe(initialWriterId); + + await DriverHelper.executeQuery(env.engine, client, "START TRANSACTION READ ONLY"); // start transaction + await DriverHelper.executeQuery(env.engine, client, "SELECT 1"); + + try { + await client.setReadOnly(false); + } catch (error) { + console.log(error); + if (!(error instanceof AwsWrapperError)) { + throw new Error("Resulting error type incorrect"); + } + } + const currentConnectionId0 = await auroraTestUtility.queryInstanceId(client); + expect(currentConnectionId0).toBe(initialReaderId); + + await DriverHelper.executeQuery(env.engine, client, "COMMIT"); + + await client.setReadOnly(false); + const currentConnectionId1 = await auroraTestUtility.queryInstanceId(client); + expect(currentConnectionId1).toBe(initialWriterId); + + await client.end(); + }, 9000000); + + it("test set read only true in transaction", async () => { + const config = await initDefaultConfig(env.databaseInfo.clusterEndpoint, env.databaseInfo.clusterEndpointPort, false); + const client = initClientFunc(config); + console.log("test set read only true in transaction"); + + client.on("error", (error: any) => { + console.log(error); + }); + + await client.connect(); + const initialWriterId = await auroraTestUtility.queryInstanceId(client); + + await DriverHelper.executeQuery(env.engine, client, "DROP TABLE IF EXISTS test3_3"); + await DriverHelper.executeQuery(env.engine, client, "CREATE TABLE test3_3 (id int not null primary key, test3_3_field varchar(255) not null)"); + + await DriverHelper.executeQuery(env.engine, client, "START TRANSACTION"); // start transaction + await DriverHelper.executeQuery(env.engine, client, "INSERT INTO test3_3 VALUES (1, 'test field string 1')"); + + await client.setReadOnly(true); + const currentReaderId = await auroraTestUtility.queryInstanceId(client); + expect(currentReaderId).toBe(initialWriterId); + + await DriverHelper.executeQuery(env.engine, client, "COMMIT"); + + // Assert that 1 row has been inserted to the table. + const result = await DriverHelper.executeQuery(env.engine, client, "SELECT count(*) from test3_3"); + if (env.engine === DatabaseEngine.PG) { + expect((result as QueryResult).rows[0]["count"]).toBe("1"); + } else if (env.engine === DatabaseEngine.MYSQL) { + expect(JSON.parse(JSON.stringify(result))[0][0]["count(*)"]).toBe(1); + } + await client.setReadOnly(false); + const currentConnectionId1 = await auroraTestUtility.queryInstanceId(client); + expect(await auroraTestUtility.isDbInstanceWriter(currentConnectionId1)).toBe(true); + expect(currentConnectionId1).toBe(initialWriterId); + + await DriverHelper.executeQuery(env.engine, client, "DROP TABLE IF EXISTS test3_3"); + + await client.end(); + }, 9000000); + + it("test set read only all readers down", async () => { + // Connect to writer instance + const config = await initDefaultConfig(env.proxyDatabaseInfo.clusterEndpoint, env.proxyDatabaseInfo.clusterEndpointPort, true); + const client = initClientFunc(config); + client.on("error", (err: any) => { + console.log(err); + }); + await client.connect(); + const writerId = await auroraTestUtility.queryInstanceId(client); + expect(await auroraTestUtility.isDbInstanceWriter(writerId)).toBe(true); + + // Kill all reader instances + for (const host of env.proxyDatabaseInfo.instances) { + if (host.instanceId && host.instanceId !== writerId) { + await ProxyHelper.disableConnectivity(env.engine, host.instanceId); + } + } + + await client.setReadOnly(true); + const currentReaderId0 = await auroraTestUtility.queryInstanceId(client); + expect(currentReaderId0).toBe(writerId); + + await client.setReadOnly(false); + const currentReaderId1 = await auroraTestUtility.queryInstanceId(client); + expect(currentReaderId1).toBe(writerId); + + await ProxyHelper.enableAllConnectivity(); + await client.setReadOnly(true); + const currentReaderId2 = await auroraTestUtility.queryInstanceId(client); + expect(currentReaderId2).not.toBe(writerId); + + await client.end(); + }, 9000000); + + it("test set read only all instances down", async () => { + const config = await initDefaultConfig(env.databaseInfo.clusterEndpoint, env.databaseInfo.clusterEndpointPort, false); + const client = initClientFunc(config); + + client.on("error", (error: any) => { + console.log(error); + }); + await client.connect(); + const writerId = await auroraTestUtility.queryInstanceId(client); + + await client.setReadOnly(true); + const currentReaderId0 = await auroraTestUtility.queryInstanceId(client); + expect(currentReaderId0).not.toBe(writerId); + + // Kill all instances + await ProxyHelper.disableAllConnectivity(env.engine); + + try { + await client.setReadOnly(false); + } catch (error) { + console.log(error); + + if (!(error instanceof AwsWrapperError)) { + throw new Error("read write splitting all instances down failed"); + } + } + await client.end(); + }, 9000000); + + // Uncomment these tests when failover implementation is complete + // it("test failover to new writer set read only true false", async () => { + // // Connect to writer instance + // const writerConfig = await initConfigWithFailover(env.proxyDatabaseInfo.clusterEndpoint, env.proxyDatabaseInfo.clusterEndpointPort, true); + // const writerClient = initClientFunc(writerConfig); + // writerClient.on("error", (err: any) => { + // console.log(err); + // }); + // await writerClient.connect(); + // const initialWriterId = await auroraTestUtility.queryInstanceId(writerClient); + // + // // Kill all reader instances + // for (const host of env.proxyDatabaseInfo.instances) { + // if (host.instanceId && host.instanceId !== initialWriterId) { + // await ProxyHelper.disableConnectivity(env.engine, host.instanceId); + // } + // } + // + // // Force internal reader connection to the writer instance + // await writerClient.setReadOnly(true); + // const currentReaderId0 = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentReaderId0).toBe(initialWriterId); + // + // await writerClient.setReadOnly(false); + // + // await ProxyHelper.enableAllConnectivity(); + // + // // Crash instance 1 and nominate a new writer + // await auroraTestUtility.failoverClusterAndWaitUntilWriterChanged(); + // + // try { + // await auroraTestUtility.queryInstanceId(writerClient); + // throw new Error("Failover did not occur"); + // } catch (error: any) { + // if (!(error instanceof FailoverSuccessError)) { + // throw new Error("Failover failed"); + // } + // } + // const newWriterId = await auroraTestUtility.queryInstanceId(writerClient); + // expect(await auroraTestUtility.isDbInstanceWriter(newWriterId)).toBe(true); + // expect(newWriterId).not.toBe(initialWriterId); + // + // await writerClient.setReadOnly(true); + // const currentReaderId = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentReaderId).not.toBe(newWriterId); + // + // await writerClient.setReadOnly(false); + // const currentId = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentId).toBe(newWriterId); + // + // await writerClient.end(); + // }, 1000000); + // + // it("test failover to new reader set read only false true", async () => { + // // Connect to writer instance + // const writerConfig = await initConfigWithFailover(env.proxyDatabaseInfo.clusterEndpoint, env.proxyDatabaseInfo.clusterEndpointPort, true); + // const writerClient = initClientFunc(writerConfig); + // writerClient.on("error", (err: any) => { + // console.log(err); + // }); + // await writerClient.connect(); + // const initialWriterId = await auroraTestUtility.queryInstanceId(writerClient); + // await writerClient.setReadOnly(true); + // + // const otherReaderId = await auroraTestUtility.queryInstanceId(writerClient); + // expect(otherReaderId).not.toBe(initialWriterId); + // + // // Get a reader instance + // let readerInstanceHost; + // let readerInstanceHostId; + // for (const host of env.proxyDatabaseInfo.instances) { + // if (host.instanceId && host.instanceId !== otherReaderId && host.instanceId !== initialWriterId) { + // readerInstanceHost = host.host; + // readerInstanceHostId = host.instanceId; + // break; + // } + // } + // + // if (!readerInstanceHost) { + // throw new Error("Could not find a reader instance"); + // } + // + // // Kill all instances except one other reader + // for (const host of env.proxyDatabaseInfo.instances) { + // if (host.instanceId && host.instanceId !== otherReaderId) { + // await ProxyHelper.disableConnectivity(env.engine, host.instanceId); + // } + // } + // + // try { + // await auroraTestUtility.queryInstanceId(writerClient); + // } catch (error) { + // console.log(error); + // + // if (!(error instanceof FailoverSuccessError)) { + // console.log(error); + // throw new Error("failover success failed"); + // } + // } + // + // const currentReaderId0 = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentReaderId0).toBe(otherReaderId); + // expect(currentReaderId0).not.toBe(readerInstanceHostId); + // + // await ProxyHelper.enableAllConnectivity(); + // + // await writerClient.setReadOnly(false); + // const currentReaderId1 = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentReaderId1).toBe(initialWriterId); + // + // await writerClient.setReadOnly(true); + // const currentReaderId2 = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentReaderId2).toBe(otherReaderId); + // + // await writerClient.end(); + // }, 1000000); + // + // it("test failover to new writer set read only true false", async () => { + // // Connect to writer instance + // const writerConfig = await initConfigWithFailover(env.proxyDatabaseInfo.clusterEndpoint, env.proxyDatabaseInfo.clusterEndpointPort, true); + // const writerClient = initClientFunc(writerConfig); + // writerClient.on("error", (err: any) => { + // console.log(err); + // }); + // await writerClient.connect(); + // const initialWriterId = await auroraTestUtility.queryInstanceId(writerClient); + // await writerClient.setReadOnly(true); + // + // const currentReaderId = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentReaderId).not.toBe(initialWriterId); + // + // // Kill all instances except the writer + // for (const host of env.proxyDatabaseInfo.instances) { + // if (host.instanceId && host.instanceId !== initialWriterId) { + // await ProxyHelper.disableConnectivity(env.engine, host.instanceId); + // } + // } + // try { + // await auroraTestUtility.queryInstanceId(writerClient); + // } catch (error) { + // console.log(error); + // + // if (!(error instanceof FailoverSuccessError)) { + // throw new Error("failover success failed"); + // } + // } + // + // const currentId0 = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentId0).toBe(initialWriterId); + // + // await ProxyHelper.enableAllConnectivity(); + // + // await writerClient.setReadOnly(true); + // const currentId1 = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentId1).not.toBe(initialWriterId); + // + // await writerClient.setReadOnly(false); + // const currentId2 = await auroraTestUtility.queryInstanceId(writerClient); + // expect(currentId2).toBe(initialWriterId); + // + // await writerClient.end(); + // }, 1000000); +}); diff --git a/tests/unit/read_write_splitting.test.ts b/tests/unit/read_write_splitting.test.ts new file mode 100644 index 000000000..819880389 --- /dev/null +++ b/tests/unit/read_write_splitting.test.ts @@ -0,0 +1,333 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { AwsClient } from "aws-wrapper-common-lib/lib/aws_client"; +import { HostInfo } from "aws-wrapper-common-lib/lib/host_info"; +import { HostInfoBuilder } from "aws-wrapper-common-lib/lib/host_info_builder"; +import { HostRole } from "aws-wrapper-common-lib/lib/host_role"; +import { PluginService } from "aws-wrapper-common-lib/lib/plugin_service"; +import { AwsWrapperError, FailoverSuccessError } from "aws-wrapper-common-lib/lib/utils/errors"; +import { AwsMySQLClient } from "../../mysql"; +import { anything, instance, mock, reset, verify, when } from "ts-mockito"; +import { HostListProviderService } from "aws-wrapper-common-lib/lib/host_list_provider_service"; +import { ReadWriteSplittingPlugin } from "aws-wrapper-common-lib/lib/plugins/read_write_splitting"; +import { SimpleHostAvailabilityStrategy } from "aws-wrapper-common-lib/lib/host_availability/simple_host_availability_strategy"; +import { MySQLDatabaseDialect } from "mysql-wrapper/lib/dialect/mysql_database_dialect"; +import { HostChangeOptions } from "aws-wrapper-common-lib/lib/host_change_options"; +import { OldConnectionSuggestionAction } from "aws-wrapper-common-lib/lib/old_connection_suggestion_action"; +import { HostListProvider } from "aws-wrapper-common-lib/lib/host_list_provider/host_list_provider"; +import { WrapperProperties } from "aws-wrapper-common-lib/lib/wrapper_property"; + +const properties: Map = new Map(); +const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() }); +const writerHost = builder.withHost("writer-host").withRole(HostRole.WRITER).build(); +const writerHostUnknownRole = builder.withHost("writer-host").withRole(HostRole.UNKNOWN).build(); +const readerHostIncorrectRole = builder.withHost("instance1").withRole(HostRole.WRITER).build(); +const readerHost1 = builder.withHost("instance1").withRole(HostRole.READER).build(); +const readerHost2 = builder.withHost("instance2").withRole(HostRole.READER).build(); + +const defaultHosts = [writerHost, readerHost1, readerHost2]; +const singleReaderTopology = [writerHost, readerHost1]; +const mockPluginService: PluginService = mock(PluginService); +const mockReaderClient: AwsClient = mock(AwsMySQLClient); +const mockWriterClient: AwsClient = mock(AwsMySQLClient); +const mockNewWriterClient: AwsClient = mock(AwsMySQLClient); +const mockMySQLClient: AwsClient = mock(AwsMySQLClient); +const mockHostInfo: HostInfo = mock(HostInfo); +const mockHostListProviderService: HostListProviderService = mock(); +const mockHostListProvider: HostListProvider = mock(); +const mockClosedReaderClient: AwsClient = mock(AwsMySQLClient); +const mockClosedWriterClient: AwsClient = mock(AwsMySQLClient); +const mockDialect: MySQLDatabaseDialect = mock(MySQLDatabaseDialect); +const mockChanges: Set = mock(Set); + +const mockConnectFunc = jest.fn().mockImplementation(() => { + return mockReaderClient; +}); + +const mockExecuteFuncThrowsFailoverSuccessError = jest.fn().mockImplementation(() => { + throw new FailoverSuccessError("test"); +}); + +describe("reader write splitting test", () => { + beforeEach(() => { + when(mockPluginService.getHostListProvider()).thenReturn(instance(mockHostListProvider)); + when(mockPluginService.getHosts()).thenReturn(defaultHosts); + when(mockPluginService.isInTransaction()).thenReturn(false); + properties.clear(); + }); + + afterEach(() => { + reset(mockReaderClient); + reset(mockMySQLClient); + reset(mockHostInfo); + reset(mockPluginService); + reset(mockHostListProviderService); + reset(mockReaderClient); + reset(mockWriterClient); + reset(mockClosedReaderClient); + reset(mockClosedWriterClient); + }); + + it("test set read only true", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(readerHost1); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockWriterClient)); + when(await mockWriterClient.isValid()).thenReturn(true); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + when(mockPluginService.getDialect()).thenReturn(mockDialect); + when(mockDialect.getConnectFunc(anything())).thenReturn(() => Promise.resolve()); + when(mockPluginService.connect(anything(), anything())).thenReturn(mockReaderClient); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, undefined, undefined); + + await target.switchClientIfRequired(true); + verify(mockPluginService.refreshHostList()).once(); + verify(mockPluginService.setCurrentClient(mockReaderClient, readerHost1)).once(); + expect(target.readerTargetClient).toBe(mockReaderClient); + }); + + it("test set read only false", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(writerHost); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockReaderClient)); + when(await mockReaderClient.isValid()).thenReturn(true); + when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); + when(mockPluginService.getDialect()).thenReturn(mockDialect); + when(mockDialect.getConnectFunc(anything())).thenReturn(() => Promise.resolve()); + when(mockPluginService.connect(anything(), anything())).thenReturn(mockWriterClient); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, mockWriterClient, undefined); + + await target.switchClientIfRequired(false); + verify(mockPluginService.setCurrentClient(mockWriterClient, writerHost)).once(); + expect(target.writerTargetClient).toEqual(mockWriterClient); + }); + + it("test set read only true already on reader", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + const mockHostListProviderServiceInstance = instance(mockHostListProviderService); + + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(readerHost1); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockReaderClient)); + when(await mockReaderClient.isValid()).thenReturn(true); + when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); + when(mockPluginService.getDialect()).thenReturn(mockDialect); + when(mockDialect.getConnectFunc(anything())).thenReturn(() => Promise.resolve()); + when(mockPluginService.connect(anything(), anything())).thenReturn(mockReaderClient); + + const target = new ReadWriteSplittingPlugin( + mockPluginServiceInstance, + properties, + mockHostListProviderServiceInstance, + undefined, + mockReaderClient + ); + + await target.switchClientIfRequired(true); + verify(mockPluginService.setCurrentClient(anything(), anything())).never(); + expect(target.readerTargetClient).toEqual(mockReaderClient); + expect(target.writerTargetClient).toEqual(undefined); + }); + + it("test set read only false already on reader", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + const mockHostListProviderServiceInstance = instance(mockHostListProviderService); + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(readerHost1); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockWriterClient)); + when(await mockWriterClient.isValid()).thenReturn(true); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + when(mockPluginService.getDialect()).thenReturn(mockDialect); + when(mockDialect.getConnectFunc(anything())).thenReturn(() => Promise.resolve()); + when(mockPluginService.connect(anything(), anything())).thenReturn(mockReaderClient); + + const target = new ReadWriteSplittingPlugin( + mockPluginServiceInstance, + properties, + mockHostListProviderServiceInstance, + mockWriterClient, + undefined + ); + + await target.switchClientIfRequired(false); + verify(mockPluginService.setCurrentClient(anything(), anything())).never(); + expect(target.writerTargetClient).toEqual(mockWriterClient); + expect(target.readerTargetClient).toEqual(undefined); + }); + + it("test set read only true one host", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + + when(mockPluginService.getHosts()).thenReturn([writerHost]); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(writerHost); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockWriterClient)); + when(await mockWriterClient.isValid()).thenReturn(true); + when(await mockPluginService.isClientValid(mockWriterClient)).thenReturn(true); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + when(mockPluginService.getDialect()).thenReturn(mockDialect); + when(mockDialect.getConnectFunc(anything())).thenReturn(() => Promise.resolve()); + when(mockPluginService.connect(anything(), anything())).thenReturn(mockWriterClient); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, mockWriterClient, undefined); + + await target.switchClientIfRequired(true); + + verify(mockPluginService.setCurrentClient(anything(), anything())).never(); + expect(target.readerTargetClient).toEqual(undefined); + expect(target.writerTargetClient).toEqual(mockWriterClient); + }); + + it("test connect incorrect host role", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + const mockHostListProviderServiceInstance = instance(mockHostListProviderService); + const mockHostListProviderInstance = instance(mockHostListProvider); + + when(mockPluginService.getCurrentClient()).thenReturn(mockReaderClient); + when(mockPluginService.getInitialConnectionHostInfo()).thenReturn(readerHostIncorrectRole); + when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); + + when(mockPluginService.acceptsStrategy(anything(), anything())).thenReturn(true); + when(mockHostListProviderService.isStaticHostListProvider()).thenReturn(false); + when(mockHostListProviderService.getHostListProvider()).thenReturn(mockHostListProviderInstance); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, undefined, undefined); + + await target.connect(writerHost, properties, true, mockConnectFunc); + verify(mockHostListProviderService.setInitialConnectionHostInfo(anything())).once(); + expect(mockConnectFunc).toHaveBeenCalled(); + }); + + it("test set read only false writer connection failed", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(readerHost1); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockReaderClient)); + when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); + when(await mockPluginService.connect(writerHost, properties)).thenReject(); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, undefined, mockReaderClient); + + await expect(async () => await target.switchClientIfRequired(false)).rejects.toThrow(AwsWrapperError); + verify(mockPluginService.setCurrentClient(anything(), anything())).never(); + }); + + it("test set read only true reader connection failed", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + const mockHostListProviderServiceInstance = instance(mockHostListProviderService); + + when(mockPluginService.getHosts()).thenReturn(defaultHosts); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(readerHost1); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockWriterClient)); + when(await mockWriterClient.isValid()).thenReturn(true); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + when(mockPluginService.connect(readerHost1 || readerHost2, properties)).thenReject(); + + const target = new ReadWriteSplittingPlugin( + mockPluginServiceInstance, + properties, + mockHostListProviderServiceInstance, + mockWriterClient, + undefined + ); + + await target.switchClientIfRequired(true); + verify(mockPluginService.setCurrentClient(anything(), anything())).never(); + expect(target.readerTargetClient).toEqual(undefined); + }); + + it("test set read only on closed connection", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(writerHost); + when(mockPluginService.getCurrentClient()).thenReturn(instance(mockClosedWriterClient)); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, mockWriterClient, undefined); + + await expect(async () => await target.switchClientIfRequired(true)).rejects.toThrow(AwsWrapperError); + verify(mockPluginService.setCurrentClient(anything(), anything())).never(); + expect(target.readerTargetClient).toEqual(undefined); + }); + + it("test execute failover to new writer", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + properties.set(WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.name, true); + + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(writerHost); + when(mockPluginService.getCurrentClient()).thenReturn(mockNewWriterClient); + when(mockPluginService.getDialect()).thenReturn(mockDialect); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + when(await mockPluginService.isClientValid(mockWriterClient)).thenReturn(true); + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, mockWriterClient, undefined); + + await expect(async () => { + await target.execute("query", mockExecuteFuncThrowsFailoverSuccessError, "test"); + }).rejects.toThrow(new FailoverSuccessError("test")); + + verify(mockPluginService.tryClosingTargetClient(mockWriterClient)).once(); + }); + + it("test notify connection changed", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + + when(mockPluginService.getHosts()).thenReturn(defaultHosts); + when(mockPluginService.getCurrentClient()).thenReturn(mockWriterClient); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, undefined, undefined); + + const suggestion = target.notifyConnectionChanged(mockChanges); + expect(suggestion).toEqual(OldConnectionSuggestionAction.NO_OPINION); + }); + + it("test notify non initial connection", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + + when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); + when(mockPluginService.getCurrentClient()).thenReturn(mockWriterClient); + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); + when(mockPluginService.acceptsStrategy(anything(), anything())).thenReturn(true); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderService, mockWriterClient, undefined); + + await target.connect(writerHost, properties, false, mockConnectFunc); + + expect(mockConnectFunc).toHaveBeenCalled(); + verify(mockHostListProviderService.getInitialConnectionHostInfo()).never(); + }); + + it("test connect error updating host", async () => { + const mockPluginServiceInstance = instance(mockPluginService); + const mockHostListProviderServiceInstance = instance(mockHostListProviderService); + + when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHostUnknownRole); + when(mockPluginService.acceptsStrategy(anything(), anything())).thenReturn(true); + when(mockHostListProviderService.isStaticHostListProvider()).thenReturn(false); + + const target = new ReadWriteSplittingPlugin(mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, undefined, undefined); + + await expect(async () => await target.connect(writerHost, properties, true, mockConnectFunc)).rejects.toThrow(AwsWrapperError); + verify(mockHostListProviderService.setInitialConnectionHostInfo(anything())).never(); + }); +});