Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multi-az failover2 #396

Merged
merged 2 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/lib/host_list_provider/host_list_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export type StaticHostListProvider = HostListProvider;

export interface BlockingHostListProvider extends HostListProvider {
forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]>;

clearAll(): Promise<void>;
}

export interface HostListProvider {
Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return false;
}

protected isBlockingHostListProvider(arg: any): arg is BlockingHostListProvider {
isBlockingHostListProvider(arg: any): arg is BlockingHostListProvider {
return arg;
}

Expand Down
9 changes: 6 additions & 3 deletions common/lib/plugins/failover2/failover2_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import { HostAvailability } from "../../host_availability/host_availability";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { HostRole } from "../../host_role";
import { CanReleaseResources } from "../../can_release_resources";
import { MonitoringRdsHostListProvider } from "../../host_list_provider/monitoring/monitoring_host_list_provider";
import { ReaderFailoverResult } from "../failover/reader_failover_result";
import { HostListProvider } from "../../host_list_provider/host_list_provider";

export class Failover2Plugin extends AbstractConnectionPlugin implements CanReleaseResources {
private static readonly TELEMETRY_WRITER_FAILOVER = "failover to writer instance";
Expand Down Expand Up @@ -401,7 +401,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele

if ((await this.pluginService.getHostRole(writerCandidateClient)) !== HostRole.WRITER) {
try {
await writerCandidateClient.end();
await writerCandidateClient?.end();
} catch (error) {
// Do nothing.
}
Expand Down Expand Up @@ -481,6 +481,9 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
}

async releaseResources(): Promise<void> {
await (this.pluginService.getHostListProvider() as MonitoringRdsHostListProvider).clearAll();
const hostListProvider: HostListProvider = this.pluginService.getHostListProvider();
if (!!this.pluginService.isBlockingHostListProvider(hostListProvider)) {
await hostListProvider.clearAll();
}
}
}
3 changes: 2 additions & 1 deletion common/lib/topology_aware_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ export interface TopologyAwareDatabaseDialect {

getHostRole(client: ClientWrapper): Promise<HostRole>;

getWriterId(client: ClientWrapper): Promise<string | null>;
// Returns the host id of the targetClient if it is connected to a writer, null otherwise.
getWriterId(targetClient: ClientWrapper): Promise<string | null>;
}
31 changes: 27 additions & 4 deletions mysql/lib/dialect/rds_multi_az_mysql_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@ import { AwsWrapperError } from "../../../common/lib/utils/errors";
import { TopologyAwareDatabaseDialect } from "../../../common/lib/topology_aware_database_dialect";
import { RdsHostListProvider } from "../../../common/lib/host_list_provider/rds_host_list_provider";
import { FailoverRestriction } from "../../../common/lib/plugins/failover/failover_restriction";
import { WrapperProperties } from "../../../common/lib/wrapper_property";
import { PluginService } from "../../../common/lib/plugin_service";
import { MonitoringRdsHostListProvider } from "../../../common/lib/host_list_provider/monitoring/monitoring_host_list_provider";

export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect implements TopologyAwareDatabaseDialect {
private static readonly TOPOLOGY_QUERY: string = "SELECT id, endpoint, port FROM mysql.rds_topology";
private static readonly TOPOLOGY_TABLE_EXIST_QUERY: string =
"SELECT 1 AS tmp FROM information_schema.tables WHERE" + " table_schema = 'mysql' AND table_name = 'rds_topology'";
// For reader hosts, the query should return a writer host id. For a writer host, the query should return no data.
private static readonly FETCH_WRITER_HOST_QUERY: string = "SHOW REPLICA STATUS";
private static readonly FETCH_WRITER_HOST_QUERY_COLUMN_NAME: string = "Source_Server_Id";
private static readonly HOST_ID_QUERY: string = "SELECT @@server_id AS host";
private static readonly HOST_ID_QUERY_COLUMN_NAME: string = "host";
private static readonly IS_READER_QUERY: string = "SELECT @@read_only";
private static readonly IS_READER_QUERY: string = "SELECT @@read_only AS is_reader";
private static readonly IS_READER_QUERY_COLUMN_NAME: string = "is_reader";

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
const res = await targetClient.query(RdsMultiAZMySQLDatabaseDialect.TOPOLOGY_TABLE_EXIST_QUERY).catch(() => false);
Expand All @@ -48,6 +53,9 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme
}

getHostListProvider(props: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider {
if (WrapperProperties.PLUGINS.get(props).includes("failover2")) {
return new MonitoringRdsHostListProvider(props, originalUrl, hostListProviderService, <PluginService>hostListProviderService);
}
return new RdsHostListProvider(props, originalUrl, hostListProviderService);
}

Expand Down Expand Up @@ -118,11 +126,26 @@ export class RdsMultiAZMySQLDatabaseDialect extends MySQLDatabaseDialect impleme
}

async getHostRole(client: ClientWrapper): Promise<HostRole> {
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER;
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY, RdsMultiAZMySQLDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) == "0" ? HostRole.WRITER : HostRole.READER;
}

getWriterId(client: ClientWrapper): Promise<string> {
throw new Error("Method not implemented.");
async getWriterId(targetClient: ClientWrapper): Promise<string> {
try {
const writerHostId: string = await this.executeTopologyRelatedQuery(
targetClient,
RdsMultiAZMySQLDatabaseDialect.FETCH_WRITER_HOST_QUERY,
RdsMultiAZMySQLDatabaseDialect.FETCH_WRITER_HOST_QUERY_COLUMN_NAME
);
// The above query returns the writer host id if it is a reader, nothing if the writer.
if (!writerHostId) {
const currentConnection = await this.identifyConnection(targetClient);
return currentConnection ?? null;
} else {
return null;
}
} catch (error: any) {
throw new AwsWrapperError(Messages.get("RdsMultiAZMySQLDatabaseDialect.invalidQuery", error.message));
}
}

async identifyConnection(client: ClientWrapper): Promise<string> {
Expand Down
29 changes: 23 additions & 6 deletions pg/lib/dialect/rds_multi_az_pg_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import { RdsHostListProvider } from "../../../common/lib/host_list_provider/rds_
import { PgDatabaseDialect } from "./pg_database_dialect";
import { ErrorHandler } from "../../../common/lib/error_handler";
import { MultiAzPgErrorHandler } from "../multi_az_pg_error_handler";
import { error, info, query } from "winston";
import { WrapperProperties } from "../../../common/lib/wrapper_property";
import { PluginService } from "../../../common/lib/plugin_service";
import { MonitoringRdsHostListProvider } from "../../../common/lib/host_list_provider/monitoring/monitoring_host_list_provider";

export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements TopologyAwareDatabaseDialect {
constructor() {
Expand All @@ -42,7 +44,8 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
private static readonly FETCH_WRITER_HOST_QUERY_COLUMN_NAME: string = "multi_az_db_cluster_source_dbi_resource_id";
private static readonly HOST_ID_QUERY: string = "SELECT dbi_resource_id FROM rds_tools.dbi_resource_id()";
private static readonly HOST_ID_QUERY_COLUMN_NAME: string = "dbi_resource_id";
private static readonly IS_READER_QUERY: string = "SELECT pg_is_in_recovery()";
private static readonly IS_READER_QUERY: string = "SELECT pg_is_in_recovery() AS is_reader";
private static readonly IS_READER_QUERY_COLUMN_NAME: string = "is_reader";

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
const res = await targetClient.query(RdsMultiAZPgDatabaseDialect.WRITER_HOST_FUNC_EXIST_QUERY).catch(() => false);
Expand All @@ -55,6 +58,9 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
}

getHostListProvider(props: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider {
if (WrapperProperties.PLUGINS.get(props).includes("failover2")) {
return new MonitoringRdsHostListProvider(props, originalUrl, hostListProviderService, <PluginService>hostListProviderService);
}
return new RdsHostListProvider(props, originalUrl, hostListProviderService);
}

Expand All @@ -77,7 +83,7 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
}
}

private async executeTopologyRelatedQuery(targetClient: ClientWrapper, query: string, resultColumnName?: string): Promise<string> {
private async executeTopologyRelatedQuery(targetClient: ClientWrapper, query: string, resultColumnName?: string): Promise<any> {
const res = await targetClient.query(query);
const rows: any[] = res.rows;
if (rows.length > 0) {
Expand Down Expand Up @@ -125,11 +131,22 @@ export class RdsMultiAZPgDatabaseDialect extends PgDatabaseDialect implements To
}

async getHostRole(client: ClientWrapper): Promise<HostRole> {
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY)) ? HostRole.WRITER : HostRole.READER;
return (await this.executeTopologyRelatedQuery(client, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY, RdsMultiAZPgDatabaseDialect.IS_READER_QUERY_COLUMN_NAME)) === false ? HostRole.WRITER : HostRole.READER;
}

getWriterId(client: ClientWrapper): Promise<string> {
throw new Error("Method not implemented.");
async getWriterId(targetClient: ClientWrapper): Promise<string> {
try {
const writerHostId: string = await this.executeTopologyRelatedQuery(
targetClient,
RdsMultiAZPgDatabaseDialect.FETCH_WRITER_HOST_QUERY,
RdsMultiAZPgDatabaseDialect.FETCH_WRITER_HOST_QUERY_COLUMN_NAME
);
const currentConnection = await this.identifyConnection(targetClient);

return (currentConnection && currentConnection === writerHostId) ? currentConnection : null;
} catch (error: any) {
throw new AwsWrapperError(Messages.get("RdsMultiAZPgDatabaseDialect.invalidQuery", error.message));
}
}

getErrorHandler(): ErrorHandler {
Expand Down
145 changes: 4 additions & 141 deletions tests/integration/container/tests/aurora_failover2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import { TestEnvironment } from "./utils/test_environment";
import { DriverHelper } from "./utils/driver_helper";
import { AuroraTestUtility } from "./utils/aurora_test_utility";
import { FailoverSuccessError, TransactionResolutionUnknownError } from "../../../../common/lib/utils/errors";
import { DatabaseEngine } from "./utils/database_engine";
import { QueryResult } from "pg";
import { ProxyHelper } from "./utils/proxy_helper";
import { RdsUtils } from "../../../../common/lib/utils/rds_utils";
import { logger } from "../../../../common/logutils";
import { features, instanceCount } from "./config";
import { TestEnvironmentFeatures } from "./utils/test_environment_features";
import { PluginManager } from "../../../../common/lib";
import { DatabaseEngine } from "./utils/database_engine";
import { TransactionIsolationLevel } from "../../../../common/lib/utils/transaction_isolation_level";
import { RdsUtils } from "../../../../common/lib/utils/rds_utils";
import { QueryResult } from "pg";

const itIf =
features.includes(TestEnvironmentFeatures.FAILOVER_SUPPORTED) &&
Expand All @@ -36,7 +36,6 @@ const itIf =
? it
: it.skip;
const itIfTwoInstance = instanceCount == 2 ? itIf : it.skip;
const itIfMinThreeInstance = instanceCount >= 3 ? itIf : it.skip;

let env: TestEnvironment;
let driver;
Expand Down Expand Up @@ -66,27 +65,6 @@ async function initDefaultConfig(host: string, port: number, connectToProxy: boo
return config;
}

async function initConfigWithRWSplitting(host: string, port: number, connectToProxy: boolean): Promise<any> {
let config: any = {
user: env.databaseInfo.username,
host: host,
database: env.databaseInfo.defaultDbName,
password: env.databaseInfo.password,
port: port,
plugins: "readWriteSplitting,failover2",
failoverTimeoutMs: 400000,
enableTelemetry: true,
telemetryTracesBackend: "OTLP",
telemetryMetricsBackend: "OTLP"
};

if (connectToProxy) {
config["clusterInstanceHostPattern"] = "?." + env.proxyDatabaseInfo.instanceEndpointSuffix;
}
config = DriverHelper.addDriverSpecificConfiguration(config, env.engine);
return config;
}

describe("aurora failover2", () => {
beforeEach(async () => {
logger.info(`Test started: ${expect.getState().currentTestName}`);
Expand All @@ -97,8 +75,6 @@ describe("aurora failover2", () => {
initClientFunc = DriverHelper.getClient(driver);
await ProxyHelper.enableAllConnectivity();
await TestEnvironment.verifyClusterStatus();
await TestEnvironment.verifyAllInstancesHasRightState("available");
await TestEnvironment.verifyAllInstancesUp();

client = null;
secondaryClient = null;
Expand All @@ -112,6 +88,7 @@ describe("aurora failover2", () => {
// pass
}
}

if (secondaryClient !== null) {
try {
await secondaryClient.end();
Expand Down Expand Up @@ -281,118 +258,4 @@ describe("aurora failover2", () => {
},
1320000
);

itIfMinThreeInstance(
"test failover to new writer set read only true false",
async () => {
// Connect to writer instance
const writerConfig = await initConfigWithRWSplitting(
env.proxyDatabaseInfo.writerInstanceEndpoint,
env.proxyDatabaseInfo.instanceEndpointPort,
true
);
client = initClientFunc(writerConfig);
await client.connect();

const initialWriterId = await auroraTestUtility.queryInstanceId(client);
expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true);

// Kill all reader instances
for (const host of env.proxyDatabaseInfo.instances) {
if (host.instanceId && host.instanceId !== initialWriterId) {
await ProxyHelper.disableConnectivity(env.engine, host.instanceId);
}
}

// Force internal reader connection to the writer instance
await client.setReadOnly(true);
const currentId0 = await auroraTestUtility.queryInstanceId(client);

expect(currentId0).toStrictEqual(initialWriterId);
await client.setReadOnly(false);

await ProxyHelper.enableAllConnectivity();
// Crash instance 1 and nominate a new writer
await auroraTestUtility.failoverClusterAndWaitUntilWriterChanged();
await TestEnvironment.verifyClusterStatus();

await expect(async () => {
await auroraTestUtility.queryInstanceId(client);
}).rejects.toThrow(FailoverSuccessError);
const newWriterId = await auroraTestUtility.queryInstanceId(client);
expect(await auroraTestUtility.isDbInstanceWriter(newWriterId)).toStrictEqual(true);
expect(newWriterId).not.toBe(initialWriterId);

await client.setReadOnly(true);
const currentReaderId = await auroraTestUtility.queryInstanceId(client);
expect(currentReaderId).not.toBe(newWriterId);

await client.setReadOnly(false);
const currentId = await auroraTestUtility.queryInstanceId(client);
expect(currentId).toStrictEqual(newWriterId);
},
1320000
);

itIfMinThreeInstance(
"test failover to new reader set read only false true",
async () => {
// Connect to writer instance
const writerConfig = await initConfigWithRWSplitting(
env.proxyDatabaseInfo.writerInstanceEndpoint,
env.proxyDatabaseInfo.instanceEndpointPort,
true
);
writerConfig["failoverMode"] = "reader-or-writer";
client = initClientFunc(writerConfig);

await client.connect();
const initialWriterId = await auroraTestUtility.queryInstanceId(client);
expect(await auroraTestUtility.isDbInstanceWriter(initialWriterId)).toStrictEqual(true);
await client.setReadOnly(true);

const readerConnectionId = await auroraTestUtility.queryInstanceId(client);
expect(readerConnectionId).not.toBe(initialWriterId);

// Get a reader instance
let otherReaderId;
for (const host of env.proxyDatabaseInfo.instances) {
if (host.instanceId && host.instanceId !== readerConnectionId && host.instanceId !== initialWriterId) {
otherReaderId = host.instanceId;
break;
}
}

if (!otherReaderId) {
throw new Error("Could not find a reader instance");
}
// Kill all instances except one other reader
for (const host of env.proxyDatabaseInfo.instances) {
if (host.instanceId && host.instanceId !== otherReaderId) {
await ProxyHelper.disableConnectivity(env.engine, host.instanceId);
}
}

await expect(async () => {
await auroraTestUtility.queryInstanceId(client);
}).rejects.toThrow(FailoverSuccessError);

const currentReaderId0 = await auroraTestUtility.queryInstanceId(client);

expect(currentReaderId0).toStrictEqual(otherReaderId);
expect(currentReaderId0).not.toBe(readerConnectionId);

await ProxyHelper.enableAllConnectivity();
await client.setReadOnly(false);

const currentId = await auroraTestUtility.queryInstanceId(client);
expect(currentId).toStrictEqual(initialWriterId);

await client.setReadOnly(true);

const currentReaderId2 = await auroraTestUtility.queryInstanceId(client);
expect(currentReaderId2).toStrictEqual(otherReaderId);
},
1320000
);
});
Loading