From 9c6277c7feed40a75e7a6f9f28a9421b3981f0e8 Mon Sep 17 00:00:00 2001 From: joyc-bq Date: Thu, 16 Jan 2025 10:38:51 -0800 Subject: [PATCH] feat: efm2 --- common/lib/connection_plugin_chain_builder.ts | 2 + .../efm/host_monitoring_connection_plugin.ts | 9 +- .../plugins/efm/monitor_connection_context.ts | 1 - .../host_monitoring2_connection_plugin.ts | 175 ++++++++++ .../efm2/host_monitoring2_plugin_factory.ts | 37 +++ common/lib/plugins/efm2/monitor.ts | 310 ++++++++++++++++++ .../efm2/monitor_connection_context.ts | 57 ++++ common/lib/plugins/efm2/monitor_service.ts | 175 ++++++++++ common/lib/utils/locales/en.json | 8 +- common/lib/wrapper_property.ts | 2 +- ...nal_connection_pooling_postgres_example.ts | 2 +- .../aws_read_write_splitting_mysql_example.ts | 2 +- ...fastest_response_strategy_mysql_example.ts | 4 +- ...test_response_strategy_postgres_example.ts | 2 +- .../container/tests/autoscaling.test.ts | 2 +- .../tests/basic_connectivity.test.ts | 6 +- .../container/tests/performance.test.ts | 2 +- .../tests/read_write_splitting.test.ts | 2 +- .../container/tests/session_state.test.ts | 218 ++++++------ 19 files changed, 891 insertions(+), 125 deletions(-) create mode 100644 common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts create mode 100644 common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts create mode 100644 common/lib/plugins/efm2/monitor.ts create mode 100644 common/lib/plugins/efm2/monitor_connection_context.ts create mode 100644 common/lib/plugins/efm2/monitor_service.ts diff --git a/common/lib/connection_plugin_chain_builder.ts b/common/lib/connection_plugin_chain_builder.ts index d099797c..dd172cad 100644 --- a/common/lib/connection_plugin_chain_builder.ts +++ b/common/lib/connection_plugin_chain_builder.ts @@ -40,6 +40,7 @@ import { ConnectionPluginFactory } from "./plugin_factory"; import { LimitlessConnectionPluginFactory } from "./plugins/limitless/limitless_connection_plugin_factory"; import { FastestResponseStrategyPluginFactory } from "./plugins/strategy/fastest_response/fastest_respose_strategy_plugin_factory"; import { ConfigurationProfile } from "./profile/configuration_profile"; +import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_plugin_factory"; /* Type alias used for plugin factory sorting. It holds a reference to a plugin @@ -61,6 +62,7 @@ export class ConnectionPluginChainBuilder { ["failover", { factory: FailoverPluginFactory, weight: 700 }], ["failover2", { factory: Failover2PluginFactory, weight: 710 }], ["efm", { factory: HostMonitoringPluginFactory, weight: 800 }], + ["efm2", { factory: HostMonitoring2PluginFactory, weight: 810 }], ["fastestResponseStrategy", { factory: FastestResponseStrategyPluginFactory, weight: 900 }], ["limitless", { factory: LimitlessConnectionPluginFactory, weight: 950 }], ["iam", { factory: IamAuthenticationPluginFactory, weight: 1000 }], diff --git a/common/lib/plugins/efm/host_monitoring_connection_plugin.ts b/common/lib/plugins/efm/host_monitoring_connection_plugin.ts index d27fb26e..63ba986b 100644 --- a/common/lib/plugins/efm/host_monitoring_connection_plugin.ts +++ b/common/lib/plugins/efm/host_monitoring_connection_plugin.ts @@ -121,6 +121,7 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp } finally { if (monitorContext != null) { await this.monitorService.stopMonitoring(monitorContext); + logger.debug(Messages.get("HostMonitoringConnectionPlugin.monitoringDeactivated", methodName)); if (monitorContext.isHostUnhealthy) { const monitoringHostInfo = await this.getMonitoringHostInfo(); @@ -148,7 +149,7 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp return result; } - private throwUnableToIdentifyConnection(host: HostInfo | null, provider: HostListProvider | null): never { + private throwUnableToIdentifyConnection(host: HostInfo | null): never { throw new AwsWrapperError( Messages.get( "HostMonitoringConnectionPlugin.unableToIdentifyConnection", @@ -163,17 +164,17 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp this.monitoringHostInfo = this.pluginService.getCurrentHostInfo(); const provider: HostListProvider | null = this.pluginService.getHostListProvider(); if (this.monitoringHostInfo == null) { - this.throwUnableToIdentifyConnection(null, provider); + this.throwUnableToIdentifyConnection(null); } const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url); try { if (rdsUrlType.isRdsCluster) { - logger.debug("Monitoring host info is associated with a cluster endpoint, plugin needs to identify the cluster connection"); + logger.debug(Messages.get("HostMonitoringConnectionPlugin.identifyClusterConnection")); this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!); if (this.monitoringHostInfo == null) { const host: HostInfo | null = this.pluginService.getCurrentHostInfo(); - this.throwUnableToIdentifyConnection(host, provider); + this.throwUnableToIdentifyConnection(host); } await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo); } diff --git a/common/lib/plugins/efm/monitor_connection_context.ts b/common/lib/plugins/efm/monitor_connection_context.ts index 3781da62..adb1d7f2 100644 --- a/common/lib/plugins/efm/monitor_connection_context.ts +++ b/common/lib/plugins/efm/monitor_connection_context.ts @@ -127,7 +127,6 @@ export class MonitorConnectionContext { const invalidHostDurationNano: number = statusCheckEndNano - this.invalidHostStartTimeNano; const maxInvalidHostDurationMillis: number = this.failureDetectionIntervalMillis * Math.max(0, this.failureDetectionCount); - if (this.failureCount >= this.failureDetectionCount || invalidHostDurationNano >= maxInvalidHostDurationMillis * 1_000_000) { logger.debug(Messages.get("MonitorConnectionContext.hostDead", hostName)); this.isHostUnhealthy = true; diff --git a/common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts b/common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts new file mode 100644 index 00000000..8cbea025 --- /dev/null +++ b/common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts @@ -0,0 +1,175 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { PluginService } from "../../plugin_service"; +import { HostChangeOptions } from "../../host_change_options"; +import { HostInfo } from "../../host_info"; +import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; +import { RdsUtils } from "../../utils/rds_utils"; +import { AbstractConnectionPlugin } from "../../abstract_connection_plugin"; +import { RdsUrlType } from "../../utils/rds_url_type"; +import { WrapperProperties } from "../../wrapper_property"; +import { MonitorConnectionContext } from "./monitor_connection_context"; +import { logger, uniqueId } from "../../../logutils"; +import { Messages } from "../../utils/messages"; +import { MonitorService, MonitorServiceImpl } from "./monitor_service"; +import { AwsWrapperError } from "../../utils/errors"; +import { HostListProvider } from "../../host_list_provider/host_list_provider"; +import { CanReleaseResources } from "../../can_release_resources"; +import { SubscribedMethodHelper } from "../../utils/subscribed_method_helper"; +import { ClientWrapper } from "../../client_wrapper"; + +export class HostMonitoring2ConnectionPlugin extends AbstractConnectionPlugin implements CanReleaseResources { + id: string = uniqueId("_efm2Plugin"); + private readonly properties: Map; + private pluginService: PluginService; + private rdsUtils: RdsUtils; + private monitoringHostInfo: HostInfo | null = null; + private monitorService: MonitorService; + + constructor(pluginService: PluginService, properties: Map, rdsUtils: RdsUtils = new RdsUtils(), monitorService?: MonitorServiceImpl) { + super(); + this.pluginService = pluginService; + this.properties = properties; + this.rdsUtils = rdsUtils; + this.monitorService = monitorService ?? new MonitorServiceImpl(pluginService); + } + + getSubscribedMethods(): Set { + return new Set(["*"]); + } + + connect( + hostInfo: HostInfo, + props: Map, + isInitialConnection: boolean, + connectFunc: () => Promise + ): Promise { + return this.connectInternal(hostInfo, connectFunc); + } + + forceConnect( + hostInfo: HostInfo, + props: Map, + isInitialConnection: boolean, + forceConnectFunc: () => Promise + ): Promise { + return this.connectInternal(hostInfo, forceConnectFunc); + } + + private async connectInternal(hostInfo: HostInfo, connectFunc: () => Promise): Promise { + const targetClient = await connectFunc(); + if (targetClient != null) { + const type: RdsUrlType = this.rdsUtils.identifyRdsType(hostInfo.host); + if (type.isRdsCluster) { + hostInfo.resetAliases(); + await this.pluginService.fillAliases(targetClient, hostInfo); + } + } + return targetClient; + } + + async execute(methodName: string, methodFunc: () => Promise, methodArgs: any): Promise { + const isEnabled: boolean = WrapperProperties.FAILURE_DETECTION_ENABLED.get(this.properties); + + if (!isEnabled || !SubscribedMethodHelper.NETWORK_BOUND_METHODS.includes(methodName)) { + return methodFunc(); + } + + const failureDetectionTimeMillis: number = WrapperProperties.FAILURE_DETECTION_TIME_MS.get(this.properties); + const failureDetectionIntervalMillis: number = WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.get(this.properties); + const failureDetectionCount: number = WrapperProperties.FAILURE_DETECTION_COUNT.get(this.properties); + + let result: T; + let monitorContext: MonitorConnectionContext | null = null; + + try { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.activatedMonitoring", methodName)); + const monitoringHostInfo: HostInfo = await this.getMonitoringHostInfo(); + + monitorContext = await this.monitorService.startMonitoring( + this.pluginService.getCurrentClient().targetClient, + monitoringHostInfo, + this.properties, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount + ); + + result = await methodFunc(); + } finally { + if (monitorContext != null) { + await this.monitorService.stopMonitoring(monitorContext, this.pluginService.getCurrentClient().targetClient); + + logger.debug(Messages.get("HostMonitoringConnectionPlugin.monitoringDeactivated", methodName)); + } + } + + return result; + } + + private throwUnableToIdentifyConnection(host: HostInfo | null): never { + const provider: HostListProvider | null = this.pluginService.getHostListProvider(); + throw new AwsWrapperError( + Messages.get( + "HostMonitoringConnectionPlugin.unableToIdentifyConnection", + host !== null ? host.host : "unknown host", + provider !== null ? provider.getHostProviderType() : "unknown provider" + ) + ); + } + + async getMonitoringHostInfo(): Promise { + if (this.monitoringHostInfo === null) { + this.monitoringHostInfo = this.pluginService.getCurrentHostInfo(); + if (this.monitoringHostInfo === null) { + this.throwUnableToIdentifyConnection(null); + } + const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url); + + try { + if (rdsUrlType.isRdsCluster) { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.identifyClusterConnection")); + this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!); + if (this.monitoringHostInfo == null) { + const host: HostInfo | null = this.pluginService.getCurrentHostInfo(); + this.throwUnableToIdentifyConnection(host); + } + await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo); + } + } catch (error: any) { + if (!(error instanceof AwsWrapperError)) { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message)); + } + throw error; + } + } + + return this.monitoringHostInfo; + } + + async notifyConnectionChanged(changes: Set): Promise { + if (changes.has(HostChangeOptions.HOSTNAME) || changes.has(HostChangeOptions.HOST_CHANGED)) { + // Reset monitoring host info since the associated connection has changed. + this.monitoringHostInfo = null; + } + return OldConnectionSuggestionAction.NO_OPINION; + } + + async releaseResources(): Promise { + return this.monitorService.releaseResources(); + } +} diff --git a/common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts b/common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts new file mode 100644 index 00000000..6763b028 --- /dev/null +++ b/common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts @@ -0,0 +1,37 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { ConnectionPluginFactory } from "../../plugin_factory"; +import { PluginService } from "../../plugin_service"; +import { ConnectionPlugin } from "../../connection_plugin"; +import { RdsUtils } from "../../utils/rds_utils"; +import { AwsWrapperError } from "../../utils/errors"; +import { Messages } from "../../utils/messages"; + +export class HostMonitoring2PluginFactory extends ConnectionPluginFactory { + private static hostMonitoring2Plugin: any; + + async getInstance(pluginService: PluginService, properties: Map): Promise { + try { + if (!HostMonitoring2PluginFactory.hostMonitoring2Plugin) { + HostMonitoring2PluginFactory.hostMonitoring2Plugin = await import("./host_monitoring2_connection_plugin"); + } + return new HostMonitoring2PluginFactory.hostMonitoring2Plugin.HostMonitoring2ConnectionPlugin(pluginService, properties, new RdsUtils()); + } catch (error: any) { + throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "HostMonitoringPlugin")); + } + } +} diff --git a/common/lib/plugins/efm2/monitor.ts b/common/lib/plugins/efm2/monitor.ts new file mode 100644 index 00000000..dc1adec2 --- /dev/null +++ b/common/lib/plugins/efm2/monitor.ts @@ -0,0 +1,310 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { MonitorConnectionContext } from "./monitor_connection_context"; +import { HostInfo } from "../../host_info"; +import { PluginService } from "../../plugin_service"; +import { logger } from "../../../logutils"; +import { Messages } from "../../utils/messages"; +import { ClientWrapper } from "../../client_wrapper"; +import { sleep } from "../../utils/utils"; +import { WrapperProperties } from "../../wrapper_property"; +import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory"; +import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter"; +import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level"; +import { HostAvailability } from "../../host_availability/host_availability"; +import { MapUtils } from "../../utils/map_utils"; +import { AwsWrapperError, InternalQueryTimeoutError } from "../../utils/errors"; + +export interface Monitor { + startMonitoring(context: MonitorConnectionContext): void; + + run(): Promise; + + canDispose(): boolean; + + endMonitoringClient(): Promise; + + end(): Promise; + + releaseResources(): Promise; +} + +export class MonitorImpl implements Monitor { + private static readonly TASK_SLEEP_NANOS: number = 100 * 1000000; + private activeContexts: WeakRef[] = []; + static newContexts: Map>> = new Map>>(); + private readonly pluginService: PluginService; + private readonly telemetryFactory: TelemetryFactory; + private readonly properties: Map; + private readonly hostInfo: HostInfo; + private stopped: boolean = false; + + private monitoringClient: ClientWrapper | null = null; + + private readonly failureDetectionTimeNano: number; + private readonly failureDetectionIntervalNanos: number; + private readonly failureDetectionCount: number; + + private invalidHostStartTimeNano: number; + private failureCount: number; + private hostUnhealthy: boolean = false; + private readonly abortedConnectionsCounter: TelemetryCounter; + + constructor( + pluginService: PluginService, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number, + abortedConnectionsCounter: TelemetryCounter + ) { + this.pluginService = pluginService; + this.telemetryFactory = this.pluginService.getTelemetryFactory(); + this.hostInfo = hostInfo; + this.properties = properties; + this.failureDetectionTimeNano = failureDetectionTimeMillis * 1000000; + this.failureDetectionIntervalNanos = failureDetectionIntervalMillis * 1000000; + this.failureDetectionCount = failureDetectionCount; + this.abortedConnectionsCounter = abortedConnectionsCounter; + + const hostId: string = this.hostInfo.hostId ?? this.hostInfo.host; + + this.telemetryFactory.createGauge(`efm2.newContexts.size.${hostId}`, () => MonitorImpl.newContexts.size === Number.MAX_SAFE_INTEGER); + + this.telemetryFactory.createGauge(`efm2.activeContexts.size.${hostId}`, () => this.activeContexts.length === Number.MAX_SAFE_INTEGER); + + this.telemetryFactory.createGauge(`efm2.hostHealthy.${hostId}`, () => (this.hostUnhealthy ? 0 : 1)); + const task1 = this.newContextRun(); + const task2 = this.run(); + Promise.race([task1, task2]).finally(() => { + this.stopped = true; + }); + } + + canDispose(): boolean { + return this.activeContexts.length === 0 && MonitorImpl.newContexts.size === 0; + } + + startMonitoring(context: MonitorConnectionContext): void { + if (this.isStopped()) { + logger.warn(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host)); + } + + const startMonitorTimeNano = this.getCurrentTimeNano()+ this.failureDetectionTimeNano; + const connectionQueue = MapUtils.computeIfAbsent( + MonitorImpl.newContexts, + startMonitorTimeNano, + () => new Array>() + ); + connectionQueue.push(new WeakRef(context)); + } + + async newContextRun(): Promise { + logger.debug(Messages.get("MonitorImpl.startMonitoringTaskNewContext", this.hostInfo.host)); + + try { + while (!this.isStopped()) { + const currentTimeNanos = this.getCurrentTimeNano(); + // Get entries with key (that is a time in nanos) less or equal than current time. + const processedKeys: number[] = new Array(); + for (const [key, val] of MonitorImpl.newContexts.entries()) { + if (key < currentTimeNanos) { + const queue: Array> = val; + processedKeys.push(key); + // Each value of found entry is a queue of monitoring contexts awaiting active monitoring. + // Add all contexts to an active monitoring contexts queue. + // Ignore disposed contexts. + let monitorContextRef: WeakRef | undefined; + while ((monitorContextRef = queue.shift()) != null) { + const monitorContext: MonitorConnectionContext = monitorContextRef.deref(); + if (monitorContext !== null && monitorContext.isActive()) { + this.activeContexts.push(monitorContextRef); + } + } + } + } + processedKeys.forEach((key) => { + MonitorImpl.newContexts.delete(key); + }); + await sleep(1000); + } + return; + } catch (err) { + // do nothing, exit task + } + logger.debug(Messages.get("MonitorImpl.stopMonitoringTaskNewContext", this.hostInfo.host)); + } + + async run(): Promise { + logger.debug(Messages.get("MonitorImpl.startMonitoring", this.hostInfo.host)); + + try { + while (!this.isStopped()) { + try { + if (this.activeContexts.length === 0 && !this.hostUnhealthy) { + await sleep(MonitorImpl.TASK_SLEEP_NANOS / 1000000); + } + + const statusCheckStartTimeNanos: number = this.getCurrentTimeNano(); + const isValid = await this.checkConnectionStatus(); + const statusCheckEndTimeNanos: number = this.getCurrentTimeNano(); + + await this.updateHostHealthStatus(isValid, statusCheckStartTimeNanos, statusCheckEndTimeNanos); + + if (this.hostUnhealthy) { + this.pluginService.setAvailability(this.hostInfo.aliases, HostAvailability.NOT_AVAILABLE); + } + const tmpActiveContexts: WeakRef[] = []; + + let monitorContextRef: WeakRef | undefined; + + while ((monitorContextRef = this.activeContexts.shift()) != null) { + if (this.isStopped()) { + break; + } + const monitorContext = monitorContextRef.deref(); + if (monitorContext == null) { + continue; + } + + if (this.hostUnhealthy) { + // Kill connection + monitorContext.setHostUnhealthy(true); + monitorContext.setInactive(); + const connectionToAbort = monitorContext.getClient(); + if (connectionToAbort != null) { + await this.endMonitoringClient(); + this.abortedConnectionsCounter.inc(); + } + } else if (monitorContext.isActive()) { + tmpActiveContexts.push(monitorContextRef); + } + + // activeContexts is empty now and tmpActiveContexts contains all yet active contexts + // Add active contexts back to the queue. + this.activeContexts.push(...tmpActiveContexts); + + const delayNanos = this.failureDetectionIntervalNanos - (statusCheckEndTimeNanos - statusCheckStartTimeNanos); + await sleep(Math.max(delayNanos, MonitorImpl.TASK_SLEEP_NANOS) / 1000000); + } + } catch (error: any) { + logger.debug(Messages.get("MonitorImpl.exceptionDuringMonitoringContinue", error.message)); + } + } + } catch (error: any) { + logger.debug(Messages.get("MonitorImpl.exceptionDuringMonitoringStop", error.message)); + } finally { + await this.endMonitoringClient(); + await sleep(3000); + } + + logger.debug(Messages.get("MonitorImpl.stopMonitoring", this.hostInfo.host)); + } + + /** + * Check the status of the monitored server by sending a ping. + * + * @return whether the server is still alive and the elapsed time spent checking. + */ + async checkConnectionStatus(): Promise { + const connectContext = this.telemetryFactory.openTelemetryContext("Connection status check", TelemetryTraceLevel.FORCE_TOP_LEVEL); + connectContext.setAttribute("url", this.hostInfo.host); + try { + if (!(await this.pluginService.isClientValid(this.monitoringClient))) { + // Open a new connection. + const monitoringConnProperties: Map = new Map(this.properties); + + for (const key of this.properties.keys()) { + if (!key.startsWith(WrapperProperties.MONITORING_PROPERTY_PREFIX)) { + continue; + } + + monitoringConnProperties.set(key.substring(WrapperProperties.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key)); + monitoringConnProperties.delete(key); + } + + logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`); + this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties); + logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`); + return true; + } + return true; + } catch (error: any) { + return false; + } + } + + isStopped(): boolean { + return this.stopped; + } + + async end(): Promise { + this.stopped = true; + // Waiting for 30s gives a task enough time to exit monitoring loop and close database connection. + await sleep(30000); + logger.debug(Messages.get("MonitorImpl.stopped", this.hostInfo.host)); + } + + updateHostHealthStatus(connectionValid: boolean, statusCheckStartNano: number, statusCheckEndNano: number): Promise { + if (!connectionValid) { + this.failureCount += 1; + + if (this.invalidHostStartTimeNano === 0) { + this.invalidHostStartTimeNano = statusCheckStartNano; + } + + const invalidHostDurationNano = statusCheckEndNano - this.invalidHostStartTimeNano; + const maxInvalidHostDurationNano = this.failureDetectionIntervalNanos * Math.max(0, this.failureDetectionCount - 1); + + if (invalidHostDurationNano >= maxInvalidHostDurationNano) { + logger.debug(Messages.get("MonitorConnectionContext.hostDead", this.hostInfo.host)); + this.hostUnhealthy = true; + return Promise.resolve(); + } + logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", this.hostInfo.host, this.failureCount.toString())); + return Promise.resolve(); + } + + if (this.failureCount > 0) { + // Host is back alive + logger.debug(Messages.get("MonitorConnectionContext.hostAlive", this.hostInfo.host)); + } + this.failureCount = 0; + this.invalidHostStartTimeNano = 0; + this.hostUnhealthy = false; + } + + protected getCurrentTimeNano() { + return Number(process.hrtime.bigint()); + } + + async releaseResources() { + this.stopped = true; + this.activeContexts = null; + await this.endMonitoringClient(); + // Allow time for monitor loop to close. + await sleep(500); + } + + async endMonitoringClient() { + if (this.monitoringClient) { + await this.pluginService.abortTargetClient(this.monitoringClient); + this.monitoringClient = null; + } + } +} diff --git a/common/lib/plugins/efm2/monitor_connection_context.ts b/common/lib/plugins/efm2/monitor_connection_context.ts new file mode 100644 index 00000000..5b29025c --- /dev/null +++ b/common/lib/plugins/efm2/monitor_connection_context.ts @@ -0,0 +1,57 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { uniqueId } from "../../../logutils"; +import { ClientWrapper } from "../../client_wrapper"; + +/** + * Monitoring context for each connection. This contains each connection's criteria for whether a + * server should be considered unhealthy. The context is shared between the main task and the monitor task. + */ +export class MonitorConnectionContext { + private clientToAbortRef: WeakRef | undefined; + isHostUnhealthy: boolean = false; + id: string = uniqueId("_monitorContext"); + + /** + * Constructor. + * + * @param clientToAbort A reference to the connection associated with this context that will be aborted. + */ + constructor(clientToAbort: any) { + this.clientToAbortRef = new WeakRef(clientToAbort); + } + + setHostUnhealthy(hostUnhealthy: boolean) { + this.isHostUnhealthy = hostUnhealthy; + } + + shouldAbort(): boolean { + return this.isHostUnhealthy && this.clientToAbortRef != null; + } + + setInactive(): void { + this.clientToAbortRef = null; + } + + getClient(): ClientWrapper { + return this.clientToAbortRef.deref() ?? null; + } + + isActive() { + return this.clientToAbortRef.deref() !== null; + } +} diff --git a/common/lib/plugins/efm2/monitor_service.ts b/common/lib/plugins/efm2/monitor_service.ts new file mode 100644 index 00000000..39459714 --- /dev/null +++ b/common/lib/plugins/efm2/monitor_service.ts @@ -0,0 +1,175 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { MonitorConnectionContext } from "./monitor_connection_context"; +import { HostInfo } from "../../host_info"; +import { AwsWrapperError } from "../../utils/errors"; +import { Monitor, MonitorImpl } from "./monitor"; +import { WrapperProperties } from "../../wrapper_property"; +import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache"; +import { PluginService } from "../../plugin_service"; +import { Messages } from "../../utils/messages"; +import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter"; +import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory"; +import { ClientWrapper } from "../../client_wrapper"; +import { logger } from "../../../logutils"; + +export interface MonitorService { + startMonitoring( + clientToAbort: any, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number + ): Promise; + + /** + * Stop monitoring for a connection represented by the given {@link MonitorConnectionContext}. + * Removes the context from the {@link MonitorImpl}. + * + * @param context The {@link MonitorConnectionContext} representing a connection. + * @param clientToAbort A reference to the connection associated with this context that will be aborted. + */ + stopMonitoring(context: MonitorConnectionContext, clientToAbort: any): Promise; + + releaseResources(): Promise; +} + +export class MonitorServiceImpl implements MonitorService { + private static readonly CACHE_CLEANUP_NANOS = BigInt(60_000_000_000); + + protected static readonly monitors: SlidingExpirationCache = new SlidingExpirationCache( + MonitorServiceImpl.CACHE_CLEANUP_NANOS, + (monitor: Monitor) => monitor.canDispose(), + async (monitor: Monitor) => { + { + try { + await monitor.endMonitoringClient(); + } catch (error) { + // ignore + } + } + } + ); + private readonly pluginService: PluginService; + private telemetryFactory: TelemetryFactory; + private readonly abortedConnectionsCounter: TelemetryCounter; + monitorSupplier = ( + pluginService: PluginService, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number, + abortedConnectionsCounter: TelemetryCounter + ) => + new MonitorImpl( + pluginService, + hostInfo, + properties, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount, + abortedConnectionsCounter + ); + + constructor(pluginService: PluginService) { + this.pluginService = pluginService; + this.telemetryFactory = pluginService.getTelemetryFactory(); + this.abortedConnectionsCounter = this.telemetryFactory.createCounter("efm2.connections.aborted"); + } + + async startMonitoring( + clientToAbort: any, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number + ): Promise { + const monitor = await this.getMonitor(hostInfo, properties, failureDetectionTimeMillis, failureDetectionIntervalMillis, failureDetectionCount); + + if (monitor) { + const context = new MonitorConnectionContext(clientToAbort); + monitor.startMonitoring(context); + return context; + } + + throw new AwsWrapperError(Messages.get("MonitorService.startMonitoringNullMonitor", hostInfo.host)); + } + + async stopMonitoring(context: MonitorConnectionContext, clientToAbort: ClientWrapper): Promise { + if (context.shouldAbort()) { + context.setInactive(); + try { + await clientToAbort.abort(); + this.abortedConnectionsCounter.inc(); + } catch (error) { + // ignore + logger.debug(Messages.get("MonitorConnectionContext.errorAbortingConnection", error.message)); + } + } else { + context.setInactive(); + } + } + + async getMonitor( + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number + ): Promise { + const monitorKey: string = + failureDetectionTimeMillis.toString() + + " " + + failureDetectionIntervalMillis.toString() + + " " + + failureDetectionCount.toString() + + " " + + hostInfo.url; + + const cacheExpirationNanos = BigInt(WrapperProperties.MONITOR_DISPOSAL_TIME_MS.get(properties) * 1_000_000); + return MonitorServiceImpl.monitors.computeIfAbsent( + monitorKey, + () => + this.monitorSupplier( + this.pluginService, + hostInfo, + properties, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount, + this.abortedConnectionsCounter + ), + cacheExpirationNanos + ); + } + + async releaseResources() { + for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) { + if (monitor.item) { + await monitor.item.releaseResources(); + } + } + MonitorServiceImpl.clearMonitors(); + } + + static clearMonitors() { + MonitorServiceImpl.monitors.clear(); + } +} diff --git a/common/lib/utils/locales/en.json b/common/lib/utils/locales/en.json index a4e14f67..755d3f32 100644 --- a/common/lib/utils/locales/en.json +++ b/common/lib/utils/locales/en.json @@ -126,7 +126,7 @@ "HostAvailabilityStrategy.invalidInitialBackoffTime": "Invalid value of '%s' for configuration parameter `hostAvailabilityStrategyInitialBackoffTime`. It must be an integer greater or equal to 1.", "MonitorConnectionContext.errorAbortingConnection": "Error during aborting connection: %s.", "MonitorConnectionContext.hostDead": "Host %s is *dead*.", - "MonitorConnectionContext.hostNotResponding": "Host %s is not *responding* %s", + "MonitorConnectionContext.hostNotResponding": "Host %s is not *responding* with failureCount: %s", "MonitorConnectionContext.hostAlive": "Host %s is *alive*.", "MonitorImpl.contextNullWarning": "Parameter 'context' should not be null or undefined.", "MonitorImpl.errorDuringMonitoringContinue": "Continuing monitoring after unhandled error was thrown in monitoring for host %s.", @@ -135,6 +135,8 @@ "MonitorImpl.stopped": "Stopped monitoring for host '%s'.", "MonitorImpl.startMonitoring": "Start monitoring for %s.", "MonitorImpl.stopMonitoring": "Stop monitoring for %s.", + "MonitorImpl.startMonitoringTaskNewContext": "Start monitoring thread for checking new contexts for '%s'", + "MonitorImpl.stopMonitoringTaskNewContext": "Stop monitoring thread for checking new contexts for '%s'", "MonitorService.startMonitoringNullMonitor": "Start monitoring called but could not find monitor for host: '%s'.", "MonitorService.emptyAliasSet": "Empty alias set passed for '%s'. Set should not be empty.", "PluginService.hostListEmpty": "Current host list is empty.", @@ -150,6 +152,7 @@ "HostMonitoringConnectionPlugin.unableToIdentifyConnection": "Unable to identify the given connection: '%s', please ensure the correct host list provider is specified. The host list provider in use is: '%s'.", "HostMonitoringConnectionPlugin.errorIdentifyingConnection": "Error occurred while identifying connection: '%s'.", "HostMonitoringConnectionPlugin.unavailableHost": "Host '%s' is unavailable.", + "HostMonitoringConnectionPlugin.identifyClusterConnection": "Monitoring host info is associated with a cluster endpoint, plugin needs to identify the cluster connection.", "PluginServiceImpl.failedToRetrieveHostPort": "PluginServiceImpl.failedToRetrieveHostPort", "AuroraInitialConnectionStrategyPlugin.unsupportedStrategy": "Unsupported host selection strategy '%s'.", "AuroraInitialConnectionStrategyPlugin.requireDynamicProvider": "Dynamic host list provider is required.", @@ -226,5 +229,6 @@ "HostMonitor.startMonitoring": "Host monitor '%s' started.", "HostMonitor.detectedWriter": "Detected writer: '%s'.", "HostMonitor.endMonitoring": "Host monitor '%s' completed in '%s'.", - "HostMonitor.writerHostChanged": "Writer host has changed from '%s' to '%s'." + "HostMonitor.writerHostChanged": "Writer host has changed from '%s' to '%s'.", + "HostMonitoringConnectionPlugin.monitoringDeactivated": "Monitoring deactivated for method '%s'" } diff --git a/common/lib/wrapper_property.ts b/common/lib/wrapper_property.ts index 452aead7..3e068eb6 100644 --- a/common/lib/wrapper_property.ts +++ b/common/lib/wrapper_property.ts @@ -46,7 +46,7 @@ export class WrapperProperty { export class WrapperProperties { static readonly MONITORING_PROPERTY_PREFIX: string = "monitoring_"; - static readonly DEFAULT_PLUGINS = "auroraConnectionTracker,failover,efm"; + static readonly DEFAULT_PLUGINS = "auroraConnectionTracker,failover,efm2"; static readonly DEFAULT_TOKEN_EXPIRATION_SEC = 15 * 60; static readonly PLUGINS = new WrapperProperty( diff --git a/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts b/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts index 2e1ed548..cc2609fe 100644 --- a/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts +++ b/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts @@ -55,7 +55,7 @@ const client = new AwsPGClient({ user: username, password: password, database: database, - plugins: "readWriteSplitting,failover,efm", + plugins: "readWriteSplitting,failover,efm2", // Optional: PoolKey property value and connection provider used in internal connection pools. connectionProvider: provider, diff --git a/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts b/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts index 27b013b5..a909e418 100644 --- a/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts +++ b/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts @@ -31,7 +31,7 @@ const client = new AwsMySQLClient({ user: username, password: password, database: database, - plugins: "readWriteSplitting, failover, efm" + plugins: "readWriteSplitting, failover, efm2" }); // Setup Step: Open connection and create tables - uncomment this section to create table and test values. diff --git a/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts b/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts index dae0c39e..1e64f979 100644 --- a/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts +++ b/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts @@ -25,13 +25,13 @@ const database = "database"; const port = 3306; const client = new AwsMySQLClient({ - // Configure connection parameters. Enable readWriteSplitting, failover, and efm plugins. + // Configure connection parameters. Enable readWriteSplitting, failover, and efm2 plugins. host: mysqlHost, port: port, user: username, password: password, database: database, - plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm", + plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm2", readerHostSelectorStrategy: "fastestResponse" }); diff --git a/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts b/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts index 7ba07d44..d5485a0a 100644 --- a/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts +++ b/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts @@ -31,7 +31,7 @@ const client = new AwsPGClient({ user: username, password: password, database: database, - plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm", + plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm2", readerHostSelectorStrategy: "fastestResponse" }); diff --git a/tests/integration/container/tests/autoscaling.test.ts b/tests/integration/container/tests/autoscaling.test.ts index 46818447..757ffd0e 100644 --- a/tests/integration/container/tests/autoscaling.test.ts +++ b/tests/integration/container/tests/autoscaling.test.ts @@ -72,7 +72,7 @@ async function initConfigWithFailover(host: string, port: number, provider: Inte database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: port, - plugins: "readWriteSplitting,failover", + plugins: "readWriteSplitting,failover,efm2", connectionProvider: provider, failoverTimeoutMs: 400000, enableTelemetry: true, diff --git a/tests/integration/container/tests/basic_connectivity.test.ts b/tests/integration/container/tests/basic_connectivity.test.ts index 45aec100..91c68327 100644 --- a/tests/integration/container/tests/basic_connectivity.test.ts +++ b/tests/integration/container/tests/basic_connectivity.test.ts @@ -73,7 +73,7 @@ describe("basic_connectivity", () => { database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: env.databaseInfo.clusterEndpointPort, - plugins: "failover,efm", + plugins: "failover,efm2", enableTelemetry: true, telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" @@ -99,7 +99,7 @@ describe("basic_connectivity", () => { database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: env.databaseInfo.clusterEndpointPort, - plugins: "failover,efm", + plugins: "failover,efm2", enableTelemetry: true, telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" @@ -125,7 +125,7 @@ describe("basic_connectivity", () => { database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: env.databaseInfo.clusterEndpointPort, - plugins: "failover,efm", + plugins: "failover,efm2", enableTelemetry: true, telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" diff --git a/tests/integration/container/tests/performance.test.ts b/tests/integration/container/tests/performance.test.ts index 6144111e..3abfa69d 100644 --- a/tests/integration/container/tests/performance.test.ts +++ b/tests/integration/container/tests/performance.test.ts @@ -163,7 +163,7 @@ async function executeFailureDetectionTimeFailoverAndEfmEnabled( ) { const props = new Map(); const config = await initDefaultConfig(env.proxyDatabaseInfo.writerInstanceEndpoint, env.proxyDatabaseInfo.clusterEndpointPort); - config["plugins"] = "efm,failover"; + config["plugins"] = "efm2,failover"; config[WrapperProperties.FAILURE_DETECTION_TIME_MS.name] = detectionTimeMillis; config[WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.name] = detectionIntervalMillis; config[WrapperProperties.FAILURE_DETECTION_COUNT.name] = detectionCount; diff --git a/tests/integration/container/tests/read_write_splitting.test.ts b/tests/integration/container/tests/read_write_splitting.test.ts index d673dbb3..3f378798 100644 --- a/tests/integration/container/tests/read_write_splitting.test.ts +++ b/tests/integration/container/tests/read_write_splitting.test.ts @@ -75,7 +75,7 @@ async function initConfigWithFailover(host: string, port: number, connectToProxy database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: port, - plugins: "readWriteSplitting,failover", + plugins: "readWriteSplitting,efm2,failover", failoverTimeoutMs: 400000, enableTelemetry: true, telemetryTracesBackend: "OTLP", diff --git a/tests/integration/container/tests/session_state.test.ts b/tests/integration/container/tests/session_state.test.ts index 3ce23be8..59a68092 100644 --- a/tests/integration/container/tests/session_state.test.ts +++ b/tests/integration/container/tests/session_state.test.ts @@ -30,7 +30,9 @@ import { AwsMySQLClient } from "../../../../mysql/lib"; import { TransactionIsolationLevel } from "../../../../common/lib/utils/transaction_isolation_level"; const itIf = - !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) ? it : it.skip; + !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) + ? it.skip + : it.skip; let client: any; @@ -74,111 +76,115 @@ class TestAwsPGClient extends AwsPGClient { } describe("session state", () => { - it.only("test update state", async () => { - const env = await TestEnvironment.getCurrent(); - const driver = DriverHelper.getDriverForDatabaseEngine(env.engine); - let initClientFunc; - switch (driver) { - case TestDriver.MYSQL: - initClientFunc = (options: any) => new TestAwsMySQLClient(options); - break; - case TestDriver.PG: - initClientFunc = (options: any) => new TestAwsPGClient(options); - break; - default: - throw new Error("invalid driver"); - } - - let props = { - user: env.databaseInfo.username, - host: env.databaseInfo.clusterEndpoint, - database: env.databaseInfo.defaultDbName, - password: env.databaseInfo.password, - port: env.databaseInfo.clusterEndpointPort - }; - props = DriverHelper.addDriverSpecificConfiguration(props, env.engine); - client = initClientFunc(props); - - const newClient = initClientFunc(props); + itIf( + "test update state", + async () => { + const env = await TestEnvironment.getCurrent(); + const driver = DriverHelper.getDriverForDatabaseEngine(env.engine); + let initClientFunc; + switch (driver) { + case TestDriver.MYSQL: + initClientFunc = (options: any) => new TestAwsMySQLClient(options); + break; + case TestDriver.PG: + initClientFunc = (options: any) => new TestAwsPGClient(options); + break; + default: + throw new Error("invalid driver"); + } - try { - await client.connect(); - await newClient.connect(); - const targetClient = client.targetClient; - const newTargetClient = newClient.targetClient; - - expect(targetClient).not.toEqual(newTargetClient); - if (driver === TestDriver.MYSQL) { - await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE IF NOT EXISTS testSessionState"); - await client.setReadOnly(true); - await client.setCatalog("testSessionState"); - await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); - await client.setAutoCommit(false); - - // Assert new client's session states are using server default values. - let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); - let catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); - let autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); - let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); - expect(readOnly[0][0].readonly).toEqual(0); - expect(catalog[0][0].catalog).toEqual(env.databaseInfo.defaultDbName); - expect(autoCommit[0][0].autocommit).toEqual(1); - expect(transactionIsolation[0][0].level).toEqual("REPEATABLE-READ"); - - await client.getPluginService().setCurrentClient(newClient.targetClient); - - expect(client.targetClient).not.toEqual(targetClient); - expect(client.targetClient).toEqual(newTargetClient); - - // Assert new client's session states are set. - readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); - catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); - autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); - transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); - expect(readOnly[0][0].readonly).toEqual(1); - expect(catalog[0][0].catalog).toEqual("testSessionState"); - expect(autoCommit[0][0].autocommit).toEqual(0); - expect(transactionIsolation[0][0].level).toEqual("SERIALIZABLE"); - - await client.setReadOnly(false); - await client.setAutoCommit(true); - await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); - } else if (driver === TestDriver.PG) { - // End any current transaction before we can create a new test database. - await DriverHelper.executeQuery(env.engine, client, "END TRANSACTION"); - await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); - await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE testSessionState"); - await client.setReadOnly(true); - await client.setSchema("testSessionState"); - await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); - - // Assert new client's session states are using server default values. - let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); - let schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); - let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); - expect(readOnly.rows[0]["transaction_read_only"]).toEqual("off"); - expect(schema.rows[0]["search_path"]).not.toEqual("testSessionState"); - expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("read committed"); - - await client.getPluginService().setCurrentClient(newClient.targetClient); - expect(client.targetClient).not.toEqual(targetClient); - expect(client.targetClient).toEqual(newTargetClient); - - // Assert new client's session states are set. - readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); - schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); - transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); - expect(readOnly.rows[0]["transaction_read_only"]).toEqual("on"); - expect(schema.rows[0]["search_path"]).toEqual("testsessionstate"); - expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("serializable"); - - await client.setReadOnly(false); - await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + let props = { + user: env.databaseInfo.username, + host: env.databaseInfo.clusterEndpoint, + database: env.databaseInfo.defaultDbName, + password: env.databaseInfo.password, + port: env.databaseInfo.clusterEndpointPort + }; + props = DriverHelper.addDriverSpecificConfiguration(props, env.engine); + client = initClientFunc(props); + + const newClient = initClientFunc(props); + + try { + await client.connect(); + await newClient.connect(); + const targetClient = client.targetClient; + const newTargetClient = newClient.targetClient; + + expect(targetClient).not.toEqual(newTargetClient); + if (driver === TestDriver.MYSQL) { + await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE IF NOT EXISTS testSessionState"); + await client.setReadOnly(true); + await client.setCatalog("testSessionState"); + await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); + await client.setAutoCommit(false); + + // Assert new client's session states are using server default values. + let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); + let catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); + let autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); + let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); + expect(readOnly[0][0].readonly).toEqual(0); + expect(catalog[0][0].catalog).toEqual(env.databaseInfo.defaultDbName); + expect(autoCommit[0][0].autocommit).toEqual(1); + expect(transactionIsolation[0][0].level).toEqual("REPEATABLE-READ"); + + await client.getPluginService().setCurrentClient(newClient.targetClient); + + expect(client.targetClient).not.toEqual(targetClient); + expect(client.targetClient).toEqual(newTargetClient); + + // Assert new client's session states are set. + readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_read_only AS readonly"); + catalog = await DriverHelper.executeQuery(env.engine, newClient, "SELECT DATABASE() AS catalog"); + autoCommit = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.autocommit AS autocommit"); + transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SELECT @@SESSION.transaction_isolation AS level"); + expect(readOnly[0][0].readonly).toEqual(1); + expect(catalog[0][0].catalog).toEqual("testSessionState"); + expect(autoCommit[0][0].autocommit).toEqual(0); + expect(transactionIsolation[0][0].level).toEqual("SERIALIZABLE"); + + await client.setReadOnly(false); + await client.setAutoCommit(true); + await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + } else if (driver === TestDriver.PG) { + // End any current transaction before we can create a new test database. + await DriverHelper.executeQuery(env.engine, client, "END TRANSACTION"); + await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + await DriverHelper.executeQuery(env.engine, client, "CREATE DATABASE testSessionState"); + await client.setReadOnly(true); + await client.setSchema("testSessionState"); + await client.setTransactionIsolation(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE); + + // Assert new client's session states are using server default values. + let readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); + let schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); + let transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); + expect(readOnly.rows[0]["transaction_read_only"]).toEqual("off"); + expect(schema.rows[0]["search_path"]).not.toEqual("testSessionState"); + expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("read committed"); + + await client.getPluginService().setCurrentClient(newClient.targetClient); + expect(client.targetClient).not.toEqual(targetClient); + expect(client.targetClient).toEqual(newTargetClient); + + // Assert new client's session states are set. + readOnly = await DriverHelper.executeQuery(env.engine, newClient, "SHOW transaction_read_only"); + schema = await DriverHelper.executeQuery(env.engine, newClient, "SHOW search_path"); + transactionIsolation = await DriverHelper.executeQuery(env.engine, newClient, "SHOW TRANSACTION ISOLATION LEVEL"); + expect(readOnly.rows[0]["transaction_read_only"]).toEqual("on"); + expect(schema.rows[0]["search_path"]).toEqual("testsessionstate"); + expect(transactionIsolation.rows[0]["transaction_isolation"]).toEqual("serializable"); + + await client.setReadOnly(false); + await DriverHelper.executeQuery(env.engine, client, "DROP DATABASE IF EXISTS testSessionState"); + } + } catch (e) { + await client.end(); + await newClient.end(); + throw e; } - } catch (e) { - await client.end(); - await newClient.end(); - throw e; - } - }, 1320000); + }, + 1320000 + ); });