diff --git a/common/lib/host_list_provider/host_list_provider.ts b/common/lib/host_list_provider/host_list_provider.ts index 5e657ffa..ffa0d2ae 100644 --- a/common/lib/host_list_provider/host_list_provider.ts +++ b/common/lib/host_list_provider/host_list_provider.ts @@ -23,6 +23,10 @@ export type DynamicHostListProvider = HostListProvider; export type StaticHostListProvider = HostListProvider; +export interface BlockingHostListProvider extends HostListProvider { + forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise; +} + export interface HostListProvider { refresh(): Promise; diff --git a/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts b/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts index 0510ae39..b41dc096 100644 --- a/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts +++ b/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts @@ -27,9 +27,9 @@ import { MonitoringRdsHostListProvider } from "./monitoring_host_list_provider"; import { Messages } from "../../utils/messages"; export interface ClusterTopologyMonitor { - forceRefresh(client: any, timeoutMs: number): Promise; + forceRefresh(client: ClientWrapper, timeoutMs: number): Promise; - close(): void; + close(): Promise; forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise; } @@ -50,9 +50,9 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { private writerHostInfo: HostInfo = null; private isVerifiedWriterConnection: boolean = false; private monitoringClient: ClientWrapper = null; - private highRefreshRateEndTime: number = -1; + private highRefreshRateEndTimeMs: number = -1; private highRefreshPeriodAfterPanicMs: number = 30000; // 30 seconds. - private ignoreNewTopologyRequestsEndTime: number = -1; + private ignoreNewTopologyRequestsEndTimeMs: number = -1; private ignoreTopologyRequestMs: number = 10000; // 10 seconds. // Tracking of the host monitors. @@ -122,11 +122,12 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { this.hostMonitors.clear(); } - async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise { - if (this.ignoreNewTopologyRequestsEndTime > 0 && Date.now() < this.ignoreNewTopologyRequestsEndTime) { + async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise { + if (Date.now() < this.ignoreNewTopologyRequestsEndTimeMs) { // Previous failover has just completed, use results without triggering new update. const currentHosts = this.topologyMap.get(this.clusterId); if (currentHosts !== null) { + logger.info(Messages.get("ClusterTopologyMonitoring.ignoringTopologyRequest")); return currentHosts; } } @@ -144,7 +145,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { return await this.waitTillTopologyGetsUpdated(timeoutMs); } - async forceRefresh(client: any, timeoutMs: number): Promise { + async forceRefresh(client: ClientWrapper, timeoutMs: number): Promise { if (this.isVerifiedWriterConnection) { return await this.waitTillTopologyGetsUpdated(timeoutMs); } @@ -153,7 +154,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { return await this.fetchTopologyAndUpdateCache(client); } - async waitTillTopologyGetsUpdated(timeoutMs: number): Promise { + async waitTillTopologyGetsUpdated(timeoutMs: number): Promise { // Signal to any monitor that might be in delay, that topology should be updated. this.requestToUpdateTopology = true; @@ -176,7 +177,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { return latestHosts; } - async fetchTopologyAndUpdateCache(client: ClientWrapper): Promise { + async fetchTopologyAndUpdateCache(client: ClientWrapper): Promise { if (!client) { return null; } @@ -193,7 +194,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { return null; } - private async openAnyClientAndUpdateTopology(): Promise { + private async openAnyClientAndUpdateTopology(): Promise { let writerVerifiedByThisThread = false; if (!this.monitoringClient) { let client: ClientWrapper; @@ -225,10 +226,10 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { const hosts: HostInfo[] = await this.fetchTopologyAndUpdateCache(this.monitoringClient); if (writerVerifiedByThisThread) { - if (this.ignoreNewTopologyRequestsEndTime === -1) { - this.ignoreNewTopologyRequestsEndTime = 0; + if (this.ignoreNewTopologyRequestsEndTimeMs === -1) { + this.ignoreNewTopologyRequestsEndTimeMs = 0; } else { - this.ignoreNewTopologyRequestsEndTime = Date.now() + this.ignoreTopologyRequestMs; + this.ignoreNewTopologyRequestsEndTimeMs = Date.now() + this.ignoreTopologyRequestMs; } } @@ -239,7 +240,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { return hosts; } - updateTopologyCache(hosts: HostInfo[]) { + updateTopologyCache(hosts: HostInfo[]): void { this.topologyMap.put(this.clusterId, hosts, ClusterTopologyMonitorImpl.TOPOLOGY_CACHE_EXPIRATION_NANOS); this.requestToUpdateTopology = false; } @@ -250,14 +251,14 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { return writerHost === hostId ? writerHost : null; } - async closeConnection(client: ClientWrapper) { + async closeConnection(client: ClientWrapper): Promise { if (client !== null) { await client.abort(); client = null; } } - async updateMonitoringClient(newClient: ClientWrapper | null) { + async updateMonitoringClient(newClient: ClientWrapper | null): Promise { const clientToClose = this.monitoringClient; this.monitoringClient = newClient; if (clientToClose) { @@ -265,11 +266,11 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { } } - private isInPanicMode() { + private isInPanicMode(): boolean { return !this.monitoringClient || !this.isVerifiedWriterConnection; } - async run() { + async run(): Promise { logger.debug(Messages.get("ClusterTopologyMonitor.startMonitoring")); try { while (!this.stopMonitoring) { @@ -317,15 +318,15 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { await this.updateMonitoringClient(writerClient); this.writerHostInfo = writerHostInfo; this.isVerifiedWriterConnection = true; - if (this.ignoreNewTopologyRequestsEndTime === -1) { - this.ignoreNewTopologyRequestsEndTime = 0; + if (this.ignoreNewTopologyRequestsEndTimeMs === -1) { + this.ignoreNewTopologyRequestsEndTimeMs = 0; } else { - this.ignoreNewTopologyRequestsEndTime = Date.now() + this.ignoreTopologyRequestMs; + this.ignoreNewTopologyRequestsEndTimeMs = Date.now() + this.ignoreTopologyRequestMs; } - if (this.highRefreshRateEndTime === -1) { - this.highRefreshRateEndTime = 0; + if (this.highRefreshRateEndTimeMs === -1) { + this.highRefreshRateEndTimeMs = 0; } else { - this.highRefreshRateEndTime = Date.now() + this.highRefreshPeriodAfterPanicMs; + this.highRefreshRateEndTimeMs = Date.now() + this.highRefreshPeriodAfterPanicMs; } // Stop monitoring of each host, writer detected. @@ -363,18 +364,18 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { await this.updateMonitoringClient(null); continue; } - if (this.highRefreshRateEndTime > 0 && Date.now() > this.highRefreshRateEndTime) { - this.highRefreshRateEndTime = 0; + if (this.highRefreshRateEndTimeMs > 0 && Date.now() > this.highRefreshRateEndTimeMs) { + this.highRefreshRateEndTimeMs = 0; } - if (this.highRefreshRateEndTime < 0) { + if (this.highRefreshRateEndTimeMs < 0) { // Log topology when not in high refresh rate. this.logTopology(`[clusterTopologyMonitor] `); } // Set an easily interruptible delay between topology refreshes. await this.delay(false); } - if (this.ignoreNewTopologyRequestsEndTime > 0 && Date.now() > this.ignoreNewTopologyRequestsEndTime) { - this.ignoreNewTopologyRequestsEndTime = 0; + if (this.ignoreNewTopologyRequestsEndTimeMs > 0 && Date.now() > this.ignoreNewTopologyRequestsEndTimeMs) { + this.ignoreNewTopologyRequestsEndTimeMs = 0; } } } catch (error) { @@ -387,7 +388,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor { } private async delay(useHighRefreshRate: boolean) { - if (this.highRefreshRateEndTime > 0 && Date.now() < this.highRefreshRateEndTime) { + if (Date.now() < this.highRefreshRateEndTimeMs) { useHighRefreshRate = true; } const endTime = Date.now() + (useHighRefreshRate ? this.highRefreshRateMs : this.refreshRateMs); @@ -418,7 +419,7 @@ export class HostMonitor { } async run() { - let client = null; + let client: ClientWrapper | null = null; let updateTopology: boolean = false; const startTime: number = Date.now(); logger.debug(Messages.get("HostMonitor.startMonitoring", this.hostInfo.hostId)); diff --git a/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts b/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts index f8446577..7b7f55ba 100644 --- a/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts +++ b/common/lib/host_list_provider/monitoring/monitoring_host_list_provider.ts @@ -31,10 +31,10 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider { static readonly MONITOR_EXPIRATION_NANOS: bigint = BigInt(15 * 60_000_000_000); // 15 minutes. static readonly DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS = 5000; // 5 seconds. - private static monitors: SlidingExpirationCache = new SlidingExpirationCache( + private static monitors: SlidingExpirationCache = new SlidingExpirationCache( MonitoringRdsHostListProvider.CACHE_CLEANUP_NANOS, () => true, - async (monitor: ClusterTopologyMonitorImpl) => { + async (monitor: ClusterTopologyMonitor) => { try { await monitor.close(); } catch { @@ -52,10 +52,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider { async clearAll(): Promise { RdsHostListProvider.clearAll(); - for (const [key, monitor] of MonitoringRdsHostListProvider.monitors.entries) { - await monitor.item.close(); - } - MonitoringRdsHostListProvider.monitors.clear(); + await MonitoringRdsHostListProvider.monitors.clear(); } async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise { @@ -82,11 +79,6 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider { return await dialect.queryForTopology(targetClient, this).then((res: any) => this.processQueryResults(res)); } - async getWriterId(targetClient: ClientWrapper): Promise { - const hostInfo: HostInfo = await this.identifyConnection(targetClient, this.hostListProviderService.getDialect()); - return hostInfo ? hostInfo.hostId : null; - } - async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise { let monitor: ClusterTopologyMonitor = MonitoringRdsHostListProvider.monitors.get( this.clusterId, diff --git a/common/lib/host_list_provider/rds_host_list_provider.ts b/common/lib/host_list_provider/rds_host_list_provider.ts index 05d0ffd7..63f6a323 100644 --- a/common/lib/host_list_provider/rds_host_list_provider.ts +++ b/common/lib/host_list_provider/rds_host_list_provider.ts @@ -148,6 +148,19 @@ export class RdsHostListProvider implements DynamicHostListProvider { } } + async getWriterId(client: ClientWrapper): Promise { + const dialect = this.hostListProviderService.getDialect(); + if (!this.isTopologyAwareDatabaseDialect(dialect)) { + throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect")); + } + + if (client) { + return await dialect.getWriterId(client); + } else { + throw new AwsWrapperError(Messages.get("AwsClient.targetClientNotDefined")); + } + } + async identifyConnection(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise { if (!this.isTopologyAwareDatabaseDialect(dialect)) { throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect")); diff --git a/common/lib/plugin_service.ts b/common/lib/plugin_service.ts index e499bbd0..ca314078 100644 --- a/common/lib/plugin_service.ts +++ b/common/lib/plugin_service.ts @@ -175,9 +175,9 @@ export class PluginService implements ErrorHandler, HostListProviderService { } } - async initiateTopologyUpdate(shouldVerifyWriter: boolean, timeoutMs: number): Promise { + async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise { const hostListProvider = this.getHostListProvider(); - if (!(hostListProvider instanceof MonitoringRdsHostListProvider)) { + if (!("forceMonitoringRefresh" in hostListProvider)) { throw new AwsWrapperError(Messages.get("PluginService.requiredMonitoringRdsHostListProvider"), typeof hostListProvider); } diff --git a/common/lib/plugins/failover/failover_plugin.ts b/common/lib/plugins/failover/failover_plugin.ts index bbbb56a4..cc734472 100644 --- a/common/lib/plugins/failover/failover_plugin.ts +++ b/common/lib/plugins/failover/failover_plugin.ts @@ -71,17 +71,15 @@ export class FailoverPlugin extends AbstractConnectionPlugin { private telemetryFailoverAdditionalTopTraceSetting: boolean = false; private _rdsUrlType: RdsUrlType | null = null; private _isInTransaction: boolean = false; - private _closedExplicitly: boolean = false; private _lastError: any; protected failoverTimeoutMsSetting: number = WrapperProperties.FAILOVER_TIMEOUT_MS.defaultValue; protected failoverClusterTopologyRefreshRateMsSetting: number = WrapperProperties.FAILOVER_CLUSTER_TOPOLOGY_REFRESH_RATE_MS.defaultValue; protected failoverWriterReconnectIntervalMsSetting: number = WrapperProperties.FAILOVER_WRITER_RECONNECT_INTERVAL_MS.defaultValue; protected failoverReaderConnectTimeoutMsSetting: number = WrapperProperties.FAILOVER_READER_CONNECT_TIMEOUT_MS.defaultValue; - protected isClosed: boolean = false; failoverMode: FailoverMode | null = null; private hostListProviderService?: HostListProviderService; - private pluginService: PluginService; + private readonly pluginService: PluginService; protected enableFailoverSetting: boolean = WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.defaultValue; constructor(pluginService: PluginService, properties: Map, rdsHelper: RdsUtils); @@ -229,18 +227,6 @@ export class FailoverPlugin extends AbstractConnectionPlugin { this.telemetryFailoverAdditionalTopTraceSetting = WrapperProperties.TELEMETRY_FAILOVER_ADDITIONAL_TOP_TRACE.get(this._properties); } - private async invalidInvocationOnClosedConnection() { - if (!this._closedExplicitly) { - this.isClosed = false; - await this.pickNewConnection(); - - // "The active SQL connection has changed. Please re-configure session state if required." - logger.debug(Messages.get("Failover.connectionChangedError")); - throw new FailoverSuccessError(Messages.get("Failover.connectionChangedError")); - } - throw new AwsWrapperError(Messages.get("Failover.noOperationsAfterConnectionClosed")); - } - private getCurrentWriter(): HostInfo | null { const topology = this.pluginService.getHosts(); if (topology.length == 0) { @@ -311,10 +297,6 @@ export class FailoverPlugin extends AbstractConnectionPlugin { return await methodFunc(); } - if (this.isClosed) { - await this.invalidInvocationOnClosedConnection(); - } - if (this.canUpdateTopology(methodName)) { await this.updateTopology(false); } @@ -485,10 +467,6 @@ export class FailoverPlugin extends AbstractConnectionPlugin { } async pickNewConnection() { - if (this.isClosed && this._closedExplicitly) { - logger.debug(Messages.get("Failover.transactionResolutionUnknownError")); - return; - } const currentClient = this.pluginService.getCurrentClient(); const currentWriter = this.getCurrentWriter(); if (currentWriter && currentClient.targetClient == null && !this.shouldAttemptReaderConnection()) { diff --git a/common/lib/plugins/failover/reader_failover_handler.ts b/common/lib/plugins/failover/reader_failover_handler.ts index 398c90e4..c751be7d 100644 --- a/common/lib/plugins/failover/reader_failover_handler.ts +++ b/common/lib/plugins/failover/reader_failover_handler.ts @@ -83,7 +83,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler const timer: any = {}; const endTime = Date.now() + this.maxFailoverTimeoutMs; - const timeoutTask = getTimeoutTask(timer, "Internal failover task timed out.", this.maxFailoverTimeoutMs); + const timeoutTask = getTimeoutTask(timer, Messages.get("Failover.timeoutError"), this.maxFailoverTimeoutMs); const failoverTask = this.internalFailoverTask(hosts, currentHost, endTime); return await Promise.race([timeoutTask, failoverTask]) @@ -130,7 +130,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler await sleep(1000); } } - throw new InternalQueryTimeoutError("Internal failover task has timed out."); + throw new InternalQueryTimeoutError(Messages.get("Failover.timeoutError")); } async failoverInternal(hosts: HostInfo[], currentHost: HostInfo | null): Promise { diff --git a/common/lib/plugins/failover2/failover2_plugin.ts b/common/lib/plugins/failover2/failover2_plugin.ts index d8681d11..fd5fd60f 100644 --- a/common/lib/plugins/failover2/failover2_plugin.ts +++ b/common/lib/plugins/failover2/failover2_plugin.ts @@ -30,25 +30,24 @@ import { AwsWrapperError, FailoverFailedError, FailoverSuccessError, + InternalQueryTimeoutError, TransactionResolutionUnknownError, UnavailableHostError } from "../../utils/errors"; -import { shuffleList } from "../../utils/utils"; +import { logTopology } from "../../utils/utils"; import { ClientWrapper } from "../../client_wrapper"; import { HostAvailability } from "../../host_availability/host_availability"; import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level"; import { HostRole } from "../../host_role"; -import { SubscribedMethodHelper } from "../../utils/subscribed_method_helper"; -import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; -import { HostChangeOptions } from "../../host_change_options"; import { CanReleaseResources } from "../../can_release_resources"; import { MonitoringRdsHostListProvider } from "../../host_list_provider/monitoring/monitoring_host_list_provider"; +import { ReaderFailoverResult } from "../failover/reader_failover_result"; export class Failover2Plugin extends AbstractConnectionPlugin implements CanReleaseResources { private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance"; - private static readonly TELEMETRY_READER_FAILOVER = "failover to replica"; + private static readonly TELEMETRY_READER_FAILOVER = "failover to reader"; private static readonly METHOD_END = "end"; - private static readonly subscribedMethods: Set = new Set(["initHostProvider", "connect", "query", "notifyConnectionChanged"]); + private static readonly SUBSCRIBED_METHODS: Set = new Set(["initHostProvider", "connect", "query"]); static readonly INTERNAL_CONNECT_PROPERTY_NAME: string = "monitoring_76c06979-49c4-4c86-9600-a63605b83f50"; private readonly _staleDnsHelper: StaleDnsHelper; private readonly _properties: Map; @@ -63,14 +62,13 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele private telemetryFailoverAdditionalTopTraceSetting: boolean = false; private _rdsUrlType: RdsUrlType | null = null; private _isInTransaction: boolean = false; - private _closedExplicitly: boolean = false; private _lastError: any; - protected isClosed: boolean = false; failoverMode: FailoverMode = FailoverMode.UNKNOWN; private hostListProviderService?: HostListProviderService; protected enableFailoverSetting: boolean = WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.defaultValue; - private failoverTimeoutSettingMs: number = WrapperProperties.FAILOVER_TIMEOUT_MS.defaultValue; + private readonly failoverTimeoutSettingMs: number = WrapperProperties.FAILOVER_TIMEOUT_MS.defaultValue; + private readonly failoverReaderHostSelectorStrategy: string = WrapperProperties.FAILOVER_READER_HOST_SELECTOR_STRATEGY.defaultValue; constructor(pluginService: PluginService, properties: Map, rdsHelper: RdsUtils) { super(); @@ -80,6 +78,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele this._staleDnsHelper = new StaleDnsHelper(this.pluginService); this.enableFailoverSetting = WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.get(this._properties); this.failoverTimeoutSettingMs = WrapperProperties.FAILOVER_TIMEOUT_MS.get(this._properties); + this.failoverReaderHostSelectorStrategy = WrapperProperties.FAILOVER_READER_HOST_SELECTOR_STRATEGY.get(this._properties); const telemetryFactory = this.pluginService.getTelemetryFactory(); this.failoverWriterTriggeredCounter = telemetryFactory.createCounter("writerFailover.triggered.count"); @@ -91,7 +90,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } override getSubscribedMethods(): Set { - return Failover2Plugin.subscribedMethods; + return Failover2Plugin.SUBSCRIBED_METHODS; } override initHostProvider( @@ -107,7 +106,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele initHostProviderFunc(); - this.failoverMode = failoverModeFromValue(WrapperProperties.FAILOVER_MODE.get(this._properties)); + this.failoverMode = failoverModeFromValue(WrapperProperties.FAILOVER_MODE.get(props)); this._rdsUrlType = this._rdsHelper.identifyRdsType(hostInfo.host); if (this.failoverMode === FailoverMode.UNKNOWN) { @@ -117,10 +116,6 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele logger.debug(Messages.get("Failover.parameterValue", "failoverMode", FailoverMode[this.failoverMode])); } - override notifyConnectionChanged(changes: Set): Promise { - return Promise.resolve(OldConnectionSuggestionAction.NO_OPINION); - } - private isFailoverEnabled(): boolean { return ( this.enableFailoverSetting && @@ -130,18 +125,6 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele ); } - private async invalidInvocationOnClosedConnection() { - if (!this._closedExplicitly) { - this.isClosed = false; - await this.pickNewConnection(); - - // "The active SQL connection has changed. Please re-configure session state if required." - logger.debug(Messages.get("Failover.connectionChangedError")); - throw new FailoverSuccessError(Messages.get("Failover.connectionChangedError")); - } - throw new AwsWrapperError(Messages.get("Failover.noOperationsAfterConnectionClosed")); - } - async updateTopology() { const client = this.pluginService.getCurrentClient(); if (!this.isFailoverEnabled() || !(await client.isValid())) { @@ -162,14 +145,14 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc); } - let client: ClientWrapper = null; - + // Failover is not enabled, does not require additional processing. if (!this.enableFailoverSetting || !WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.get(props)) { return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc); } const hostInfoWithAvailability: HostInfo = this.pluginService.getHosts().find((x) => x.getHostAndPort() === hostInfo.getHostAndPort()); + let client: ClientWrapper = null; if (!hostInfoWithAvailability || hostInfoWithAvailability.getAvailability() != HostAvailability.NOT_AVAILABLE) { try { client = await this._staleDnsHelper.getVerifiedConnection( @@ -187,7 +170,8 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele this.pluginService.setAvailability(hostInfo.allAliases, HostAvailability.NOT_AVAILABLE); try { - await this.failover(hostInfo); + // Unable to directly connect, attempt failover. + await this.failover(); } catch (error) { if (error instanceof FailoverSuccessError) { client = this.pluginService.getCurrentClient().targetClient; @@ -198,8 +182,9 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } } else { try { + // Host is unavailable or not part of the topology. Try to refresh host list and failover. await this.pluginService.refreshHostList(); - await this.failover(hostInfo); + await this.failover(); } catch (error) { if (error instanceof FailoverSuccessError) { client = this.pluginService.getCurrentClient().targetClient; @@ -210,7 +195,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } if (!client) { - // This should be unreachable, the above logic will either get a connection successfully or throw an exception. + // This should be unreachable, the above logic will either get a connection successfully or throw an error. throw new AwsWrapperError(Messages.get("Failover2.unableToConnect")); } @@ -222,46 +207,35 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } override async execute(methodName: string, methodFunc: () => Promise): Promise { - try { - // Verify there aren't any unexpected error emitted while the connection was idle. - if (this.pluginService.hasNetworkError()) { - // Throw the unexpected error directly to be handled. - throw this.pluginService.getUnexpectedError(); - } - - if (!this.enableFailoverSetting || this.canDirectExecute(methodName)) { - return await methodFunc(); - } - - if (this.isClosed) { - await this.invalidInvocationOnClosedConnection(); - } - - if (this.canUpdateTopology(methodName)) { - await this.updateTopology(); - } + // Verify there weren't any unexpected errors emitted while the connection was idle. + if (this.pluginService.hasNetworkError()) { + // Throw the unexpected error directly to be handled. + throw this.pluginService.getUnexpectedError(); + } + if (!this.enableFailoverSetting || this.canDirectExecute(methodName)) { return await methodFunc(); - } catch (e: any) { - logger.debug(Messages.get("Failover.detectedError", e.message)); - if (this._lastError !== e && this.shouldErrorTriggerClientSwitch(e)) { + } + + let result: T; + try { + result = await methodFunc(); + } catch (error) { + logger.debug(Messages.get("Failover.detectedError", error.message)); + if (this._lastError !== error && this.shouldErrorTriggerClientSwitch(error)) { await this.invalidateCurrentClient(); - const currentHostInfo = this.pluginService.getCurrentHostInfo(); + const currentHostInfo: HostInfo = this.pluginService.getCurrentHostInfo(); if (currentHostInfo !== null) { this.pluginService.setAvailability(currentHostInfo.allAliases ?? new Set(), HostAvailability.NOT_AVAILABLE); } - - this._lastError = e; - await this.pickNewConnection(); + await this.failover(); + this._lastError = error; } - - throw e; } + return result; } - async failover(failedHost: HostInfo) { - this.pluginService.setAvailability(failedHost.allAliases, HostAvailability.NOT_AVAILABLE); // checking if needed - + async failover() { if (this.failoverMode === FailoverMode.STRICT_WRITER) { await this.failoverWriter(); } else { @@ -291,57 +265,121 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele try { logger.debug(Messages.get("Failover.startReaderFailover")); await telemetryContext.start(async () => { - if (!(await this.pluginService.initiateTopologyUpdate(false, 0))) { + if (!(await this.pluginService.forceMonitoringRefresh(false, 0))) { // Unable to establish SQL connection to an instance. this.failoverReaderFailedCounter.inc(); - logger.warn(Messages.get("Failover2.unableToFetchTopology")); + logger.error(Messages.get("Failover2.unableToFetchTopology")); + throw new FailoverFailedError(Messages.get("Failover.unableToConnectToReader")); + } + try { + const result: ReaderFailoverResult = await this.getReaderFailoverConnection(failoverEndTimeMs); + logger.info(Messages.get("Failover.establishedConnection", result.newHost.host)); + this.failoverReaderSuccessCounter.inc(); + await this.pluginService.abortCurrentClient(); + await this.pluginService.setCurrentClient(result.client, result.newHost); + this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases)); + await this.pluginService.forceRefreshHostList(); + } catch (error) { + this.failoverReaderFailedCounter.inc(); + logger.error(Messages.get("Failover2.unableToFetchTopology")); throw new FailoverFailedError(Messages.get("Failover.unableToConnectToReader")); } + }); + } finally { + if (this.telemetryFailoverAdditionalTopTraceSetting) { + await telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL); + } + } + } - // Signal to connect that this is an internal call and does not require additional processing. - const copyProps = new Map(this._properties); - copyProps.set(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME, true); + private async getReaderFailoverConnection(failoverEndTimeMs: number): Promise { + // The roles in the host list may not be accurate, depending on whether the new topology has become available yet. + const hosts = this.pluginService.getHosts(); + const readerCandidates = hosts.filter((x) => x.role === HostRole.READER); + //new Map(hosts.filter((x) => x.role === HostRole.READER).map((x) => [x.hostId, x])); + const originalWriter: HostInfo = hosts.find((x) => x.role === HostRole.WRITER); + let isOriginalWriterStillWriter: boolean = false; + // Signal to connect that this is an internal call and does not require additional processing. + const copyProps = new Map(this._properties); + copyProps.set(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME, true); + + while (Date.now() < failoverEndTimeMs) { + // Try all the original readers. + const remainingReaders = readerCandidates; + while (remainingReaders.length > 0 && Date.now() < failoverEndTimeMs) { + let readerCandidate: HostInfo = null; + try { + readerCandidate = this.pluginService.getHostInfoByStrategy(HostRole.READER, this.failoverReaderHostSelectorStrategy, remainingReaders); + } catch (error) { + logger.info(Messages.get("Failover2.errorSelectingReaderHost", error.message)); + } - while (Date.now() < failoverEndTimeMs) { - const hosts: HostInfo[] = this.pluginService.getHosts(); - const hostsByPriority = this.getHostsByPriority(hosts); - let readerCandidateClient: ClientWrapper = null; - let readerCandidateHostInfo: HostInfo = null; - - while (readerCandidateClient === null && hostsByPriority.length > 0 && Date.now() < failoverEndTimeMs) { - readerCandidateHostInfo = hostsByPriority.shift(); - try { - readerCandidateClient = await this.pluginService.connect(readerCandidateHostInfo, copyProps); - if ( - readerCandidateClient && - ((await this.pluginService.getHostRole(readerCandidateClient)) === HostRole.READER || this.failoverMode != FailoverMode.STRICT_READER) - ) { - logger.info(Messages.get("Failover.establishedConnection", readerCandidateHostInfo.host)); - this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases)); - await this.pluginService.abortCurrentClient(); - await this.pluginService.setCurrentClient(readerCandidateClient, readerCandidateHostInfo); - await this.updateTopology(); - this.failoverReaderSuccessCounter.inc(); - return; + if (readerCandidate === null) { + logger.info(Messages.get("Failover2.readerCandidateNull")); + } else { + try { + const candidateClient: ClientWrapper = await this.pluginService.connect(readerCandidate, copyProps); + const role: HostRole = await this.pluginService.getHostRole(candidateClient); + if (role === HostRole.READER || this.failoverMode !== FailoverMode.STRICT_READER) { + if (role !== readerCandidate.role) { + // Update HostInfo to reflect correct HostRole. + readerCandidate = this.pluginService.getHostInfoBuilder().copyFrom(readerCandidate).withRole(role).build(); } - logger.debug(Messages.get("Failover2.failoverReaderNotConnectedToReader", readerCandidateHostInfo?.hostId)); - await readerCandidateClient.end(); - readerCandidateClient = null; - } catch (err) { - readerCandidateClient = null; + return new ReaderFailoverResult(candidateClient, readerCandidate, true); } + + // Unable to fail over to readerCandidate, remove from remaining readers to try. + remainingReaders.splice(remainingReaders.indexOf(readerCandidate), 1); + await candidateClient.end(); + + if (role === HostRole.WRITER) { + // The readerCandidate is a writer, remove it from the list of reader candidates. + readerCandidates.splice(readerCandidates.indexOf(readerCandidate), 1); + } else { + logger.info(Messages.get("Failover2.strictReaderUnknownHostRole")); + } + } catch { + // Unable to connect to readerCandidate, remove from remaining readers to try. + remainingReaders.splice(remainingReaders.indexOf(readerCandidate), 1); } } + } - logger.warn(Messages.get("Failover.unableToConnectToReader")); - this.failoverReaderFailedCounter.inc(); - throw new FailoverFailedError(Messages.get("Failover.unableToConnectToReader")); - }); - } finally { - if (this.telemetryFailoverAdditionalTopTraceSetting) { - await telemetryFactory.postCopy(telemetryContext, TelemetryTraceLevel.FORCE_TOP_LEVEL); + // Unable to connect to any of the original readers, try to connect to original writer. + if (originalWriter === null || Date.now() > failoverEndTimeMs) { + // No writer found in topology, or we have timed out. + continue; + } + + if (this.failoverMode === FailoverMode.STRICT_READER && isOriginalWriterStillWriter) { + // Original writer has been verified, and it is not valid in strict-reader mode. + continue; + } + + // Try the original writer, which may have been demoted. + try { + const candidateClient: ClientWrapper = await this.pluginService.connect(originalWriter, copyProps); + const role: HostRole = await this.pluginService.getHostRole(candidateClient); + if (role === HostRole.READER || this.failoverMode != FailoverMode.STRICT_READER) { + const updatedHostInfo: HostInfo = this.pluginService.getHostInfoBuilder().copyFrom(originalWriter).withRole(role).build(); + return new ReaderFailoverResult(candidateClient, updatedHostInfo, true); + } + + await candidateClient.end(); + + if (role === HostRole.WRITER) { + // Verify that writer has not been demoted, will not try to connect again. + isOriginalWriterStillWriter = true; + } else { + logger.info(Messages.get("Failover2.strictReaderUnknownHostRole")); + } + } catch { + logger.info(Messages.get("Failover.unableToConnectToReader")); } } + + logger.error(Messages.get("Failover.timeoutError")); + throw new InternalQueryTimeoutError(Messages.get("Failover.timeoutError")); } async failoverWriter() { @@ -352,14 +390,13 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele try { logger.debug(Messages.get("Failover.startWriterFailover")); await telemetryContext.start(async () => { - if (!(await this.pluginService.initiateTopologyUpdate(true, this.failoverTimeoutSettingMs))) { + if (!(await this.pluginService.forceMonitoringRefresh(true, this.failoverTimeoutSettingMs))) { // Unable to establish SQL connection to writer node. this.failoverWriterFailedCounter.inc(); - logger.warn(Messages.get("Failover2.unableToFetchTopology")); + logger.error(Messages.get("Failover2.unableToFetchTopology")); throw new FailoverFailedError(Messages.get("Failover.unableToConnectToWriter")); } - await this.updateTopology(); const hosts: HostInfo[] = this.pluginService.getHosts(); // Signal to connect that this is an internal call and does not require additional processing. @@ -373,12 +410,14 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele try { writerCandidateClient = await this.pluginService.connect(writerCandidateHostInfo, copyProps); } catch (err) { - // Do nothing. + logger.error(Messages.get("Failover.unableToConnectToWriter")); + this.failoverWriterFailedCounter.inc(); + throw new FailoverFailedError(Messages.get("Failover.unableToConnectToWriter")); } } if (!writerCandidateClient) { - logger.warn(Messages.get("Failover.unableToConnectToWriter")); + logger.error(Messages.get("Failover.unableToConnectToWriter")); this.failoverWriterFailedCounter.inc(); throw new FailoverFailedError(Messages.get("Failover.unableToConnectToWriter")); } @@ -389,7 +428,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } catch (error) { // Do nothing. } - logger.warn(Messages.get("Failover2.failoverWriterConnectedToReader")); + logger.error(Messages.get("Failover2.failoverWriterConnectedToReader")); this.failoverWriterFailedCounter.inc(); throw new FailoverFailedError(Messages.get("Failover2.failoverWriterConnectedToReader")); } @@ -397,7 +436,6 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele await this.pluginService.abortCurrentClient(); await this.pluginService.setCurrentClient(writerCandidateClient, writerCandidateHostInfo); logger.info(Messages.get("Failover.establishedConnection", writerCandidateHostInfo.host)); - await this.updateTopology(); this.failoverWriterSuccessCounter.inc(); }); } finally { @@ -407,36 +445,6 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } } - getHostsByPriority(hosts: HostInfo[]): HostInfo[] { - const activeReaders: HostInfo[] = []; - const downHostList: HostInfo[] = []; - let writerHost: HostInfo | undefined; - hosts.forEach((host) => { - if (host.role === HostRole.WRITER) { - writerHost = host; - return; - } - - if (host.availability === HostAvailability.AVAILABLE) { - activeReaders.push(host); - } else { - downHostList.push(host); - } - }); - - shuffleList(activeReaders); - shuffleList(downHostList); - - const hostsByPriority: HostInfo[] = [...activeReaders]; - const numReaders: number = activeReaders.length + downHostList.length; - if (writerHost && (!(this.failoverMode === FailoverMode.STRICT_READER) || numReaders === 0)) { - hostsByPriority.push(writerHost); - } - hostsByPriority.push(...downHostList); - - return hostsByPriority; - } - async invalidateCurrentClient() { const client = this.pluginService.getCurrentClient(); if (!client || !client.targetClient) { @@ -462,19 +470,6 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } } - async pickNewConnection() { - if (this.isClosed && this._closedExplicitly) { - logger.debug(Messages.get("Failover.connectionExplicitlyClosed")); - return; - } - - await this.failover(this.pluginService.getCurrentHostInfo()); - } - - private canUpdateTopology(methodName: string) { - return SubscribedMethodHelper.METHODS_REQUIRING_UPDATED_TOPOLOGY.indexOf(methodName) > -1; - } - private canDirectExecute(methodName: string): boolean { return methodName === Failover2Plugin.METHOD_END; } diff --git a/common/lib/topology_aware_database_dialect.ts b/common/lib/topology_aware_database_dialect.ts index b8b0ce9d..37e459cf 100644 --- a/common/lib/topology_aware_database_dialect.ts +++ b/common/lib/topology_aware_database_dialect.ts @@ -25,4 +25,6 @@ export interface TopologyAwareDatabaseDialect { identifyConnection(targetClient: ClientWrapper): Promise; getHostRole(client: ClientWrapper): Promise; + + getWriterId(client: ClientWrapper): Promise; } diff --git a/common/lib/utils/locales/en.json b/common/lib/utils/locales/en.json index f1ec02a3..45a1c794 100644 --- a/common/lib/utils/locales/en.json +++ b/common/lib/utils/locales/en.json @@ -64,6 +64,7 @@ "Failover.noOperationsAfterConnectionClosed": "No operations allowed after client ended.", "Failover.transactionResolutionUnknownError": "Unknown transaction resolution error occurred during failover.", "Failover.connectionExplicitlyClosed": "Unable to failover on an explicitly closed connection.", + "Failover.timeoutError": "Internal failover task has timed out.", "StaleDnsHelper.clusterEndpointDns": "Cluster endpoint resolves to '%s'.", "StaleDnsHelper.writerHostInfo": "Writer host: '%s'.", "StaleDnsHelper.writerInetAddress": "Writer host address: '%s'", @@ -202,9 +203,13 @@ "Failover2.failoverReaderNotConnectedToReader": "Unable to establish SQL connection to the instance '%s' as a reader.", "Failover2.failoverWriterConnectedToReader": "Unable to establish SQL connection to a writer instance.", "Failover2.unableToFetchTopology": "Unable to establish SQL connection and fetch topology.", + "Failover2.errorSelectingReaderHost": "An error occurred while attempting to select a reader host candidate: '%s'.", + "Failover2.readerCandidateNull": "Reader candidate unable to be selected: '%s'.", + "Failover2.strictReaderUnknownHostRole": "Unknown host role of reader candidate in strict reader failoverMode.", "ClusterTopologyMonitor.timeoutError": "ClusterTopologyMonitor topology update timed out in '%s' ms.", "ClusterTopologyMonitor.errorFetchingTopology": "Error fetching topology: '%s'.", - "ClusterTopologyMonitor.unableToConnect": "Could not connect to inital host: '%s'.", + "ClusterTopologyMonitoring.ignoringTopologyRequest": "Previous failover has just completed, ignoring topology request.", + "ClusterTopologyMonitor.unableToConnect": "Could not connect to initial host: '%s'.", "ClusterTopologyMonitor.openedMonitoringConnection": "Opened monitoring connection to: '%s'.", "ClusterTopologyMonitor.startMonitoring": "Start cluster monitoring task.", "ClusterTopologyMonitor.errorDuringMonitoring": "Error thrown during cluster topology monitoring: '%s'.", diff --git a/common/lib/wrapper_property.ts b/common/lib/wrapper_property.ts index c96d5f9e..452aead7 100644 --- a/common/lib/wrapper_property.ts +++ b/common/lib/wrapper_property.ts @@ -195,6 +195,12 @@ export class WrapperProperties { ); static readonly FAILOVER_MODE = new WrapperProperty("failoverMode", "Set host role to follow during failover.", ""); + static readonly FAILOVER_READER_HOST_SELECTOR_STRATEGY = new WrapperProperty( + "failoverReaderHostSelectorStrategy", + "The strategy that should be used to select a new reader host while opening a new connection.", + "random" + ); + static readonly CLUSTER_TOPOLOGY_REFRESH_RATE_MS = new WrapperProperty( "clusterTopologyRefreshRateMs", "Cluster topology refresh rate in millis. " + diff --git a/mysql/lib/dialect/aurora_mysql_database_dialect.ts b/mysql/lib/dialect/aurora_mysql_database_dialect.ts index 0f97a295..285b78a0 100644 --- a/mysql/lib/dialect/aurora_mysql_database_dialect.ts +++ b/mysql/lib/dialect/aurora_mysql_database_dialect.ts @@ -36,6 +36,10 @@ export class AuroraMySQLDatabaseDialect extends MySQLDatabaseDialect implements "WHERE time_to_sec(timediff(now(), LAST_UPDATE_TIMESTAMP)) <= 300 OR SESSION_ID = 'MASTER_SESSION_ID' "; private static readonly HOST_ID_QUERY: string = "SELECT @@aurora_server_id as host"; private static readonly IS_READER_QUERY: string = "SELECT @@innodb_read_only as is_reader"; + private static readonly IS_WRITER_QUERY: string = + "SELECT server_id " + + "FROM information_schema.replica_host_status " + + "WHERE SESSION_ID = 'MASTER_SESSION_ID' AND SERVER_ID = @@aurora_server_id"; private static readonly AURORA_VERSION_QUERY = "SHOW VARIABLES LIKE 'aurora_version'"; getHostListProvider(props: Map, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider { @@ -70,7 +74,12 @@ export class AuroraMySQLDatabaseDialect extends MySQLDatabaseDialect implements async getHostRole(targetClient: ClientWrapper): Promise { const res = await targetClient.query(AuroraMySQLDatabaseDialect.IS_READER_QUERY); - return Promise.resolve(res[0]["is_reader"] === "true" ? HostRole.READER : HostRole.WRITER); + return Promise.resolve(res[0][0]["is_reader"] === 1 ? HostRole.READER : HostRole.WRITER); + } + + async getWriterId(targetClient: ClientWrapper): Promise { + const res = await targetClient.query(AuroraMySQLDatabaseDialect.IS_WRITER_QUERY); + return Promise.resolve(res[0][0]["server_id"] ? res[0][0]["server_id"] : null); } async isDialect(targetClient: ClientWrapper): Promise { diff --git a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts index 52da918b..80319707 100644 --- a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts +++ b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts @@ -121,6 +121,10 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER; } + getWriterId(client: ClientWrapper): Promise { + throw new Error("Method not implemented."); + } + async identifyConnection(client: ClientWrapper): Promise { return await this.executeTopologyRelatedQuery( client, diff --git a/pg/lib/dialect/aurora_pg_database_dialect.ts b/pg/lib/dialect/aurora_pg_database_dialect.ts index d60d4cd0..d8f7a70c 100644 --- a/pg/lib/dialect/aurora_pg_database_dialect.ts +++ b/pg/lib/dialect/aurora_pg_database_dialect.ts @@ -40,6 +40,8 @@ export class AuroraPgDatabaseDialect extends PgDatabaseDialect implements Topolo "SELECT (setting LIKE '%aurora_stat_utils%') AS aurora_stat_utils FROM pg_settings WHERE name='rds.extensions'"; private static readonly HOST_ID_QUERY: string = "SELECT aurora_db_instance_identifier() as host"; private static readonly IS_READER_QUERY: string = "SELECT pg_is_in_recovery() as is_reader"; + private static readonly IS_WRITER_QUERY: string = + "SELECT server_id " + "FROM aurora_replica_status() " + "WHERE SESSION_ID = 'MASTER_SESSION_ID' AND SERVER_ID = aurora_db_instance_identifier()"; getHostListProvider(props: Map, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider { if (WrapperProperties.PLUGINS.get(props).includes("failover2")) { @@ -76,6 +78,11 @@ export class AuroraPgDatabaseDialect extends PgDatabaseDialect implements Topolo return Promise.resolve(res.rows[0]["is_reader"] === true ? HostRole.READER : HostRole.WRITER); } + async getWriterId(targetClient: ClientWrapper): Promise { + const res = await targetClient.query(AuroraPgDatabaseDialect.IS_WRITER_QUERY); + return Promise.resolve(res.rows[0]["server_id"] ? res.rows[0]["server_id"] : null); + } + async isDialect(targetClient: ClientWrapper): Promise { if (!(await super.isDialect(targetClient))) { return false; diff --git a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts index 521c34e2..12ed88c9 100644 --- a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts +++ b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts @@ -128,6 +128,10 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER; } + getWriterId(client: ClientWrapper): Promise { + throw new Error("Method not implemented."); + } + getErrorHandler(): ErrorHandler { return new MultiAzPgErrorHandler(); } diff --git a/tests/unit/failover2_plugin.test.ts b/tests/unit/failover2_plugin.test.ts index 1388cb5c..4e700415 100644 --- a/tests/unit/failover2_plugin.test.ts +++ b/tests/unit/failover2_plugin.test.ts @@ -28,9 +28,8 @@ import { RdsUrlType } from "../../common/lib/utils/rds_url_type"; import { RdsUtils } from "../../common/lib/utils/rds_utils"; import { WrapperProperties } from "../../common/lib/wrapper_property"; import { AwsMySQLClient } from "../../mysql/lib"; -import { anything, instance, mock, reset, resetCalls, spy, verify, when } from "ts-mockito"; +import { anything, instance, mock, reset, spy, verify, when } from "ts-mockito"; import { Messages } from "../../common/lib/utils/messages"; -import { HostChangeOptions } from "../../common/lib/host_change_options"; import { NullTelemetryFactory } from "../../common/lib/utils/telemetry/null_telemetry_factory"; import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper"; import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect"; @@ -115,7 +114,7 @@ describe("reader failover handler", () => { when(spyPlugin.failoverWriter()).thenResolve(); plugin.failoverMode = FailoverMode.STRICT_WRITER; - await expect(plugin.failover(instance(mockHostInfo))).rejects.toThrow( + await expect(plugin.failover()).rejects.toThrow( new TransactionResolutionUnknownError(Messages.get("Failover.transactionResolutionUnknownError")) ); @@ -126,26 +125,27 @@ describe("reader failover handler", () => { when(mockPluginService.isInTransaction()).thenReturn(false); initializePlugin(instance(mockPluginService)); - const mockHostInfoInstance: HostInfo = instance(mockHostInfo); const spyPlugin: Failover2Plugin = spy(plugin); when(spyPlugin.failoverReader()).thenResolve(); plugin.failoverMode = FailoverMode.READER_OR_WRITER; - await expect(plugin.failover(mockHostInfoInstance)).rejects.toThrow(new FailoverSuccessError(Messages.get("Failover.connectionChangedError"))); + await expect(plugin.failover()).rejects.toThrow(new FailoverSuccessError(Messages.get("Failover.connectionChangedError"))); verify(spyPlugin.failoverReader()).once(); }); it("test failover reader success", async () => { - const hostInfo = builder.withHost("hostA").build(); + const hostInfo = builder.withHost("hostA").withRole(HostRole.READER).build(); const hosts = [hostInfo]; when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); when(mockHostInfo.getRawAvailability()).thenReturn(HostAvailability.AVAILABLE); when(mockPluginService.getHosts()).thenReturn(hosts); - when(mockPluginService.initiateTopologyUpdate(false, anything())).thenResolve(true); - when(mockPluginService.connect(hostInfo, anything())).thenResolve(mockClientWrapper); + when(mockPluginService.getHostInfoByStrategy(HostRole.READER, anything(), anything())).thenReturn(mockHostInfo); + when(mockPluginService.forceMonitoringRefresh(false, anything())).thenResolve(true); + when(mockPluginService.connect(anything(), anything())).thenResolve(mockClientWrapper); when(mockPluginService.getHostRole(mockClientWrapper)).thenResolve(HostRole.READER); + when(mockPluginService.getHostInfoBuilder()).thenReturn(builder); const mockPluginServiceInstance = instance(mockPluginService); const mockHostInfoInstance = instance(mockHostInfo); @@ -158,7 +158,7 @@ describe("reader failover handler", () => { await plugin.failoverReader(); - verify(mockPluginService.setCurrentClient(mockClientWrapper, hostInfo)).once(); + verify(mockPluginService.setCurrentClient(mockClientWrapper, anything())).once(); }); it("test failover reader - connection throws error", async () => { @@ -169,7 +169,7 @@ describe("reader failover handler", () => { when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); when(mockHostInfo.getRawAvailability()).thenReturn(HostAvailability.AVAILABLE); when(mockPluginService.getHosts()).thenReturn(hosts); - when(mockPluginService.initiateTopologyUpdate(true, anything())).thenResolve(true); + when(mockPluginService.forceMonitoringRefresh(true, anything())).thenResolve(true); when(mockPluginService.connect(mockHostInfo, anything())).thenReject(test); const mockHostInfoInstance = instance(mockHostInfo); @@ -194,7 +194,7 @@ describe("reader failover handler", () => { when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); when(mockPluginService.getHosts()).thenReturn(hosts); - when(mockPluginService.initiateTopologyUpdate(true, anything())).thenResolve(false); + when(mockPluginService.forceMonitoringRefresh(true, anything())).thenResolve(false); const mockHostInfoInstance = instance(mockHostInfo); const mockPluginServiceInstance = instance(mockPluginService); @@ -218,7 +218,7 @@ describe("reader failover handler", () => { when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); when(mockPluginService.getHosts()).thenReturn(hosts); - when(mockPluginService.initiateTopologyUpdate(true, anything())).thenResolve(true); + when(mockPluginService.forceMonitoringRefresh(true, anything())).thenResolve(true); when(mockPluginService.connect(mockHostInfo, anything())).thenResolve(null); const mockHostInfoInstance = instance(mockHostInfo); @@ -236,17 +236,17 @@ describe("reader failover handler", () => { } } - verify(mockPluginService.initiateTopologyUpdate(true, anything())).once(); + verify(mockPluginService.forceMonitoringRefresh(true, anything())).once(); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); }); it("test failover writer success", async () => { - const hostInfo = builder.withHost("hostA").build(); + const hostInfo = builder.withHost("hostA").withRole(HostRole.WRITER).build(); const hosts = [hostInfo]; when(mockHostInfo.allAliases).thenReturn(new Set(["alias1", "alias2"])); when(mockPluginService.getHosts()).thenReturn(hosts); - when(mockPluginService.initiateTopologyUpdate(true, anything())).thenResolve(true); + when(mockPluginService.forceMonitoringRefresh(true, anything())).thenResolve(true); when(mockPluginService.connect(hostInfo, anything())).thenResolve(mockClientWrapper); when(mockPluginService.getHostRole(mockClientWrapper)).thenResolve(HostRole.WRITER); @@ -258,7 +258,7 @@ describe("reader failover handler", () => { await plugin.failoverWriter(); - verify(mockPluginService.initiateTopologyUpdate(true, anything())).once(); + verify(mockPluginService.forceMonitoringRefresh(true, anything())).once(); verify(mockPluginService.setCurrentClient(mockClientWrapper, hostInfo)).once(); });