From 87f1656c4a61a3497d68c24b55bdee68a50c28a1 Mon Sep 17 00:00:00 2001 From: Sophia Chu Date: Mon, 3 Feb 2025 08:59:56 -0800 Subject: [PATCH 1/2] feat: failover2 for multi-az clusters --- .../host_list_provider/host_list_provider.ts | 2 + common/lib/plugin_service.ts | 2 +- .../lib/plugins/failover2/failover2_plugin.ts | 9 +- common/lib/topology_aware_database_dialect.ts | 3 +- .../rds_multi_az_mysql_database_dialect.ts | 31 +++- .../rds_multi_az_pg_database_dialect.ts | 33 +++- .../container/tests/aurora_failover2.test.ts | 145 +----------------- .../tests/read_write_splitting.test.ts | 145 +++++++++++++++--- 8 files changed, 196 insertions(+), 174 deletions(-) diff --git a/common/lib/host_list_provider/host_list_provider.ts b/common/lib/host_list_provider/host_list_provider.ts index ffa0d2ae..2c2c24ec 100644 --- a/common/lib/host_list_provider/host_list_provider.ts +++ b/common/lib/host_list_provider/host_list_provider.ts @@ -25,6 +25,8 @@ export type StaticHostListProvider = HostListProvider; export interface BlockingHostListProvider extends HostListProvider { forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise; + + clearAll(): Promise; } export interface HostListProvider { diff --git a/common/lib/plugin_service.ts b/common/lib/plugin_service.ts index f93cdfee..c77a5ddf 100644 --- a/common/lib/plugin_service.ts +++ b/common/lib/plugin_service.ts @@ -198,7 +198,7 @@ export class PluginService implements ErrorHandler, HostListProviderService { return false; } - protected isBlockingHostListProvider(arg: any): arg is BlockingHostListProvider { + isBlockingHostListProvider(arg: any): arg is BlockingHostListProvider { return arg; } diff --git a/common/lib/plugins/failover2/failover2_plugin.ts b/common/lib/plugins/failover2/failover2_plugin.ts index ddadb8fa..7bb00e23 100644 --- a/common/lib/plugins/failover2/failover2_plugin.ts +++ b/common/lib/plugins/failover2/failover2_plugin.ts @@ -39,8 +39,8 @@ import { HostAvailability } from "../../host_availability/host_availability"; import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level"; import { HostRole } from "../../host_role"; import { CanReleaseResources } from "../../can_release_resources"; -import { MonitoringRdsHostListProvider } from "../../host_list_provider/monitoring/monitoring_host_list_provider"; import { ReaderFailoverResult } from "../failover/reader_failover_result"; +import { HostListProvider } from "../../host_list_provider/host_list_provider"; export class Failover2Plugin extends AbstractConnectionPlugin implements CanReleaseResources { private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance"; @@ -401,7 +401,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele if ((await this.pluginService.getHostRole(writerCandidateClient)) !== HostRole.WRITER) { try { - await writerCandidateClient.end(); + await writerCandidateClient?.end(); } catch (error) { // Do nothing. } @@ -481,6 +481,9 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele } async releaseResources(): Promise { - await (this.pluginService.getHostListProvider() as MonitoringRdsHostListProvider).clearAll(); + const hostListProvider: HostListProvider = this.pluginService.getHostListProvider(); + if (!!this.pluginService.isBlockingHostListProvider(hostListProvider)) { + await hostListProvider.clearAll(); + } } } diff --git a/common/lib/topology_aware_database_dialect.ts b/common/lib/topology_aware_database_dialect.ts index 37e459cf..e5d6bc1c 100644 --- a/common/lib/topology_aware_database_dialect.ts +++ b/common/lib/topology_aware_database_dialect.ts @@ -26,5 +26,6 @@ export interface TopologyAwareDatabaseDialect { getHostRole(client: ClientWrapper): Promise; - getWriterId(client: ClientWrapper): Promise; + // Returns the host id of the targetClient if it is connected to a writer, null otherwise. + getWriterId(targetClient: ClientWrapper): Promise; } diff --git a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts index 80319707..c18a369a 100644 --- a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts +++ b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts @@ -26,16 +26,21 @@ import { AwsWrapperError } from "../../../common/lib/utils/errors"; import { TopologyAwareDatabaseDialect } from "../../../common/lib/topology_aware_database_dialect"; import { RdsHostListProvider } from "../../../common/lib/host_list_provider/rds_host_list_provider"; import { FailoverRestriction } from "../../../common/lib/plugins/failover/failover_restriction"; +import { WrapperProperties } from "../../../common/lib/wrapper_property"; +import { PluginService } from "../../../common/lib/plugin_service"; +import { MonitoringRdsHostListProvider } from "../../../common/lib/host_list_provider/monitoring/monitoring_host_list_provider"; export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect implements TopologyAwareDatabaseDialect { private static readonly TOPOLOGY_QUERY: string = "SELECT id, endpoint, port FROM mysql.rds_topology"; private static readonly TOPOLOGY_TABLE_EXIST_QUERY: string = "SELECT 1 AS tmp FROM information_schema.tables WHERE" + " table_schema = 'mysql' AND table_name = 'rds_topology'"; + // For reader hosts, the query should return a writer host id. For a writer host, the query should return no data. private static readonly FETCH_WRITER_HOST_QUERY: string = "SHOW REPLICA STATUS"; private static readonly FETCH_WRITER_HOST_QUERY_COLUMN_NAME: string = "Source_Server_Id"; private static readonly HOST_ID_QUERY: string = "SELECT @@server_id AS host"; private static readonly HOST_ID_QUERY_COLUMN_NAME: string = "host"; - private static readonly IS_READER_QUERY: string = "SELECT @@read_only"; + private static readonly IS_READER_QUERY: string = "SELECT @@read_only AS is_reader"; + private static readonly IS_READER_QUERY_COLUMN_NAME: string = "is_reader"; async isDialect(targetClient: ClientWrapper): Promise { const res = await targetClient.query(RdsMultiAZMySQLDatabaseDialect.TOPOLOGY_TABLE_EXIST_QUERY).catch(() => false); @@ -48,6 +53,9 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme } getHostListProvider(props: Map, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider { + if (WrapperProperties.PLUGINS.get(props).includes("failover2")) { + return new MonitoringRdsHostListProvider(props, originalUrl, hostListProviderService, hostListProviderService); + } return new RdsHostListProvider(props, originalUrl, hostListProviderService); } @@ -118,11 +126,26 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme } async getHostRole(client: ClientWrapper): Promise { - return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER; + return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) == "0" ? HostRole.WRITER : HostRole.READER; } - getWriterId(client: ClientWrapper): Promise { - throw new Error("Method not implemented."); + async getWriterId(targetClient: ClientWrapper): Promise { + try { + const writerHostId: string = await this.executeTopologyRelatedQuery( + targetClient, + RdsMultiAZMySQLDatabaseDialect.FETCH_WRITER_HOST_QUERY, + RdsMultiAZMySQLDatabaseDialect.FETCH_WRITER_HOST_QUERY_COLUMN_NAME + ); + // The above query returns the writer host id if it is a reader, nothing if the writer. + if ((!writerHostId)) { + const currentConnection = await this.identifyConnection(targetClient); + return currentConnection ? currentConnection : null; + } else { + return null; + } + } catch (error: any) { + throw new AwsWrapperError(Messages.get("RdsMultiAZMySQLDatabaseDialect.invalidQuery", error.message)); + } } async identifyConnection(client: ClientWrapper): Promise { diff --git a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts index 12ed88c9..e7316113 100644 --- a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts +++ b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts @@ -27,7 +27,9 @@ import { RdsHostListProvider } from "../../../common/lib/host_list_provider/rds_ import { PgDatabaseDialect } from "./pg_database_dialect"; import { ErrorHandler } from "../../../common/lib/error_handler"; import { MultiAzPgErrorHandler } from "../multi_az_pg_error_handler"; -import { error, info, query } from "winston"; +import { WrapperProperties } from "../../../common/lib/wrapper_property"; +import { PluginService } from "../../../common/lib/plugin_service"; +import { MonitoringRdsHostListProvider } from "../../../common/lib/host_list_provider/monitoring/monitoring_host_list_provider"; export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements TopologyAwareDatabaseDialect { constructor() { @@ -42,7 +44,8 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To private static readonly FETCH_WRITER_HOST_QUERY_COLUMN_NAME: string = "multi_az_db_cluster_source_dbi_resource_id"; private static readonly HOST_ID_QUERY: string = "SELECT dbi_resource_id FROM rds_tools.dbi_resource_id()"; private static readonly HOST_ID_QUERY_COLUMN_NAME: string = "dbi_resource_id"; - private static readonly IS_READER_QUERY: string = "SELECT pg_is_in_recovery()"; + private static readonly IS_READER_QUERY: string = "SELECT pg_is_in_recovery() AS is_reader"; + private static readonly IS_READER_QUERY_COLUMN_NAME: string = "is_reader"; async isDialect(targetClient: ClientWrapper): Promise { const res = await targetClient.query(RdsMultiAZPgDatabaseDialect.WRITER_HOST_FUNC_EXIST_QUERY).catch(() => false); @@ -55,6 +58,9 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To } getHostListProvider(props: Map, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider { + if (WrapperProperties.PLUGINS.get(props).includes("failover2")) { + return new MonitoringRdsHostListProvider(props, originalUrl, hostListProviderService, hostListProviderService); + } return new RdsHostListProvider(props, originalUrl, hostListProviderService); } @@ -77,7 +83,7 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To } } - private async executeTopologyRelatedQuery(targetClient: ClientWrapper, query: string, resultColumnName?: string): Promise { + private async executeTopologyRelatedQuery(targetClient: ClientWrapper, query: string, resultColumnName?: string): Promise { const res = await targetClient.query(query); const rows: any[] = res.rows; if (rows.length > 0) { @@ -125,11 +131,26 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To } async getHostRole(client: ClientWrapper): Promise { - return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER; + return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) === false ? HostRole.WRITER : HostRole.READER; } - getWriterId(client: ClientWrapper): Promise { - throw new Error("Method not implemented."); + async getWriterId(targetClient: ClientWrapper): Promise { + try { + const writerHostId: string = await this.executeTopologyRelatedQuery( + targetClient, + RdsMultiAZPgDatabaseDialect.FETCH_WRITER_HOST_QUERY, + RdsMultiAZPgDatabaseDialect.FETCH_WRITER_HOST_QUERY_COLUMN_NAME + ); + const currentConnection = await this.identifyConnection(targetClient); + + if (currentConnection === writerHostId) { + return currentConnection ? currentConnection : null; + } else { + return null; + } + } catch (error: any) { + throw new AwsWrapperError(Messages.get("RdsMultiAZPgDatabaseDialect.invalidQuery", error.message)); + } } getErrorHandler(): ErrorHandler { diff --git a/tests/integration/container/tests/aurora_failover2.test.ts b/tests/integration/container/tests/aurora_failover2.test.ts index 6f874671..a0b9263e 100644 --- a/tests/integration/container/tests/aurora_failover2.test.ts +++ b/tests/integration/container/tests/aurora_failover2.test.ts @@ -18,15 +18,15 @@ import { TestEnvironment } from "./utils/test_environment"; import { DriverHelper } from "./utils/driver_helper"; import { AuroraTestUtility } from "./utils/aurora_test_utility"; import { FailoverSuccessError, TransactionResolutionUnknownError } from "../../../../common/lib/utils/errors"; +import { DatabaseEngine } from "./utils/database_engine"; +import { QueryResult } from "pg"; import { ProxyHelper } from "./utils/proxy_helper"; +import { RdsUtils } from "../../../../common/lib/utils/rds_utils"; import { logger } from "../../../../common/logutils"; import { features, instanceCount } from "./config"; import { TestEnvironmentFeatures } from "./utils/test_environment_features"; import { PluginManager } from "../../../../common/lib"; -import { DatabaseEngine } from "./utils/database_engine"; import { TransactionIsolationLevel } from "../../../../common/lib/utils/transaction_isolation_level"; -import { RdsUtils } from "../../../../common/lib/utils/rds_utils"; -import { QueryResult } from "pg"; const itIf = features.includes(TestEnvironmentFeatures.FAILOVER_SUPPORTED) && @@ -36,7 +36,6 @@ const itIf = ? it : it.skip; const itIfTwoInstance = instanceCount == 2 ? itIf : it.skip; -const itIfMinThreeInstance = instanceCount >= 3 ? itIf : it.skip; let env: TestEnvironment; let driver; @@ -66,27 +65,6 @@ async function initDefaultConfig(host: string, port: number, connectToProxy: boo return config; } -async function initConfigWithRWSplitting(host: string, port: number, connectToProxy: boolean): Promise { - let config: any = { - user: env.databaseInfo.username, - host: host, - database: env.databaseInfo.defaultDbName, - password: env.databaseInfo.password, - port: port, - plugins: "readWriteSplitting,failover2", - failoverTimeoutMs: 400000, - enableTelemetry: true, - telemetryTracesBackend: "OTLP", - telemetryMetricsBackend: "OTLP" - }; - - if (connectToProxy) { - config["clusterInstanceHostPattern"] = "?." + env.proxyDatabaseInfo.instanceEndpointSuffix; - } - config = DriverHelper.addDriverSpecificConfiguration(config, env.engine); - return config; -} - describe("aurora failover2", () => { beforeEach(async () => { logger.info(`Test started: ${expect.getState().currentTestName}`); @@ -97,8 +75,6 @@ describe("aurora failover2", () => { initClientFunc = DriverHelper.getClient(driver); await ProxyHelper.enableAllConnectivity(); await TestEnvironment.verifyClusterStatus(); - await TestEnvironment.verifyAllInstancesHasRightState("available"); - await TestEnvironment.verifyAllInstancesUp(); client = null; secondaryClient = null; @@ -112,6 +88,7 @@ describe("aurora failover2", () => { // pass } } + if (secondaryClient !== null) { try { await secondaryClient.end(); @@ -281,118 +258,4 @@ describe("aurora failover2", () => { }, 1320000 ); - - itIfMinThreeInstance( - "test failover to new writer set read only true false", - async () => { - // Connect to writer instance - const writerConfig = await initConfigWithRWSplitting( - env.proxyDatabaseInfo.writerInstanceEndpoint, - env.proxyDatabaseInfo.instanceEndpointPort, - true - ); - client = initClientFunc(writerConfig); - await client.connect(); - - const initialWriterId = await auroraTestUtility.queryInstanceId(client); - expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true); - - // Kill all reader instances - for (const host of env.proxyDatabaseInfo.instances) { - if (host.instanceId && host.instanceId !== initialWriterId) { - await ProxyHelper.disableConnectivity(env.engine, host.instanceId); - } - } - - // Force internal reader connection to the writer instance - await client.setReadOnly(true); - const currentId0 = await auroraTestUtility.queryInstanceId(client); - - expect(currentId0).toStrictEqual(initialWriterId); - await client.setReadOnly(false); - - await ProxyHelper.enableAllConnectivity(); - // Crash instance 1 and nominate a new writer - await auroraTestUtility.failoverClusterAndWaitUntilWriterChanged(); - await TestEnvironment.verifyClusterStatus(); - - await expect(async () => { - await auroraTestUtility.queryInstanceId(client); - }).rejects.toThrow(FailoverSuccessError); - const newWriterId = await auroraTestUtility.queryInstanceId(client); - expect(await auroraTestUtility.isDbInstanceWriter(newWriterId)).toStrictEqual(true); - expect(newWriterId).not.toBe(initialWriterId); - - await client.setReadOnly(true); - const currentReaderId = await auroraTestUtility.queryInstanceId(client); - expect(currentReaderId).not.toBe(newWriterId); - - await client.setReadOnly(false); - const currentId = await auroraTestUtility.queryInstanceId(client); - expect(currentId).toStrictEqual(newWriterId); - }, - 1320000 - ); - - itIfMinThreeInstance( - "test failover to new reader set read only false true", - async () => { - // Connect to writer instance - const writerConfig = await initConfigWithRWSplitting( - env.proxyDatabaseInfo.writerInstanceEndpoint, - env.proxyDatabaseInfo.instanceEndpointPort, - true - ); - writerConfig["failoverMode"] = "reader-or-writer"; - client = initClientFunc(writerConfig); - - await client.connect(); - const initialWriterId = await auroraTestUtility.queryInstanceId(client); - expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true); - await client.setReadOnly(true); - - const readerConnectionId = await auroraTestUtility.queryInstanceId(client); - expect(readerConnectionId).not.toBe(initialWriterId); - - // Get a reader instance - let otherReaderId; - for (const host of env.proxyDatabaseInfo.instances) { - if (host.instanceId && host.instanceId !== readerConnectionId && host.instanceId !== initialWriterId) { - otherReaderId = host.instanceId; - break; - } - } - - if (!otherReaderId) { - throw new Error("Could not find a reader instance"); - } - // Kill all instances except one other reader - for (const host of env.proxyDatabaseInfo.instances) { - if (host.instanceId && host.instanceId !== otherReaderId) { - await ProxyHelper.disableConnectivity(env.engine, host.instanceId); - } - } - - await expect(async () => { - await auroraTestUtility.queryInstanceId(client); - }).rejects.toThrow(FailoverSuccessError); - - const currentReaderId0 = await auroraTestUtility.queryInstanceId(client); - - expect(currentReaderId0).toStrictEqual(otherReaderId); - expect(currentReaderId0).not.toBe(readerConnectionId); - - await ProxyHelper.enableAllConnectivity(); - await client.setReadOnly(false); - - const currentId = await auroraTestUtility.queryInstanceId(client); - expect(currentId).toStrictEqual(initialWriterId); - - await client.setReadOnly(true); - - const currentReaderId2 = await auroraTestUtility.queryInstanceId(client); - expect(currentReaderId2).toStrictEqual(otherReaderId); - }, - 1320000 - ); }); diff --git a/tests/integration/container/tests/read_write_splitting.test.ts b/tests/integration/container/tests/read_write_splitting.test.ts index d673dbb3..bca656ac 100644 --- a/tests/integration/container/tests/read_write_splitting.test.ts +++ b/tests/integration/container/tests/read_write_splitting.test.ts @@ -48,14 +48,14 @@ let secondaryClient: any; let auroraTestUtility: AuroraTestUtility; let provider: InternalPooledConnectionProvider | null; -async function initDefaultConfig(host: string, port: number, connectToProxy: boolean): Promise { +async function initConfig(host: string, port: number, connectToProxy: boolean, plugins: string): Promise { let config: any = { user: env.databaseInfo.username, host: host, database: env.databaseInfo.defaultDbName, password: env.databaseInfo.password, port: port, - plugins: "readWriteSplitting", + plugins: plugins, enableTelemetry: true, telemetryTracesBackend: "OTLP", telemetryMetricsBackend: "OTLP" @@ -68,24 +68,19 @@ async function initDefaultConfig(host: string, port: number, connectToProxy: boo return config; } +async function initDefaultConfig(host: string, port: number, connectToProxy: boolean): Promise { + return await initConfig(host, port, connectToProxy, "readWriteSplitting"); +} + async function initConfigWithFailover(host: string, port: number, connectToProxy: boolean): Promise { - let config: any = { - user: env.databaseInfo.username, - host: host, - database: env.databaseInfo.defaultDbName, - password: env.databaseInfo.password, - port: port, - plugins: "readWriteSplitting,failover", - failoverTimeoutMs: 400000, - enableTelemetry: true, - telemetryTracesBackend: "OTLP", - telemetryMetricsBackend: "OTLP" - }; + const config: any = await initConfig(host, port, connectToProxy, "readWriteSplitting,failover"); + config["failoverTimeoutMs"] = 400000; + return config; +} - if (connectToProxy) { - config["clusterInstanceHostPattern"] = "?." + env.proxyDatabaseInfo.instanceEndpointSuffix; - } - config = DriverHelper.addDriverSpecificConfiguration(config, env.engine); +async function initConfigWithFailover2(host: string, port: number, connectToProxy: boolean): Promise { + const config: any = await initConfig(host, port, connectToProxy, "readWriteSplitting,failover2"); + config["failoverTimeoutMs"] = 400000; return config; } @@ -676,4 +671,118 @@ describe("aurora read write splitting", () => { }, 1000000 ); + + itIfMinThreeInstance( + "test failover2 to new writer set read only true false", + async () => { + // Connect to writer instance + const writerConfig = await initConfigWithFailover2( + env.proxyDatabaseInfo.writerInstanceEndpoint, + env.proxyDatabaseInfo.instanceEndpointPort, + true + ); + client = initClientFunc(writerConfig); + await client.connect(); + + const initialWriterId = await auroraTestUtility.queryInstanceId(client); + expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true); + + // Kill all reader instances + for (const host of env.proxyDatabaseInfo.instances) { + if (host.instanceId && host.instanceId !== initialWriterId) { + await ProxyHelper.disableConnectivity(env.engine, host.instanceId); + } + } + + // Force internal reader connection to the writer instance + await client.setReadOnly(true); + const currentId0 = await auroraTestUtility.queryInstanceId(client); + + expect(currentId0).toStrictEqual(initialWriterId); + await client.setReadOnly(false); + + await ProxyHelper.enableAllConnectivity(); + // Crash instance 1 and nominate a new writer + await auroraTestUtility.failoverClusterAndWaitUntilWriterChanged(); + await TestEnvironment.verifyClusterStatus(); + + await expect(async () => { + await auroraTestUtility.queryInstanceId(client); + }).rejects.toThrow(FailoverSuccessError); + const newWriterId = await auroraTestUtility.queryInstanceId(client); + expect(await auroraTestUtility.isDbInstanceWriter(newWriterId)).toStrictEqual(true); + expect(newWriterId).not.toBe(initialWriterId); + + await client.setReadOnly(true); + const currentReaderId = await auroraTestUtility.queryInstanceId(client); + expect(currentReaderId).not.toBe(newWriterId); + + await client.setReadOnly(false); + const currentId = await auroraTestUtility.queryInstanceId(client); + expect(currentId).toStrictEqual(newWriterId); + }, + 1320000 + ); + + itIfMinThreeInstance( + "test failover2 to new reader set read only false true", + async () => { + // Connect to writer instance + const writerConfig = await initConfigWithFailover2( + env.proxyDatabaseInfo.writerInstanceEndpoint, + env.proxyDatabaseInfo.instanceEndpointPort, + true + ); + writerConfig["failoverMode"] = "reader-or-writer"; + client = initClientFunc(writerConfig); + + await client.connect(); + const initialWriterId = await auroraTestUtility.queryInstanceId(client); + expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true); + await client.setReadOnly(true); + + const readerConnectionId = await auroraTestUtility.queryInstanceId(client); + expect(readerConnectionId).not.toBe(initialWriterId); + + // Get a reader instance + let otherReaderId; + for (const host of env.proxyDatabaseInfo.instances) { + if (host.instanceId && host.instanceId !== readerConnectionId && host.instanceId !== initialWriterId) { + otherReaderId = host.instanceId; + break; + } + } + + if (!otherReaderId) { + throw new Error("Could not find a reader instance"); + } + // Kill all instances except one other reader + for (const host of env.proxyDatabaseInfo.instances) { + if (host.instanceId && host.instanceId !== otherReaderId) { + await ProxyHelper.disableConnectivity(env.engine, host.instanceId); + } + } + + await expect(async () => { + await auroraTestUtility.queryInstanceId(client); + }).rejects.toThrow(FailoverSuccessError); + + const currentReaderId0 = await auroraTestUtility.queryInstanceId(client); + + expect(currentReaderId0).toStrictEqual(otherReaderId); + expect(currentReaderId0).not.toBe(readerConnectionId); + + await ProxyHelper.enableAllConnectivity(); + await client.setReadOnly(false); + + const currentId = await auroraTestUtility.queryInstanceId(client); + expect(currentId).toStrictEqual(initialWriterId); + + await client.setReadOnly(true); + + const currentReaderId2 = await auroraTestUtility.queryInstanceId(client); + expect(currentReaderId2).toStrictEqual(otherReaderId); + }, + 1320000 + ); }); From f55a607ec09cfe25ff3b3073ccc20ef5191493dc Mon Sep 17 00:00:00 2001 From: Sophia Chu Date: Fri, 7 Feb 2025 11:00:01 -0800 Subject: [PATCH 2/2] apply review comments --- mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts | 4 ++-- pg/lib/dialect/rds_multi_az_pg_database_dialect.ts | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts index c18a369a..2689604a 100644 --- a/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts +++ b/mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts @@ -137,9 +137,9 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme RdsMultiAZMySQLDatabaseDialect.FETCH_WRITER_HOST_QUERY_COLUMN_NAME ); // The above query returns the writer host id if it is a reader, nothing if the writer. - if ((!writerHostId)) { + if (!writerHostId) { const currentConnection = await this.identifyConnection(targetClient); - return currentConnection ? currentConnection : null; + return currentConnection ?? null; } else { return null; } diff --git a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts index e7316113..6c11f86b 100644 --- a/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts +++ b/pg/lib/dialect/rds_multi_az_pg_database_dialect.ts @@ -143,11 +143,7 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To ); const currentConnection = await this.identifyConnection(targetClient); - if (currentConnection === writerHostId) { - return currentConnection ? currentConnection : null; - } else { - return null; - } + return (currentConnection && currentConnection === writerHostId) ? currentConnection : null; } catch (error: any) { throw new AwsWrapperError(Messages.get("RdsMultiAZPgDatabaseDialect.invalidQuery", error.message)); }