From 64227d1bce829e373e3da73e880a3d0a8a2bd69a Mon Sep 17 00:00:00 2001 From: joyc-bq <95259163+joyc-bq@users.noreply.github.com> Date: Wed, 12 Feb 2025 15:37:39 -0800 Subject: [PATCH] feat: efm2 (#371) --- common/lib/connection_plugin_chain_builder.ts | 3 + .../monitoring/cluster_topology_monitor.ts | 6 +- .../efm/host_monitoring_connection_plugin.ts | 47 +-- common/lib/plugins/efm/monitor.ts | 36 +- .../plugins/efm/monitor_connection_context.ts | 5 +- common/lib/plugins/efm/monitor_service.ts | 7 +- .../host_monitoring2_connection_plugin.ts | 176 ++++++++++ .../efm2/host_monitoring2_plugin_factory.ts | 37 ++ common/lib/plugins/efm2/monitor.ts | 315 ++++++++++++++++++ .../efm2/monitor_connection_context.ts | 57 ++++ common/lib/plugins/efm2/monitor_service.ts | 166 +++++++++ .../host_response_time_monitor.ts | 5 +- common/lib/utils/locales/en.json | 11 +- common/lib/utils/utils.ts | 4 + common/lib/wrapper_property.ts | 2 +- ...nal_connection_pooling_postgres_example.ts | 2 +- .../aws_read_write_splitting_mysql_example.ts | 2 +- ...fastest_response_strategy_mysql_example.ts | 4 +- ...test_response_strategy_postgres_example.ts | 2 +- .../rds_multi_az_mysql_database_dialect.ts | 8 +- .../rds_multi_az_pg_database_dialect.ts | 10 +- .../container/tests/autoscaling.test.ts | 2 +- .../tests/basic_connectivity.test.ts | 6 +- .../container/tests/performance.test.ts | 2 +- .../tests/read_write_splitting.test.ts | 2 +- .../container/tests/session_state.test.ts | 4 +- .../container/tests/utils/driver_helper.ts | 15 +- 27 files changed, 853 insertions(+), 83 deletions(-) create mode 100644 common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts create mode 100644 common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts create mode 100644 common/lib/plugins/efm2/monitor.ts create mode 100644 common/lib/plugins/efm2/monitor_connection_context.ts create mode 100644 common/lib/plugins/efm2/monitor_service.ts diff --git a/common/lib/connection_plugin_chain_builder.ts b/common/lib/connection_plugin_chain_builder.ts index 68cbd292..23b3f434 100644 --- a/common/lib/connection_plugin_chain_builder.ts +++ b/common/lib/connection_plugin_chain_builder.ts @@ -41,6 +41,7 @@ import { LimitlessConnectionPluginFactory } from "./plugins/limitless/limitless_ import { FastestResponseStrategyPluginFactory } from "./plugins/strategy/fastest_response/fastest_respose_strategy_plugin_factory"; import { CustomEndpointPluginFactory } from "./plugins/custom_endpoint/custom_endpoint_plugin_factory"; import { ConfigurationProfile } from "./profile/configuration_profile"; +import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_plugin_factory"; /* Type alias used for plugin factory sorting. It holds a reference to a plugin @@ -63,6 +64,7 @@ export class ConnectionPluginChainBuilder { ["failover", { factory: FailoverPluginFactory, weight: 700 }], ["failover2", { factory: Failover2PluginFactory, weight: 710 }], ["efm", { factory: HostMonitoringPluginFactory, weight: 800 }], + ["efm2", { factory: HostMonitoring2PluginFactory, weight: 810 }], ["fastestResponseStrategy", { factory: FastestResponseStrategyPluginFactory, weight: 900 }], ["limitless", { factory: LimitlessConnectionPluginFactory, weight: 950 }], ["iam", { factory: IamAuthenticationPluginFactory, weight: 1000 }], @@ -82,6 +84,7 @@ export class ConnectionPluginChainBuilder { [FailoverPluginFactory, 700], [Failover2PluginFactory, 710], [HostMonitoringPluginFactory, 800], + [HostMonitoring2PluginFactory, 810], [LimitlessConnectionPluginFactory, 950], [IamAuthenticationPluginFactory, 1000], [AwsSecretsManagerPluginFactory, 1100], diff --git a/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts b/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts index 689d72e3..16df9364 100644 --- a/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts +++ b/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts @@ -196,7 +196,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { } private async openAnyClientAndUpdateTopology(): Promise { - let writerVerifiedByThisThread = false; + let writerVerifiedByThisTask = false; if (!this.monitoringClient) { let client: ClientWrapper; try { @@ -215,7 +215,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { this.isVerifiedWriterConnection = true; this.writerHostInfo = this.initialHostInfo; logger.info(Messages.get("ClusterTopologyMonitor.writerMonitoringConnection", this.initialHostInfo.hostId)); - writerVerifiedByThisThread = true; + writerVerifiedByThisTask = true; } } catch (error) { // Do nothing. @@ -228,7 +228,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { } const hosts: HostInfo[] = await this.fetchTopologyAndUpdateCache(this.monitoringClient); - if (writerVerifiedByThisThread) { + if (writerVerifiedByThisTask) { if (this.ignoreNewTopologyRequestsEndTimeMs === -1) { this.ignoreNewTopologyRequestsEndTimeMs = 0; } else { diff --git a/common/lib/plugins/efm/host_monitoring_connection_plugin.ts b/common/lib/plugins/efm/host_monitoring_connection_plugin.ts index 97a1f444..8f21006b 100644 --- a/common/lib/plugins/efm/host_monitoring_connection_plugin.ts +++ b/common/lib/plugins/efm/host_monitoring_connection_plugin.ts @@ -121,6 +121,7 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp } finally { if (monitorContext != null) { await this.monitorService.stopMonitoring(monitorContext); + logger.debug(Messages.get("HostMonitoringConnectionPlugin.monitoringDeactivated", methodName)); if (monitorContext.isHostUnhealthy) { const monitoringHostInfo = await this.getMonitoringHostInfo(); @@ -148,7 +149,8 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp return result; } - private throwUnableToIdentifyConnection(host: HostInfo | null, provider: HostListProvider | null): never { + private throwUnableToIdentifyConnection(host: HostInfo | null): never { + const provider: HostListProvider | null = this.pluginService.getHostListProvider(); throw new AwsWrapperError( Messages.get( "HostMonitoringConnectionPlugin.unableToIdentifyConnection", @@ -159,32 +161,31 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp } async getMonitoringHostInfo(): Promise { + if (this.monitoringHostInfo) { + return this.monitoringHostInfo; + } + this.monitoringHostInfo = this.pluginService.getCurrentHostInfo(); if (this.monitoringHostInfo == null) { - this.monitoringHostInfo = this.pluginService.getCurrentHostInfo(); - const provider: HostListProvider | null = this.pluginService.getHostListProvider(); - if (this.monitoringHostInfo == null) { - this.throwUnableToIdentifyConnection(null, provider); - } - const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url); - - try { - if (rdsUrlType.isRdsCluster) { - logger.debug("Monitoring host info is associated with a cluster endpoint, plugin needs to identify the cluster connection"); - this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!); - if (this.monitoringHostInfo == null) { - const host: HostInfo | null = this.pluginService.getCurrentHostInfo(); - this.throwUnableToIdentifyConnection(host, provider); - } - await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo); - } - } catch (error: any) { - if (!(error instanceof AwsWrapperError)) { - logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message)); + this.throwUnableToIdentifyConnection(null); + } + const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url); + + try { + if (rdsUrlType.isRdsCluster) { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.identifyClusterConnection")); + this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!); + if (this.monitoringHostInfo == null) { + const host: HostInfo | null = this.pluginService.getCurrentHostInfo(); + this.throwUnableToIdentifyConnection(host); } - throw error; + await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo); } + } catch (error: any) { + if (!(error instanceof AwsWrapperError)) { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message)); + } + throw error; } - return this.monitoringHostInfo; } diff --git a/common/lib/plugins/efm/monitor.ts b/common/lib/plugins/efm/monitor.ts index 301df40a..bcd0ec6c 100644 --- a/common/lib/plugins/efm/monitor.ts +++ b/common/lib/plugins/efm/monitor.ts @@ -20,11 +20,12 @@ import { PluginService } from "../../plugin_service"; import { logger } from "../../../logutils"; import { Messages } from "../../utils/messages"; import { ClientWrapper } from "../../client_wrapper"; -import { sleep } from "../../utils/utils"; -import { WrapperProperties } from "../../wrapper_property"; +import { getCurrentTimeNano, sleep } from "../../utils/utils"; import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory"; import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter"; import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level"; +import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor"; +import { WrapperProperties } from "../../wrapper_property"; export interface Monitor { startMonitoring(context: MonitorConnectionContext): void; @@ -79,7 +80,7 @@ export class MonitorImpl implements Monitor { this.properties = properties; this.hostInfo = hostInfo; this.monitorDisposalTimeMillis = monitorDisposalTimeMillis; - this.contextLastUsedTimestampNanos = this.getCurrentTimeNano(); + this.contextLastUsedTimestampNanos = getCurrentTimeNano(); const instanceId = this.hostInfo.hostId ?? this.hostInfo.host; this.instanceInvalidCounter = this.telemetryFactory.createCounter(`efm.hostUnhealthy.count.${instanceId}`); } @@ -94,7 +95,7 @@ export class MonitorImpl implements Monitor { logger.warn(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host)); } - const currentTimeNanos: number = this.getCurrentTimeNano(); + const currentTimeNanos: number = getCurrentTimeNano(); context.startMonitorTimeNano = currentTimeNanos; this.contextLastUsedTimestampNanos = currentTimeNanos; this.newContexts.push(context); @@ -110,7 +111,7 @@ export class MonitorImpl implements Monitor { } context.isActiveContext = false; - this.contextLastUsedTimestampNanos = this.getCurrentTimeNano(); + this.contextLastUsedTimestampNanos = getCurrentTimeNano(); } async run(): Promise { @@ -121,7 +122,7 @@ export class MonitorImpl implements Monitor { try { let newMonitorContext: MonitorConnectionContext | undefined; let firstAddedNewMonitorContext: MonitorConnectionContext | null = null; - const currentTimeNano: number = this.getCurrentTimeNano(); + const currentTimeNano: number = getCurrentTimeNano(); while ((newMonitorContext = this.newContexts.shift()) != null) { if (firstAddedNewMonitorContext === newMonitorContext) { @@ -140,9 +141,9 @@ export class MonitorImpl implements Monitor { } if (this.activeContexts.length > 0) { - this.contextLastUsedTimestampNanos = this.getCurrentTimeNano(); + this.contextLastUsedTimestampNanos = getCurrentTimeNano(); - const statusCheckStartTimeNanos: number = this.getCurrentTimeNano(); + const statusCheckStartTimeNanos: number = getCurrentTimeNano(); this.contextLastUsedTimestampNanos = statusCheckStartTimeNanos; const status: ConnectionStatus = await this.checkConnectionStatus(); @@ -199,7 +200,7 @@ export class MonitorImpl implements Monitor { this.delayMillisTimeoutId = setTimeout(resolve, delayMillis); }); } else { - if (this.getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) { + if (getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) { break; } await new Promise((resolve) => { @@ -229,24 +230,21 @@ export class MonitorImpl implements Monitor { const connectContext = this.telemetryFactory.openTelemetryContext("Connection status check", TelemetryTraceLevel.FORCE_TOP_LEVEL); connectContext.setAttribute("url", this.hostInfo.host); return await connectContext.start(async () => { - const startNanos = this.getCurrentTimeNano(); + const startNanos = getCurrentTimeNano(); try { const clientIsValid = this.monitoringClient && (await this.pluginService.isClientValid(this.monitoringClient)); if (this.monitoringClient !== null && clientIsValid) { - return Promise.resolve(new ConnectionStatus(clientIsValid, this.getCurrentTimeNano() - startNanos)); + return Promise.resolve(new ConnectionStatus(clientIsValid, getCurrentTimeNano() - startNanos)); } await this.endMonitoringClient(); - // Open a new connection. const monitoringConnProperties: Map = new Map(this.properties); - - for (const key of this.properties.keys()) { + for (const key of monitoringConnProperties.keys()) { if (!key.startsWith(WrapperProperties.MONITORING_PROPERTY_PREFIX)) { continue; } - monitoringConnProperties.set(key.substring(WrapperProperties.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key)); monitoringConnProperties.delete(key); } @@ -254,11 +252,11 @@ export class MonitorImpl implements Monitor { logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`); this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties); logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`); - return Promise.resolve(new ConnectionStatus(true, this.getCurrentTimeNano() - startNanos)); + return Promise.resolve(new ConnectionStatus(true, getCurrentTimeNano() - startNanos)); } catch (error: any) { this.instanceInvalidCounter.inc(); await this.endMonitoringClient(); - return Promise.resolve(new ConnectionStatus(false, this.getCurrentTimeNano() - startNanos)); + return Promise.resolve(new ConnectionStatus(false, getCurrentTimeNano() - startNanos)); } }); } @@ -272,10 +270,6 @@ export class MonitorImpl implements Monitor { return this.stopped || this.cancelled; } - protected getCurrentTimeNano() { - return Number(process.hrtime.bigint()); - } - async releaseResources() { this.cancelled = true; clearTimeout(this.delayMillisTimeoutId); diff --git a/common/lib/plugins/efm/monitor_connection_context.ts b/common/lib/plugins/efm/monitor_connection_context.ts index 3781da62..a6889322 100644 --- a/common/lib/plugins/efm/monitor_connection_context.ts +++ b/common/lib/plugins/efm/monitor_connection_context.ts @@ -43,7 +43,7 @@ export class MonitorConnectionContext { constructor( monitor: Monitor, - clientToAbort: any, + clientToAbort: ClientWrapper, failureDetectionTimeMillis: number, failureDetectionIntervalMillis: number, failureDetectionCount: number, @@ -127,7 +127,6 @@ export class MonitorConnectionContext { const invalidHostDurationNano: number = statusCheckEndNano - this.invalidHostStartTimeNano; const maxInvalidHostDurationMillis: number = this.failureDetectionIntervalMillis * Math.max(0, this.failureDetectionCount); - if (this.failureCount >= this.failureDetectionCount || invalidHostDurationNano >= maxInvalidHostDurationMillis * 1_000_000) { logger.debug(Messages.get("MonitorConnectionContext.hostDead", hostName)); this.isHostUnhealthy = true; @@ -135,7 +134,7 @@ export class MonitorConnectionContext { return; } - logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", hostName, String(this.failureCount))); + logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", hostName)); return; } diff --git a/common/lib/plugins/efm/monitor_service.ts b/common/lib/plugins/efm/monitor_service.ts index ce480ca9..00318ea3 100644 --- a/common/lib/plugins/efm/monitor_service.ts +++ b/common/lib/plugins/efm/monitor_service.ts @@ -22,10 +22,11 @@ import { WrapperProperties } from "../../wrapper_property"; import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache"; import { PluginService } from "../../plugin_service"; import { Messages } from "../../utils/messages"; +import { ClientWrapper } from "../../client_wrapper"; export interface MonitorService { startMonitoring( - clientToAbort: any, + clientToAbort: ClientWrapper, hostKeys: Set, hostInfo: HostInfo, properties: Map, @@ -59,7 +60,7 @@ export class MonitorServiceImpl implements MonitorService { } async startMonitoring( - clientToAbort: any, + clientToAbort: ClientWrapper, hostKeys: Set, hostInfo: HostInfo, properties: Map, @@ -158,7 +159,7 @@ export class MonitorServiceImpl implements MonitorService { } async releaseResources() { - for (const [key, monitor] of MonitorServiceImpl.monitors.entries) { + for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) { if (monitor.item) { await monitor.item.releaseResources(); } diff --git a/common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts b/common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts new file mode 100644 index 00000000..11674e25 --- /dev/null +++ b/common/lib/plugins/efm2/host_monitoring2_connection_plugin.ts @@ -0,0 +1,176 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { PluginService } from "../../plugin_service"; +import { HostChangeOptions } from "../../host_change_options"; +import { HostInfo } from "../../host_info"; +import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; +import { RdsUtils } from "../../utils/rds_utils"; +import { AbstractConnectionPlugin } from "../../abstract_connection_plugin"; +import { RdsUrlType } from "../../utils/rds_url_type"; +import { WrapperProperties } from "../../wrapper_property"; +import { MonitorConnectionContext } from "./monitor_connection_context"; +import { logger, uniqueId } from "../../../logutils"; +import { Messages } from "../../utils/messages"; +import { MonitorService, MonitorServiceImpl } from "./monitor_service"; +import { AwsWrapperError } from "../../utils/errors"; +import { HostListProvider } from "../../host_list_provider/host_list_provider"; +import { CanReleaseResources } from "../../can_release_resources"; +import { SubscribedMethodHelper } from "../../utils/subscribed_method_helper"; +import { ClientWrapper } from "../../client_wrapper"; + +export class HostMonitoring2ConnectionPlugin extends AbstractConnectionPlugin implements CanReleaseResources { + id: string = uniqueId("_efm2Plugin"); + private readonly properties: Map; + private pluginService: PluginService; + private rdsUtils: RdsUtils; + private monitoringHostInfo: HostInfo | null = null; + private monitorService: MonitorService; + + constructor(pluginService: PluginService, properties: Map, rdsUtils: RdsUtils = new RdsUtils(), monitorService?: MonitorServiceImpl) { + super(); + this.pluginService = pluginService; + this.properties = properties; + this.rdsUtils = rdsUtils; + this.monitorService = monitorService ?? new MonitorServiceImpl(pluginService); + } + + getSubscribedMethods(): Set { + return new Set(["*"]); + } + + connect( + hostInfo: HostInfo, + props: Map, + isInitialConnection: boolean, + connectFunc: () => Promise + ): Promise { + return this.connectInternal(hostInfo, connectFunc); + } + + forceConnect( + hostInfo: HostInfo, + props: Map, + isInitialConnection: boolean, + forceConnectFunc: () => Promise + ): Promise { + return this.connectInternal(hostInfo, forceConnectFunc); + } + + private async connectInternal(hostInfo: HostInfo, connectFunc: () => Promise): Promise { + const targetClient = await connectFunc(); + if (targetClient != null) { + const type: RdsUrlType = this.rdsUtils.identifyRdsType(hostInfo.host); + if (type.isRdsCluster) { + hostInfo.resetAliases(); + await this.pluginService.fillAliases(targetClient, hostInfo); + } + } + return targetClient; + } + + async execute(methodName: string, methodFunc: () => Promise, methodArgs: any): Promise { + const isEnabled: boolean = WrapperProperties.FAILURE_DETECTION_ENABLED.get(this.properties); + + if (!isEnabled || !SubscribedMethodHelper.NETWORK_BOUND_METHODS.includes(methodName)) { + return methodFunc(); + } + + const failureDetectionTimeMillis: number = WrapperProperties.FAILURE_DETECTION_TIME_MS.get(this.properties); + const failureDetectionIntervalMillis: number = WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.get(this.properties); + const failureDetectionCount: number = WrapperProperties.FAILURE_DETECTION_COUNT.get(this.properties); + + let result: T; + let monitorContext: MonitorConnectionContext | null = null; + + try { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.activatedMonitoring", methodName)); + const monitoringHostInfo: HostInfo = await this.getMonitoringHostInfo(); + + monitorContext = await this.monitorService.startMonitoring( + this.pluginService.getCurrentClient().targetClient, + monitoringHostInfo, + this.properties, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount + ); + + result = await methodFunc(); + } finally { + if (monitorContext != null) { + await this.monitorService.stopMonitoring(monitorContext, this.pluginService.getCurrentClient().targetClient); + + logger.debug(Messages.get("HostMonitoringConnectionPlugin.monitoringDeactivated", methodName)); + } + } + + return result; + } + + private throwUnableToIdentifyConnection(host: HostInfo | null): never { + const provider: HostListProvider | null = this.pluginService.getHostListProvider(); + throw new AwsWrapperError( + Messages.get( + "HostMonitoringConnectionPlugin.unableToIdentifyConnection", + host !== null ? host.host : "unknown host", + provider !== null ? provider.getHostProviderType() : "unknown provider" + ) + ); + } + + async getMonitoringHostInfo(): Promise { + if (this.monitoringHostInfo) { + return this.monitoringHostInfo; + } + this.monitoringHostInfo = this.pluginService.getCurrentHostInfo(); + if (this.monitoringHostInfo === null) { + this.throwUnableToIdentifyConnection(null); + } + const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url); + + try { + if (rdsUrlType.isRdsCluster) { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.identifyClusterConnection")); + this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!); + if (this.monitoringHostInfo == null) { + const host: HostInfo | null = this.pluginService.getCurrentHostInfo(); + this.throwUnableToIdentifyConnection(host); + } + await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo); + } + } catch (error: any) { + if (!(error instanceof AwsWrapperError)) { + logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message)); + } + throw error; + } + + return this.monitoringHostInfo; + } + + async notifyConnectionChanged(changes: Set): Promise { + if (changes.has(HostChangeOptions.HOSTNAME) || changes.has(HostChangeOptions.HOST_CHANGED)) { + // Reset monitoring host info since the associated connection has changed. + this.monitoringHostInfo = null; + } + return OldConnectionSuggestionAction.NO_OPINION; + } + + async releaseResources(): Promise { + return this.monitorService.releaseResources(); + } +} diff --git a/common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts b/common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts new file mode 100644 index 00000000..6763b028 --- /dev/null +++ b/common/lib/plugins/efm2/host_monitoring2_plugin_factory.ts @@ -0,0 +1,37 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { ConnectionPluginFactory } from "../../plugin_factory"; +import { PluginService } from "../../plugin_service"; +import { ConnectionPlugin } from "../../connection_plugin"; +import { RdsUtils } from "../../utils/rds_utils"; +import { AwsWrapperError } from "../../utils/errors"; +import { Messages } from "../../utils/messages"; + +export class HostMonitoring2PluginFactory extends ConnectionPluginFactory { + private static hostMonitoring2Plugin: any; + + async getInstance(pluginService: PluginService, properties: Map): Promise { + try { + if (!HostMonitoring2PluginFactory.hostMonitoring2Plugin) { + HostMonitoring2PluginFactory.hostMonitoring2Plugin = await import("./host_monitoring2_connection_plugin"); + } + return new HostMonitoring2PluginFactory.hostMonitoring2Plugin.HostMonitoring2ConnectionPlugin(pluginService, properties, new RdsUtils()); + } catch (error: any) { + throw new AwsWrapperError(Messages.get("ConnectionPluginChainBuilder.errorImportingPlugin", error.message, "HostMonitoringPlugin")); + } + } +} diff --git a/common/lib/plugins/efm2/monitor.ts b/common/lib/plugins/efm2/monitor.ts new file mode 100644 index 00000000..ace40e5d --- /dev/null +++ b/common/lib/plugins/efm2/monitor.ts @@ -0,0 +1,315 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { MonitorConnectionContext } from "./monitor_connection_context"; +import { HostInfo } from "../../host_info"; +import { PluginService } from "../../plugin_service"; +import { logger } from "../../../logutils"; +import { Messages } from "../../utils/messages"; +import { ClientWrapper } from "../../client_wrapper"; +import { getCurrentTimeNano, sleep } from "../../utils/utils"; +import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory"; +import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter"; +import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level"; +import { HostAvailability } from "../../host_availability/host_availability"; +import { MapUtils } from "../../utils/map_utils"; +import { WrapperProperties } from "../../wrapper_property"; + +export interface Monitor { + startMonitoring(context: MonitorConnectionContext): void; + + run(): Promise; + + canDispose(): boolean; + + endMonitoringClient(): Promise; + + end(): Promise; + + releaseResources(): Promise; +} + +export class MonitorImpl implements Monitor { + private static readonly TASK_SLEEP_MILLIS: number = 100; + private activeContexts: WeakRef[] = []; + static newContexts: Map>> = new Map>>(); + private readonly pluginService: PluginService; + private readonly telemetryFactory: TelemetryFactory; + private readonly properties: Map; + private readonly hostInfo: HostInfo; + private stopped: boolean = false; + + private monitoringClient: ClientWrapper | null = null; + + private readonly failureDetectionTimeNano: number; + private readonly failureDetectionIntervalNanos: number; + private readonly failureDetectionCount: number; + + private invalidHostStartTimeNano: number; + private failureCount: number; + private hostUnhealthy: boolean = false; + private readonly abortedConnectionsCounter: TelemetryCounter; + private delayMillisTimeoutId: any; + private sleepWhenHostHealthyTimeoutId: any; + + constructor( + pluginService: PluginService, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number, + abortedConnectionsCounter: TelemetryCounter + ) { + this.pluginService = pluginService; + this.telemetryFactory = this.pluginService.getTelemetryFactory(); + this.hostInfo = hostInfo; + this.properties = properties; + this.failureDetectionTimeNano = failureDetectionTimeMillis * 1000000; + this.failureDetectionIntervalNanos = failureDetectionIntervalMillis * 1000000; + this.failureDetectionCount = failureDetectionCount; + this.abortedConnectionsCounter = abortedConnectionsCounter; + + const hostId: string = this.hostInfo.hostId ?? this.hostInfo.host; + + this.telemetryFactory.createGauge(`efm2.newContexts.size.${hostId}`, () => MonitorImpl.newContexts.size === Number.MAX_SAFE_INTEGER); + + this.telemetryFactory.createGauge(`efm2.activeContexts.size.${hostId}`, () => this.activeContexts.length === Number.MAX_SAFE_INTEGER); + + this.telemetryFactory.createGauge(`efm2.hostHealthy.${hostId}`, () => (this.hostUnhealthy ? 0 : 1)); + + Promise.race([this.newContextRun(), this.run()]).finally(() => { + this.stopped = true; + }); + } + + canDispose(): boolean { + return this.activeContexts.length === 0 && MonitorImpl.newContexts.size === 0; + } + + startMonitoring(context: MonitorConnectionContext): void { + if (this.isStopped()) { + logger.warn(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host)); + } + + const startMonitorTimeNano = getCurrentTimeNano() + this.failureDetectionTimeNano; + const connectionQueue = MapUtils.computeIfAbsent( + MonitorImpl.newContexts, + startMonitorTimeNano, + () => new Array>() + ); + connectionQueue.push(new WeakRef(context)); + } + + async newContextRun(): Promise { + logger.debug(Messages.get("MonitorImpl.startMonitoringTaskNewContext", this.hostInfo.host)); + + try { + while (!this.isStopped()) { + const currentTimeNanos = getCurrentTimeNano(); + // Get entries with key (that is a time in nanos) less than current time. + const processedKeys: number[] = new Array(); + for (const [key, val] of MonitorImpl.newContexts.entries()) { + if (key < currentTimeNanos) { + const queue: Array> = val; + processedKeys.push(key); + // Each value of found entry is a queue of monitoring contexts awaiting active monitoring. + // Add all contexts to an active monitoring contexts queue. + // Ignore disposed contexts. + let monitorContextRef: WeakRef | undefined; + while ((monitorContextRef = queue.shift())) { + const monitorContext: MonitorConnectionContext = monitorContextRef.deref(); + if (monitorContext && monitorContext.isActive()) { + this.activeContexts.push(monitorContextRef); + } + } + } + } + processedKeys.forEach((key) => { + MonitorImpl.newContexts.delete(key); + }); + await sleep(1000); + } + return; + } catch (err) { + // do nothing, exit task + } + logger.debug(Messages.get("MonitorImpl.stopMonitoringTaskNewContext", this.hostInfo.host)); + } + + async run(): Promise { + logger.debug(Messages.get("MonitorImpl.startMonitoring", this.hostInfo.host)); + + try { + while (!this.isStopped()) { + try { + if (this.activeContexts.length === 0 && !this.hostUnhealthy) { + await new Promise((resolve) => { + this.delayMillisTimeoutId = setTimeout(resolve, MonitorImpl.TASK_SLEEP_MILLIS); + }); + continue; + } + + const statusCheckStartTimeNanos: number = getCurrentTimeNano(); + const isValid = await this.checkConnectionStatus(); + const statusCheckEndTimeNanos: number = getCurrentTimeNano(); + + await this.updateHostHealthStatus(isValid, statusCheckStartTimeNanos, statusCheckEndTimeNanos); + + if (this.hostUnhealthy) { + this.pluginService.setAvailability(this.hostInfo.aliases, HostAvailability.NOT_AVAILABLE); + } + const tmpActiveContexts: WeakRef[] = []; + + let monitorContextRef: WeakRef | undefined; + + while ((monitorContextRef = this.activeContexts.shift())) { + if (this.isStopped()) { + break; + } + const monitorContext: MonitorConnectionContext = monitorContextRef?.deref() ?? null; + if (!monitorContext) { + continue; + } + + if (this.hostUnhealthy) { + // Kill connection + monitorContext.setHostUnhealthy(true); + monitorContext.setInactive(); + const connectionToAbort = monitorContext.getClient(); + if (connectionToAbort != null) { + await this.endMonitoringClient(); + this.abortedConnectionsCounter.inc(); + } + } else if (monitorContext && monitorContext.isActive()) { + tmpActiveContexts.push(monitorContextRef); + } + } + + // activeContexts is empty now and tmpActiveContexts contains all yet active contexts + // Add active contexts back to the queue. + this.activeContexts.push(...tmpActiveContexts); + + const delayMillis = (this.failureDetectionIntervalNanos - (statusCheckEndTimeNanos - statusCheckStartTimeNanos)) / 1000000; + + await new Promise((resolve) => { + this.delayMillisTimeoutId = setTimeout( + resolve, + delayMillis < MonitorImpl.TASK_SLEEP_MILLIS ? MonitorImpl.TASK_SLEEP_MILLIS : delayMillis + ); + }); + } catch (error: any) { + logger.debug(Messages.get("MonitorImpl.errorDuringMonitoringContinue", error.message)); + } + } + } catch (error: any) { + logger.debug(Messages.get("MonitorImpl.errorDuringMonitoringStop", error.message)); + } finally { + await this.endMonitoringClient(); + await sleep(3000); + } + + logger.debug(Messages.get("MonitorImpl.stopMonitoring", this.hostInfo.host)); + } + + /** + * Check the status of the monitored server by sending a ping. + * + * @return whether the server is still alive and the elapsed time spent checking. + */ + async checkConnectionStatus(): Promise { + const connectContext = this.telemetryFactory.openTelemetryContext("Connection status check", TelemetryTraceLevel.FORCE_TOP_LEVEL); + connectContext.setAttribute("url", this.hostInfo.host); + try { + if (!(await this.pluginService.isClientValid(this.monitoringClient))) { + // Open a new connection. + const monitoringConnProperties: Map = new Map(this.properties); + for (const key of monitoringConnProperties.keys()) { + if (!key.startsWith(WrapperProperties.MONITORING_PROPERTY_PREFIX)) { + continue; + } + monitoringConnProperties.set(key.substring(WrapperProperties.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key)); + monitoringConnProperties.delete(key); + } + + logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`); + this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties); + logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`); + return true; + } + return true; + } catch (error: any) { + return false; + } + } + + isStopped(): boolean { + return this.stopped; + } + + async end(): Promise { + this.stopped = true; + // Waiting for 30s gives a task enough time to exit monitoring loop and close database connection. + await sleep(30000); + logger.debug(Messages.get("MonitorImpl.stopped", this.hostInfo.host)); + } + + updateHostHealthStatus(connectionValid: boolean, statusCheckStartNano: number, statusCheckEndNano: number): Promise { + if (!connectionValid) { + this.failureCount++; + + if (this.invalidHostStartTimeNano === 0) { + this.invalidHostStartTimeNano = statusCheckStartNano; + } + + const invalidHostDurationNano = statusCheckEndNano - this.invalidHostStartTimeNano; + const maxInvalidHostDurationNano = this.failureDetectionIntervalNanos * Math.max(0, this.failureDetectionCount - 1); + + if (invalidHostDurationNano >= maxInvalidHostDurationNano) { + logger.debug(Messages.get("MonitorConnectionContext.hostDead", this.hostInfo.host)); + this.hostUnhealthy = true; + return Promise.resolve(); + } + logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", this.hostInfo.host)); + return Promise.resolve(); + } + + if (this.failureCount > 0) { + // Host is back alive + logger.debug(Messages.get("MonitorConnectionContext.hostAlive", this.hostInfo.host)); + } + this.failureCount = 0; + this.invalidHostStartTimeNano = 0; + this.hostUnhealthy = false; + } + + async releaseResources() { + this.stopped = true; + clearTimeout(this.delayMillisTimeoutId); + clearTimeout(this.sleepWhenHostHealthyTimeoutId); + this.activeContexts = null; + await this.endMonitoringClient(); + // Allow time for monitor loop to close. + await sleep(500); + } + + async endMonitoringClient() { + if (this.monitoringClient) { + await this.pluginService.abortTargetClient(this.monitoringClient); + this.monitoringClient = null; + } + } +} diff --git a/common/lib/plugins/efm2/monitor_connection_context.ts b/common/lib/plugins/efm2/monitor_connection_context.ts new file mode 100644 index 00000000..7b0b9146 --- /dev/null +++ b/common/lib/plugins/efm2/monitor_connection_context.ts @@ -0,0 +1,57 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { uniqueId } from "../../../logutils"; +import { ClientWrapper } from "../../client_wrapper"; + +/** + * Monitoring context for each connection. This contains each connection's criteria for whether a + * server should be considered unhealthy. The context is shared between the main task and the monitor task. + */ +export class MonitorConnectionContext { + private clientToAbortRef: WeakRef | undefined; + isHostUnhealthy: boolean = false; + id: string = uniqueId("_monitorContext"); + + /** + * Constructor. + * + * @param clientToAbort A reference to the connection associated with this context that will be aborted. + */ + constructor(clientToAbort: ClientWrapper) { + this.clientToAbortRef = new WeakRef(clientToAbort); + } + + setHostUnhealthy(hostUnhealthy: boolean) { + this.isHostUnhealthy = hostUnhealthy; + } + + shouldAbort(): boolean { + return this.isHostUnhealthy && this.clientToAbortRef != null; + } + + setInactive(): void { + this.clientToAbortRef = null; + } + + getClient(): ClientWrapper | null { + return this.clientToAbortRef.deref() ?? null; + } + + isActive() { + return !!this.clientToAbortRef?.deref(); + } +} diff --git a/common/lib/plugins/efm2/monitor_service.ts b/common/lib/plugins/efm2/monitor_service.ts new file mode 100644 index 00000000..332cec92 --- /dev/null +++ b/common/lib/plugins/efm2/monitor_service.ts @@ -0,0 +1,166 @@ +/* + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"). + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { MonitorConnectionContext } from "./monitor_connection_context"; +import { HostInfo } from "../../host_info"; +import { AwsWrapperError } from "../../utils/errors"; +import { Monitor, MonitorImpl } from "./monitor"; +import { WrapperProperties } from "../../wrapper_property"; +import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache"; +import { PluginService } from "../../plugin_service"; +import { Messages } from "../../utils/messages"; +import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter"; +import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory"; +import { ClientWrapper } from "../../client_wrapper"; +import { logger } from "../../../logutils"; + +export interface MonitorService { + startMonitoring( + clientToAbort: ClientWrapper, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number + ): Promise; + + /** + * Stop monitoring for a connection represented by the given {@link MonitorConnectionContext}. + * Removes the context from the {@link MonitorImpl}. + * + * @param context The {@link MonitorConnectionContext} representing a connection. + * @param clientToAbort A reference to the connection associated with this context that will be aborted. + */ + stopMonitoring(context: MonitorConnectionContext, clientToAbort: ClientWrapper): Promise; + + releaseResources(): Promise; +} + +export class MonitorServiceImpl implements MonitorService { + private static readonly CACHE_CLEANUP_NANOS = BigInt(60_000_000_000); + + protected static readonly monitors: SlidingExpirationCache = new SlidingExpirationCache( + MonitorServiceImpl.CACHE_CLEANUP_NANOS, + (monitor: Monitor) => monitor.canDispose(), + async (monitor: Monitor) => { + { + try { + await monitor.endMonitoringClient(); + } catch (error) { + // ignore + } + } + } + ); + private readonly pluginService: PluginService; + private telemetryFactory: TelemetryFactory; + private readonly abortedConnectionsCounter: TelemetryCounter; + monitorSupplier = ( + pluginService: PluginService, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number, + abortedConnectionsCounter: TelemetryCounter + ) => + new MonitorImpl( + pluginService, + hostInfo, + properties, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount, + abortedConnectionsCounter + ); + + constructor(pluginService: PluginService) { + this.pluginService = pluginService; + this.telemetryFactory = pluginService.getTelemetryFactory(); + this.abortedConnectionsCounter = this.telemetryFactory.createCounter("efm2.connections.aborted"); + } + + async startMonitoring( + clientToAbort: ClientWrapper, + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number + ): Promise { + const monitor = await this.getMonitor(hostInfo, properties, failureDetectionTimeMillis, failureDetectionIntervalMillis, failureDetectionCount); + + if (monitor) { + const context = new MonitorConnectionContext(clientToAbort); + monitor.startMonitoring(context); + return context; + } + + throw new AwsWrapperError(Messages.get("MonitorService.startMonitoringNullMonitor", hostInfo.host)); + } + + async stopMonitoring(context: MonitorConnectionContext, clientToAbort: ClientWrapper): Promise { + context.setInactive(); + if (context.shouldAbort()) { + try { + await clientToAbort.abort(); + this.abortedConnectionsCounter.inc(); + } catch (error) { + // ignore + logger.debug(Messages.get("MonitorConnectionContext.errorAbortingConnection", error.message)); + } + } + } + + async getMonitor( + hostInfo: HostInfo, + properties: Map, + failureDetectionTimeMillis: number, + failureDetectionIntervalMillis: number, + failureDetectionCount: number + ): Promise { + const monitorKey: string = `${failureDetectionTimeMillis.toString()} ${failureDetectionIntervalMillis.toString()} ${failureDetectionCount.toString()} ${hostInfo.host}`; + + const cacheExpirationNanos = BigInt(WrapperProperties.MONITOR_DISPOSAL_TIME_MS.get(properties) * 1_000_000); + return MonitorServiceImpl.monitors.computeIfAbsent( + monitorKey, + () => + this.monitorSupplier( + this.pluginService, + hostInfo, + properties, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount, + this.abortedConnectionsCounter + ), + cacheExpirationNanos + ); + } + + async releaseResources() { + for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) { + if (monitor.item) { + await monitor.item.releaseResources(); + } + } + MonitorServiceImpl.clearMonitors(); + } + + static clearMonitors() { + MonitorServiceImpl.monitors.clear(); + } +} diff --git a/common/lib/plugins/strategy/fastest_response/host_response_time_monitor.ts b/common/lib/plugins/strategy/fastest_response/host_response_time_monitor.ts index d9df756d..5e0a8b80 100644 --- a/common/lib/plugins/strategy/fastest_response/host_response_time_monitor.ts +++ b/common/lib/plugins/strategy/fastest_response/host_response_time_monitor.ts @@ -23,6 +23,7 @@ import { Messages } from "../../../utils/messages"; import { TelemetryTraceLevel } from "../../../utils/telemetry/telemetry_trace_level"; import { ClientWrapper } from "../../../client_wrapper"; import { TelemetryContext } from "../../../utils/telemetry/telemetry_context"; +import { WrapperProperties } from "../../../wrapper_property"; export class HostResponseTimeMonitor { static readonly MONITORING_PROPERTY_PREFIX = "frt_"; @@ -125,10 +126,10 @@ export class HostResponseTimeMonitor { } const monitoringConnProperties: Map = new Map(this.properties); for (const key of monitoringConnProperties.keys()) { - if (!key.startsWith(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX)) { + if (!key.startsWith(WrapperProperties.MONITORING_PROPERTY_PREFIX)) { continue; } - monitoringConnProperties.set(key.substring(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key)); + monitoringConnProperties.set(key.substring(WrapperProperties.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key)); monitoringConnProperties.delete(key); } logger.debug(Messages.get("HostResponseTimeMonitor.openingConnection", this.hostInfo.url)); diff --git a/common/lib/utils/locales/en.json b/common/lib/utils/locales/en.json index 9f932a26..e201757e 100644 --- a/common/lib/utils/locales/en.json +++ b/common/lib/utils/locales/en.json @@ -5,7 +5,7 @@ "PluginManager.unableToRetrievePlugin": "Unable to retrieve plugin instance.", "ConnectionProvider.unsupportedHostSelectorStrategy": "Unsupported host selection strategy '%s' specified for this connection provider '%s'. Please visit the documentation for all supported strategies.", "ConnectionPluginChainBuilder.errorImportingPlugin": "The plugin could not be imported due to error '%s'. Please ensure the required dependencies have been installed. Plugin: '%s'", - "ClientUtils.queryTaskTimeout": "Client query task timed out, if a network error did not occur, please review the usage of the 'mysqlQueryTimeout' connection parameter.", + "ClientUtils.queryTaskTimeout": "Client query task timed out, if a network error did not occur, please review the usage of the 'wrapperQueryTimeout' connection parameter.", "ClientUtils.connectTimeout": "Client connect timed out.", "DatabaseDialectManager.unknownDialectCode": "Unknown dialect code: '%s'.", "DatabaseDialectManager.getDialectError": "Was not able to get a database dialect.", @@ -127,7 +127,7 @@ "HostAvailabilityStrategy.invalidInitialBackoffTime": "Invalid value of '%s' for configuration parameter `hostAvailabilityStrategyInitialBackoffTime`. It must be an integer greater or equal to 1.", "MonitorConnectionContext.errorAbortingConnection": "Error during aborting connection: %s.", "MonitorConnectionContext.hostDead": "Host %s is *dead*.", - "MonitorConnectionContext.hostNotResponding": "Host %s is not *responding* %s", + "MonitorConnectionContext.hostNotResponding": "Host %s is *not responding*", "MonitorConnectionContext.hostAlive": "Host %s is *alive*.", "MonitorImpl.contextNullWarning": "Parameter 'context' should not be null or undefined.", "MonitorImpl.errorDuringMonitoringContinue": "Continuing monitoring after unhandled error was thrown in monitoring for host %s.", @@ -136,6 +136,8 @@ "MonitorImpl.stopped": "Stopped monitoring for host '%s'.", "MonitorImpl.startMonitoring": "Start monitoring for %s.", "MonitorImpl.stopMonitoring": "Stop monitoring for %s.", + "MonitorImpl.startMonitoringTaskNewContext": "Start monitoring thread for checking new contexts for '%s'", + "MonitorImpl.stopMonitoringTaskNewContext": "Stop monitoring thread for checking new contexts for '%s'", "MonitorService.startMonitoringNullMonitor": "Start monitoring called but could not find monitor for host: '%s'.", "MonitorService.emptyAliasSet": "Empty alias set passed for '%s'. Set should not be empty.", "PluginService.hostListEmpty": "Current host list is empty.", @@ -143,7 +145,7 @@ "PluginService.hostsChangeListEmpty": "There are no changes in the hosts' availability.", "PluginService.failedToRetrieveHostPort": "Could not retrieve Host:Port for connection.", "PluginService.nonEmptyAliases": "fillAliases called when HostInfo already contains the following aliases: '%s'.", - "PluginService.forceMonitoringRefreshTimeout": "A timeout exception occurred after waiting '%s' ms for refreshed topology.", + "PluginService.forceMonitoringRefreshTimeout": "A timeout error occurred after waiting '%s' ms for refreshed topology.", "PluginService.requiredBlockingHostListProvider": "The detected host list provider is not a BlockingHostListProvider. A BlockingHostListProvider is required to force refresh the host list. Detected host list provider: '%s'.", "PluginService.currentHostNotAllowed": "The current host is not in the list of allowed hosts. Current host: '%s'. Allowed hosts: '%s'.", "PluginService.currentHostNotDefined": "The current host is undefined.", @@ -153,6 +155,8 @@ "HostMonitoringConnectionPlugin.unableToIdentifyConnection": "Unable to identify the given connection: '%s', please ensure the correct host list provider is specified. The host list provider in use is: '%s'.", "HostMonitoringConnectionPlugin.errorIdentifyingConnection": "Error occurred while identifying connection: '%s'.", "HostMonitoringConnectionPlugin.unavailableHost": "Host '%s' is unavailable.", + "HostMonitoringConnectionPlugin.identifyClusterConnection": "Monitoring host info is associated with a cluster endpoint, plugin needs to identify the cluster connection.", + "PluginServiceImpl.failedToRetrieveHostPort": "PluginServiceImpl.failedToRetrieveHostPort", "AuroraInitialConnectionStrategyPlugin.unsupportedStrategy": "Unsupported host selection strategy '%s'.", "AuroraInitialConnectionStrategyPlugin.requireDynamicProvider": "Dynamic host list provider is required.", "OpenedConnectionTracker.unableToPopulateOpenedConnectionQueue": "The driver is unable to track this opened connection because the instance endpoint is unknown: '%s'", @@ -229,6 +233,7 @@ "HostMonitor.detectedWriter": "Detected writer: '%s'.", "HostMonitor.endMonitoring": "Host monitor '%s' completed in '%s'.", "HostMonitor.writerHostChanged": "Writer host has changed from '%s' to '%s'.", + "HostMonitoringConnectionPlugin.monitoringDeactivated": "Monitoring deactivated for method '%s'.", "CustomEndpointPlugin.connectionRequestToCustomEndpoint": "Detected a connection request to a custom endpoint URL: '%s'.", "CustomEndpointPlugin.errorParsingEndpointIdentifier": "Unable to parse custom endpoint identifier from URL: '%s'.", "CustomEndpointPlugin.unableToDetermineRegion": "Unable to determine connection region. If you are using a non-standard RDS URL, please set the '%s' property.", diff --git a/common/lib/utils/utils.ts b/common/lib/utils/utils.ts index b5f29c0c..1ff3cead 100644 --- a/common/lib/utils/utils.ts +++ b/common/lib/utils/utils.ts @@ -34,6 +34,10 @@ export function getTimeoutTask(timer: any, message: string, timeoutValue: number }); } +export function getCurrentTimeNano() { + return Number(process.hrtime.bigint()); +} + export function shuffleList(list: any[]) { for (let i = list.length - 1; i > 0; i--) { const j = Math.floor(Math.random() * (i + 1)); diff --git a/common/lib/wrapper_property.ts b/common/lib/wrapper_property.ts index 7ef996d4..e69fb5d1 100644 --- a/common/lib/wrapper_property.ts +++ b/common/lib/wrapper_property.ts @@ -46,7 +46,7 @@ export class WrapperProperty { export class WrapperProperties { static readonly MONITORING_PROPERTY_PREFIX: string = "monitoring_"; - static readonly DEFAULT_PLUGINS = "auroraConnectionTracker,failover,efm"; + static readonly DEFAULT_PLUGINS = "auroraConnectionTracker,failover,efm2"; static readonly DEFAULT_TOKEN_EXPIRATION_SEC = 15 * 60; static readonly PLUGINS = new WrapperProperty( diff --git a/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts b/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts index 2e1ed548..cc2609fe 100644 --- a/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts +++ b/examples/aws_driver_example/aws_internal_connection_pooling_postgres_example.ts @@ -55,7 +55,7 @@ const client = new AwsPGClient({ user: username, password: password, database: database, - plugins: "readWriteSplitting,failover,efm", + plugins: "readWriteSplitting,failover,efm2", // Optional: PoolKey property value and connection provider used in internal connection pools. connectionProvider: provider, diff --git a/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts b/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts index 27b013b5..a909e418 100644 --- a/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts +++ b/examples/aws_driver_example/aws_read_write_splitting_mysql_example.ts @@ -31,7 +31,7 @@ const client = new AwsMySQLClient({ user: username, password: password, database: database, - plugins: "readWriteSplitting, failover, efm" + plugins: "readWriteSplitting, failover, efm2" }); // Setup Step: Open connection and create tables - uncomment this section to create table and test values. diff --git a/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts b/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts index dae0c39e..1e64f979 100644 --- a/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts +++ b/examples/aws_driver_example/fastest_response_strategy_mysql_example.ts @@ -25,13 +25,13 @@ const database = "database"; const port = 3306; const client = new AwsMySQLClient({ - // Configure connection parameters. Enable readWriteSplitting, failover, and efm plugins. + // Configure connection parameters. Enable readWriteSplitting, failover, and efm2 plugins. host: mysqlHost, port: port, user: username, password: password, database: database, - plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm", + plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm2", readerHostSelectorStrategy: "fastestResponse" }); diff --git a/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts b/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts index 7ba07d44..d5485a0a 100644 --- a/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts +++ b/examples/aws_driver_example/fastest_response_strategy_postgres_example.ts @@ -31,7 +31,7 @@ const client = new AwsPGClient({ user: username, password: password, database: database, - plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm", + plugins: "readWriteSplitting, fastestResponseStrategy, failover, efm2", readerHostSelectorStrategy: "fastestResponse" }); diff --git a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts index 2689604a..0a8ebddc 100644 --- a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts +++ b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts @@ -126,7 +126,13 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme } async getHostRole(client: ClientWrapper): Promise { - return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) == "0" ? HostRole.WRITER : HostRole.READER; + return (await this.executeTopologyRelatedQuery( + client, + RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY, + RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY_COLUMN_NAME + )) == "0" + ? HostRole.WRITER + : HostRole.READER; } async getWriterId(targetClient: ClientWrapper): Promise { diff --git a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts index 6c11f86b..2ef3a3f4 100644 --- a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts +++ b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts @@ -131,7 +131,13 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To } async getHostRole(client: ClientWrapper): Promise { - return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) === false ? HostRole.WRITER : HostRole.READER; + return (await this.executeTopologyRelatedQuery( + client, + RdsMultiAZPgDatabaseDialect.IS_READER_QUERY, + RdsMultiAZPgDatabaseDialect.IS_READER_QUERY_COLUMN_NAME + )) === false + ? HostRole.WRITER + : HostRole.READER; } async getWriterId(targetClient: ClientWrapper): Promise { @@ -143,7 +149,7 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To ); const currentConnection = await this.identifyConnection(targetClient); - return (currentConnection && currentConnection === writerHostId) ? currentConnection : null; + return currentConnection && currentConnection === writerHostId ? currentConnection : null; } catch (error: any) { throw new AwsWrapperError(Messages.get("RdsMultiAZPgDatabaseDialect.invalidQuery", error.message)); } diff --git a/tests/integration/container/tests/autoscaling.test.ts b/tests/integration/container/tests/autoscaling.test.ts index 46818447..757ffd0e 100644 --- a/tests/integration/container/tests/autoscaling.test.ts +++ b/tests/integration/container/tests/autoscaling.test.ts @@ -72,7 +72,7 @@ async function initConfigWithFailover(host: string, port: number, provider: Inte database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: port, - plugins: "readWriteSplitting,failover", + plugins: "readWriteSplitting,failover,efm2", connectionProvider: provider, failoverTimeoutMs: 400000, enableTelemetry: true, diff --git a/tests/integration/container/tests/basic_connectivity.test.ts b/tests/integration/container/tests/basic_connectivity.test.ts index 45aec100..91c68327 100644 --- a/tests/integration/container/tests/basic_connectivity.test.ts +++ b/tests/integration/container/tests/basic_connectivity.test.ts @@ -73,7 +73,7 @@ describe("basic_connectivity", () => { database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: env.databaseInfo.clusterEndpointPort, - plugins: "failover,efm", + plugins: "failover,efm2", enableTelemetry: true, telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" @@ -99,7 +99,7 @@ describe("basic_connectivity", () => { database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: env.databaseInfo.clusterEndpointPort, - plugins: "failover,efm", + plugins: "failover,efm2", enableTelemetry: true, telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" @@ -125,7 +125,7 @@ describe("basic_connectivity", () => { database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: env.databaseInfo.clusterEndpointPort, - plugins: "failover,efm", + plugins: "failover,efm2", enableTelemetry: true, telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" diff --git a/tests/integration/container/tests/performance.test.ts b/tests/integration/container/tests/performance.test.ts index 6144111e..3abfa69d 100644 --- a/tests/integration/container/tests/performance.test.ts +++ b/tests/integration/container/tests/performance.test.ts @@ -163,7 +163,7 @@ async function executeFailureDetectionTimeFailoverAndEfmEnabled( ) { const props = new Map(); const config = await initDefaultConfig(env.proxyDatabaseInfo.writerInstanceEndpoint, env.proxyDatabaseInfo.clusterEndpointPort); - config["plugins"] = "efm,failover"; + config["plugins"] = "efm2,failover"; config[WrapperProperties.FAILURE_DETECTION_TIME_MS.name] = detectionTimeMillis; config[WrapperProperties.FAILURE_DETECTION_INTERVAL_MS.name] = detectionIntervalMillis; config[WrapperProperties.FAILURE_DETECTION_COUNT.name] = detectionCount; diff --git a/tests/integration/container/tests/read_write_splitting.test.ts b/tests/integration/container/tests/read_write_splitting.test.ts index 385ab750..fc5b671a 100644 --- a/tests/integration/container/tests/read_write_splitting.test.ts +++ b/tests/integration/container/tests/read_write_splitting.test.ts @@ -81,7 +81,7 @@ async function initConfigWithFailover(host: string, port: number, connectToProxy } async function initConfigWithFailover2(host: string, port: number, connectToProxy: boolean): Promise { - const config: any = await initConfig(host, port, connectToProxy, "readWriteSplitting,failover2"); + const config: any = await initConfig(host, port, connectToProxy, "readWriteSplitting,efm2,failover2"); config["failoverTimeoutMs"] = 400000; return config; } diff --git a/tests/integration/container/tests/session_state.test.ts b/tests/integration/container/tests/session_state.test.ts index 04d5bfa7..59a68092 100644 --- a/tests/integration/container/tests/session_state.test.ts +++ b/tests/integration/container/tests/session_state.test.ts @@ -30,7 +30,9 @@ import { AwsMySQLClient } from "../../../../mysql/lib"; import { TransactionIsolationLevel } from "../../../../common/lib/utils/transaction_isolation_level"; const itIf = - !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) ? it : it.skip; + !features.includes(TestEnvironmentFeatures.PERFORMANCE) && !features.includes(TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY) + ? it.skip + : it.skip; let client: any; diff --git a/tests/integration/container/tests/utils/driver_helper.ts b/tests/integration/container/tests/utils/driver_helper.ts index aa02e773..5a823923 100644 --- a/tests/integration/container/tests/utils/driver_helper.ts +++ b/tests/integration/container/tests/utils/driver_helper.ts @@ -109,17 +109,14 @@ export class DriverHelper { static addDriverSpecificConfiguration(props: any, engine: DatabaseEngine, performance: boolean = false) { if (engine === DatabaseEngine.PG && !performance) { - props["query_timeout"] = 10000; props["ssl"] = { rejectUnauthorized: false }; - } else if (engine === DatabaseEngine.PG && performance) { - props["query_timeout"] = 120000; - props["connectionTimeoutMillis"] = 3000; - props["monitoring_query_timeout"] = 3000; - } else if (engine === DatabaseEngine.MYSQL && performance) { - props["connectTimeout"] = 3000; - props["monitoring_wrapperQueryTimeout"] = 3000; - props["wrapperQueryTimeout"] = 120000; } + + props["wrapperConnectTimeout"] = 3000; + props["wrapperQueryTimeout"] = 120000; + props["monitoring_wrapperQueryTimeout"] = 3000; + props["monitoring_wrapperConnectTimeout"] = 3000; + props["failureDetectionTime"] = 1000; return props; } }