Skip to content

Commit

Permalink
fix: force monitoring refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Jan 31, 2025
1 parent 6e7849f commit d4fee57
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import { DatabaseDialect } from "../../database_dialect/database_dialect";
import { AwsWrapperError } from "../../utils/errors";
import { Messages } from "../../utils/messages";
import { WrapperProperties } from "../../wrapper_property";
import { BlockingHostListProvider } from "../host_list_provider";

export class MonitoringRdsHostListProvider extends RdsHostListProvider {
export class MonitoringRdsHostListProvider extends RdsHostListProvider implements BlockingHostListProvider {
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 1 minute.
static readonly MONITOR_EXPIRATION_NANOS: bigint = BigInt(15 * 60_000_000_000); // 15 minutes.
static readonly DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS = 5000; // 5 seconds.
Expand Down
11 changes: 8 additions & 3 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { ErrorHandler } from "./error_handler";
import { HostInfo } from "./host_info";
import { AwsClient } from "./aws_client";
import { HostListProviderService } from "./host_list_provider_service";
import { HostListProvider } from "./host_list_provider/host_list_provider";
import { HostListProvider, BlockingHostListProvider } from "./host_list_provider/host_list_provider";
import { ConnectionUrlParser } from "./utils/connection_url_parser";
import { DatabaseDialect, DatabaseType } from "./database_dialect/database_dialect";
import { HostInfoBuilder } from "./host_info_builder";
Expand All @@ -44,6 +44,7 @@ import { getWriter } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { MonitoringRdsHostListProvider } from "./host_list_provider/monitoring/monitoring_host_list_provider";
import { TopologyAwareDatabaseDialect } from "./topology_aware_database_dialect";

export class PluginService implements ErrorHandler, HostListProviderService {
private readonly _currentClient: AwsClient;
Expand Down Expand Up @@ -176,8 +177,8 @@ export class PluginService implements ErrorHandler, HostListProviderService {
}

async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<boolean> {
const hostListProvider = this.getHostListProvider();
if (!("forceMonitoringRefresh" in hostListProvider)) {
const hostListProvider: HostListProvider = this.getHostListProvider();
if (!this.isBlockingHostListProvider(hostListProvider)) {
throw new AwsWrapperError(Messages.get("PluginService.requiredMonitoringRdsHostListProvider"), typeof hostListProvider);
}

Expand All @@ -201,6 +202,10 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return false;
}

/**
 * Type guard used by forceMonitoringRefresh to decide whether a host list provider
 * can perform a blocking monitoring refresh.
 *
 * Interfaces are erased at runtime, so the check is structural: the value must be
 * non-null and expose a callable `forceMonitoringRefresh` member.
 * NOTE(review): BlockingHostListProvider is declared in another file; if it requires
 * further members, extend this check accordingly.
 *
 * @param arg the candidate host list provider.
 * @returns true when `arg` exposes a `forceMonitoringRefresh` function.
 */
protected isBlockingHostListProvider(arg: any): arg is BlockingHostListProvider {
  // Returning `arg` itself would be truthy for every provider and defeat the guard.
  return arg != null && typeof arg.forceMonitoringRefresh === "function";
}

async refreshHostList(): Promise<void>;
async refreshHostList(targetClient: ClientWrapper): Promise<void>;
async refreshHostList(targetClient?: ClientWrapper): Promise<void> {
Expand Down
22 changes: 11 additions & 11 deletions common/lib/plugins/failover2/failover2_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import {
TransactionResolutionUnknownError,
UnavailableHostError
} from "../../utils/errors";
import { logTopology } from "../../utils/utils";
import { ClientWrapper } from "../../client_wrapper";
import { HostAvailability } from "../../host_availability/host_availability";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
Expand Down Expand Up @@ -299,9 +298,6 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
const readerCandidates = hosts.filter((x) => x.role === HostRole.READER);
const originalWriter: HostInfo = hosts.find((x) => x.role === HostRole.WRITER);
let isOriginalWriterStillWriter: boolean = false;
// Signal to connect that this is an internal call and does not require additional processing.
const copyProps = new Map<string, any>(this._properties);
copyProps.set(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME, true);

while (Date.now() < failoverEndTimeMs) {
// Try all the original readers.
Expand All @@ -318,7 +314,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
logger.info(Messages.get("Failover2.readerCandidateNull"));
} else {
try {
const candidateClient: ClientWrapper = await this.pluginService.connect(readerCandidate, copyProps);
const candidateClient: ClientWrapper = await this.createConnectionForHost(readerCandidate);
const role: HostRole = await this.pluginService.getHostRole(candidateClient);
if (role === HostRole.READER || this.failoverMode !== FailoverMode.STRICT_READER) {
if (role !== readerCandidate.role) {
Expand Down Expand Up @@ -358,7 +354,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele

// Try the original writer, which may have been demoted.
try {
const candidateClient: ClientWrapper = await this.pluginService.connect(originalWriter, copyProps);
const candidateClient: ClientWrapper = await this.createConnectionForHost(originalWriter);
const role: HostRole = await this.pluginService.getHostRole(candidateClient);
if (role === HostRole.READER || this.failoverMode != FailoverMode.STRICT_READER) {
const updatedHostInfo: HostInfo = this.pluginService.getHostInfoBuilder().copyFrom(originalWriter).withRole(role).build();
Expand Down Expand Up @@ -399,16 +395,12 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele

const hosts: HostInfo[] = this.pluginService.getHosts();

// Signal to connect that this is an internal call and does not require additional processing.
const copyProps = new Map<string, any>(this._properties);
copyProps.set(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME, true);

let writerCandidateClient: ClientWrapper = null;
const writerCandidateHostInfo: HostInfo = hosts.find((x) => x.role === HostRole.WRITER);

if (writerCandidateHostInfo) {
try {
writerCandidateClient = await this.pluginService.connect(writerCandidateHostInfo, copyProps);
writerCandidateClient = await this.createConnectionForHost(writerCandidateHostInfo);
} catch (err) {
logger.error(Messages.get("Failover.unableToConnectToWriter"));
this.failoverWriterFailedCounter.inc();
Expand Down Expand Up @@ -445,6 +437,14 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
}
}

/**
 * Opens a connection to the given host through the plugin service.
 *
 * The connection properties are a copy of this plugin's properties with two entries
 * set: the internal-connect flag and the host property pointing at `hostInfo.host`.
 *
 * @param hostInfo the host to connect to.
 * @returns the client wrapper produced by the plugin service.
 */
private async createConnectionForHost(hostInfo: HostInfo): Promise<ClientWrapper> {
  // Work on a copy so the plugin's own property map is not modified.
  const connectProps = new Map<string, any>(this._properties);
  connectProps
    // Signal to connect that this is an internal call and does not require additional processing.
    .set(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME, true)
    .set(WrapperProperties.HOST.name, hostInfo.host);

  const client: ClientWrapper = await this.pluginService.connect(hostInfo, connectProps);
  return client;
}

async invalidateCurrentClient() {
const client = this.pluginService.getCurrentClient();
if (!client || !client.targetClient) {
Expand Down
2 changes: 1 addition & 1 deletion common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@
"ClusterTopologyMonitor.startingHostMonitors": "Starting host monitoring threads.",
"ClusterTopologyMonitor.writerPickedUpFromHostMonitors": "The writer host detected by the node monitors was picked up by the topology monitor: '%s'.",
"ClusterTopologyMonitor.writerMonitoringConnection": "The monitoring connection is connected to a writer: '%s'.",
"ClusterTopologyMonitor.invalidWriterQuery": "An error occurred while attempting to obtain the writer id because the query was invalid. Please ensure you are connecting to an Aurora or RDS DB cluster.",
"ClusterTopologyMonitor.invalidWriterQuery": "An error occurred while attempting to obtain the writer id because the query was invalid. Please ensure you are connecting to an Aurora or RDS DB cluster. Error: '%s'",
"ClusterTopologyMonitor.unableToConnect": "Could not connect to initial host: '%s'.",
"ClusterTopologyMonitor.openedMonitoringConnection": "Opened monitoring connection to: '%s'.",
"ClusterTopologyMonitor.startMonitoring": "Start cluster monitoring task.",
Expand Down
11 changes: 10 additions & 1 deletion mysql/lib/dialect/aurora_mysql_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,16 @@ export class AuroraMySQLDatabaseDialect extends MySQLDatabaseDialect implements

/**
 * Runs the writer query on the given client and returns the reported writer id.
 *
 * @param targetClient the client on which IS_WRITER_QUERY is executed.
 * @returns the `server_id` of the first row, or null when the query returned no row
 *          (targetClient is not connected to a writer) or the id is empty.
 * @throws any error raised by `targetClient.query` is propagated unchanged.
 */
async getWriterId(targetClient: ClientWrapper): Promise<string | null> {
  const res = await targetClient.query(AuroraMySQLDatabaseDialect.IS_WRITER_QUERY);
  // Optional chaining handles an empty result set directly, instead of relying on a
  // TypeError whose message text differs between JavaScript engine versions.
  const writerId: string | undefined = res?.[0]?.[0]?.["server_id"];
  return writerId ? writerId : null;
}

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
Expand Down

0 comments on commit d4fee57

Please sign in to comment.