feat: failover reader selector strategy

sophia-bq committed Jan 29, 2025
1 parent 2c2e16f · commit 64dcca3
Showing 16 changed files with 257 additions and 237 deletions.
4 changes: 4 additions & 0 deletions common/lib/host_list_provider/host_list_provider.ts
@@ -23,6 +23,10 @@ export type DynamicHostListProvider = HostListProvider;
 
 export type StaticHostListProvider = HostListProvider;
 
+export interface BlockingHostListProvider extends HostListProvider {
+  forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]>;
+}
+
 export interface HostListProvider {
   refresh(): Promise<HostInfo[]>;
 
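
A minimal usage sketch of the new BlockingHostListProvider contract (illustrative only, not part of this commit; the import path is an assumption):

import { BlockingHostListProvider } from "./host_list_provider";

// Block until the monitor has re-verified the writer, or reject once the
// 60-second budget is exhausted. Returns the refreshed topology.
async function refreshAfterFailover(provider: BlockingHostListProvider) {
  return await provider.forceMonitoringRefresh(true, 60_000);
}
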
@@ -27,9 +27,9 @@ import { MonitoringRdsHostListProvider } from "./monitoring_host_list_provider";
 import { Messages } from "../../utils/messages";
 
 export interface ClusterTopologyMonitor {
-  forceRefresh(client: any, timeoutMs: number): Promise<HostInfo[]>;
+  forceRefresh(client: ClientWrapper, timeoutMs: number): Promise<HostInfo[]>;
 
-  close(): void;
+  close(): Promise<void>;
 
   forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]>;
 }
@@ -50,9 +50,9 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
   private writerHostInfo: HostInfo = null;
   private isVerifiedWriterConnection: boolean = false;
   private monitoringClient: ClientWrapper = null;
-  private highRefreshRateEndTime: number = -1;
+  private highRefreshRateEndTimeMs: number = -1;
   private highRefreshPeriodAfterPanicMs: number = 30000; // 30 seconds.
-  private ignoreNewTopologyRequestsEndTime: number = -1;
+  private ignoreNewTopologyRequestsEndTimeMs: number = -1;
   private ignoreTopologyRequestMs: number = 10000; // 10 seconds.
 
   // Tracking of the host monitors.
@@ -122,11 +122,12 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
     this.hostMonitors.clear();
   }
 
-  async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
-    if (this.ignoreNewTopologyRequestsEndTime > 0 && Date.now() < this.ignoreNewTopologyRequestsEndTime) {
+  async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[] | null> {
+    if (Date.now() < this.ignoreNewTopologyRequestsEndTimeMs) {
       // Previous failover has just completed, use results without triggering new update.
       const currentHosts = this.topologyMap.get(this.clusterId);
       if (currentHosts !== null) {
+        logger.info(Messages.get("ClusterTopologyMonitoring.ignoringTopologyRequest"));
         return currentHosts;
       }
     }
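
The guard above implements a suppression window: for a short period after failover completes, callers are served the cached topology instead of triggering another refresh. A standalone sketch of that window (hypothetical class, not part of this commit):

// The sentinel values mirror the fields above: -1 means no window has ever
// been armed, 0 means expired, and a positive value is the window's end time.
class TopologyRequestGate {
  private ignoreNewTopologyRequestsEndTimeMs = -1;

  // Called after the writer has been re-verified following a panic.
  arm(ignoreTopologyRequestMs: number): void {
    this.ignoreNewTopologyRequestsEndTimeMs =
      this.ignoreNewTopologyRequestsEndTimeMs === -1 ? 0 : Date.now() + ignoreTopologyRequestMs;
  }

  // True while requests should be answered from the topology cache.
  shouldServeFromCache(): boolean {
    return Date.now() < this.ignoreNewTopologyRequestsEndTimeMs;
  }
}
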
@@ -144,7 +145,7 @@
     return await this.waitTillTopologyGetsUpdated(timeoutMs);
   }
 
-  async forceRefresh(client: any, timeoutMs: number): Promise<HostInfo[]> {
+  async forceRefresh(client: ClientWrapper, timeoutMs: number): Promise<HostInfo[] | null> {
     if (this.isVerifiedWriterConnection) {
       return await this.waitTillTopologyGetsUpdated(timeoutMs);
     }
@@ -153,7 +154,7 @@
     return await this.fetchTopologyAndUpdateCache(client);
   }
 
-  async waitTillTopologyGetsUpdated(timeoutMs: number): Promise<HostInfo[]> {
+  async waitTillTopologyGetsUpdated(timeoutMs: number): Promise<HostInfo[] | null> {
     // Signal to any monitor that might be in delay, that topology should be updated.
     this.requestToUpdateTopology = true;
 
@@ -176,7 +177,7 @@
     return latestHosts;
   }
 
-  async fetchTopologyAndUpdateCache(client: ClientWrapper): Promise<HostInfo[]> {
+  async fetchTopologyAndUpdateCache(client: ClientWrapper): Promise<HostInfo[] | null> {
     if (!client) {
       return null;
     }
@@ -193,7 +194,7 @@
     return null;
   }
 
-  private async openAnyClientAndUpdateTopology(): Promise<HostInfo[]> {
+  private async openAnyClientAndUpdateTopology(): Promise<HostInfo[] | null> {
     let writerVerifiedByThisThread = false;
     if (!this.monitoringClient) {
       let client: ClientWrapper;
@@ -225,10 +226,10 @@
 
     const hosts: HostInfo[] = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
     if (writerVerifiedByThisThread) {
-      if (this.ignoreNewTopologyRequestsEndTime === -1) {
-        this.ignoreNewTopologyRequestsEndTime = 0;
+      if (this.ignoreNewTopologyRequestsEndTimeMs === -1) {
+        this.ignoreNewTopologyRequestsEndTimeMs = 0;
       } else {
-        this.ignoreNewTopologyRequestsEndTime = Date.now() + this.ignoreTopologyRequestMs;
+        this.ignoreNewTopologyRequestsEndTimeMs = Date.now() + this.ignoreTopologyRequestMs;
       }
     }
 
@@ -239,7 +240,7 @@
     return hosts;
   }
 
-  updateTopologyCache(hosts: HostInfo[]) {
+  updateTopologyCache(hosts: HostInfo[]): void {
     this.topologyMap.put(this.clusterId, hosts, ClusterTopologyMonitorImpl.TOPOLOGY_CACHE_EXPIRATION_NANOS);
     this.requestToUpdateTopology = false;
   }
@@ -250,26 +251,26 @@
     return writerHost === hostId ? writerHost : null;
   }
 
-  async closeConnection(client: ClientWrapper) {
+  async closeConnection(client: ClientWrapper): Promise<void> {
     if (client !== null) {
       await client.abort();
       client = null;
     }
   }
 
-  async updateMonitoringClient(newClient: ClientWrapper | null) {
+  async updateMonitoringClient(newClient: ClientWrapper | null): Promise<void> {
     const clientToClose = this.monitoringClient;
     this.monitoringClient = newClient;
    if (clientToClose) {
       await clientToClose.abort();
     }
   }
 
-  private isInPanicMode() {
+  private isInPanicMode(): boolean {
     return !this.monitoringClient || !this.isVerifiedWriterConnection;
   }
 
-  async run() {
+  async run(): Promise<void> {
     logger.debug(Messages.get("ClusterTopologyMonitor.startMonitoring"));
     try {
       while (!this.stopMonitoring) {
@@ -317,15 +318,15 @@
           await this.updateMonitoringClient(writerClient);
           this.writerHostInfo = writerHostInfo;
           this.isVerifiedWriterConnection = true;
-          if (this.ignoreNewTopologyRequestsEndTime === -1) {
-            this.ignoreNewTopologyRequestsEndTime = 0;
+          if (this.ignoreNewTopologyRequestsEndTimeMs === -1) {
+            this.ignoreNewTopologyRequestsEndTimeMs = 0;
           } else {
-            this.ignoreNewTopologyRequestsEndTime = Date.now() + this.ignoreTopologyRequestMs;
+            this.ignoreNewTopologyRequestsEndTimeMs = Date.now() + this.ignoreTopologyRequestMs;
           }
-          if (this.highRefreshRateEndTime === -1) {
-            this.highRefreshRateEndTime = 0;
+          if (this.highRefreshRateEndTimeMs === -1) {
+            this.highRefreshRateEndTimeMs = 0;
           } else {
-            this.highRefreshRateEndTime = Date.now() + this.highRefreshPeriodAfterPanicMs;
+            this.highRefreshRateEndTimeMs = Date.now() + this.highRefreshPeriodAfterPanicMs;
           }
 
           // Stop monitoring of each host, writer detected.
@@ -363,18 +364,18 @@
           await this.updateMonitoringClient(null);
           continue;
         }
-        if (this.highRefreshRateEndTime > 0 && Date.now() > this.highRefreshRateEndTime) {
-          this.highRefreshRateEndTime = 0;
+        if (this.highRefreshRateEndTimeMs > 0 && Date.now() > this.highRefreshRateEndTimeMs) {
+          this.highRefreshRateEndTimeMs = 0;
         }
-        if (this.highRefreshRateEndTime < 0) {
+        if (this.highRefreshRateEndTimeMs < 0) {
           // Log topology when not in high refresh rate.
           this.logTopology(`[clusterTopologyMonitor] `);
         }
         // Set an easily interruptible delay between topology refreshes.
         await this.delay(false);
       }
-      if (this.ignoreNewTopologyRequestsEndTime > 0 && Date.now() > this.ignoreNewTopologyRequestsEndTime) {
-        this.ignoreNewTopologyRequestsEndTime = 0;
+      if (this.ignoreNewTopologyRequestsEndTimeMs > 0 && Date.now() > this.ignoreNewTopologyRequestsEndTimeMs) {
+        this.ignoreNewTopologyRequestsEndTimeMs = 0;
       }
     }
   } catch (error) {
@@ -387,7 +388,7 @@
   }
 
   private async delay(useHighRefreshRate: boolean) {
-    if (this.highRefreshRateEndTime > 0 && Date.now() < this.highRefreshRateEndTime) {
+    if (Date.now() < this.highRefreshRateEndTimeMs) {
       useHighRefreshRate = true;
     }
     const endTime = Date.now() + (useHighRefreshRate ? this.highRefreshRateMs : this.refreshRateMs);
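
The delay above sleeps in short slices so that a pending topology request can cut the wait short. A self-contained sketch of that pattern (hypothetical helper, not part of this commit):

// Sleep up to totalMs, polling an interrupt flag between short slices.
async function interruptibleDelay(totalMs: number, shouldInterrupt: () => boolean): Promise<void> {
  const endTimeMs = Date.now() + totalMs;
  while (Date.now() < endTimeMs && !shouldInterrupt()) {
    // Small slices keep the monitor responsive to forceRefresh requests.
    await new Promise((resolve) => setTimeout(resolve, 50));
  }
}
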
@@ -418,7 +419,7 @@ export class HostMonitor {
   }
 
   async run() {
-    let client = null;
+    let client: ClientWrapper | null = null;
     let updateTopology: boolean = false;
     const startTime: number = Date.now();
     logger.debug(Messages.get("HostMonitor.startMonitoring", this.hostInfo.hostId));
@@ -31,10 +31,10 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
   static readonly MONITOR_EXPIRATION_NANOS: bigint = BigInt(15 * 60_000_000_000); // 15 minutes.
   static readonly DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS = 5000; // 5 seconds.
 
-  private static monitors: SlidingExpirationCache<string, ClusterTopologyMonitorImpl> = new SlidingExpirationCache(
+  private static monitors: SlidingExpirationCache<string, ClusterTopologyMonitor> = new SlidingExpirationCache(
     MonitoringRdsHostListProvider.CACHE_CLEANUP_NANOS,
     () => true,
-    async (monitor: ClusterTopologyMonitorImpl) => {
+    async (monitor: ClusterTopologyMonitor) => {
       try {
         await monitor.close();
       } catch {
@@ -52,10 +52,7 @@
 
   async clearAll(): Promise<void> {
     RdsHostListProvider.clearAll();
-    for (const [key, monitor] of MonitoringRdsHostListProvider.monitors.entries) {
-      await monitor.item.close();
-    }
-    MonitoringRdsHostListProvider.monitors.clear();
+    await MonitoringRdsHostListProvider.monitors.clear();
   }
 
   async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
@@ -82,11 +79,6 @@
     return await dialect.queryForTopology(targetClient, this).then((res: any) => this.processQueryResults(res));
   }
 
-  async getWriterId(targetClient: ClientWrapper): Promise<string> {
-    const hostInfo: HostInfo = await this.identifyConnection(targetClient, this.hostListProviderService.getDialect());
-    return hostInfo ? hostInfo.hostId : null;
-  }
-
   async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
     let monitor: ClusterTopologyMonitor = MonitoringRdsHostListProvider.monitors.get(
       this.clusterId,
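
With this change, clearAll delegates entry cleanup to the cache's disposal callback instead of iterating monitors by hand. A minimal sketch of that design (assumed API shape, not the wrapper's actual SlidingExpirationCache):

// An expiring cache whose values own resources; clear() awaits the disposer
// for every entry, so callers can simply `await cache.clear()`.
class TinyDisposingCache<K, V> {
  private readonly map = new Map<K, V>();

  constructor(private readonly dispose: (item: V) => Promise<void>) {}

  set(key: K, value: V): void {
    this.map.set(key, value);
  }

  async clear(): Promise<void> {
    for (const value of this.map.values()) {
      await this.dispose(value);
    }
    this.map.clear();
  }
}
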
13 changes: 13 additions & 0 deletions common/lib/host_list_provider/rds_host_list_provider.ts
@@ -148,6 +148,19 @@ export class RdsHostListProvider implements DynamicHostListProvider {
     }
   }
 
+  async getWriterId(client: ClientWrapper): Promise<string | null> {
+    const dialect = this.hostListProviderService.getDialect();
+    if (!this.isTopologyAwareDatabaseDialect(dialect)) {
+      throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
+    }
+
+    if (client) {
+      return await dialect.getWriterId(client);
+    } else {
+      throw new AwsWrapperError(Messages.get("AwsClient.targetClientNotDefined"));
+    }
+  }
+
   async identifyConnection(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo | null> {
     if (!this.isTopologyAwareDatabaseDialect(dialect)) {
       throw new TypeError(Messages.get("RdsHostListProvider.incorrectDialect"));
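
A hypothetical caller of the relocated getWriterId (illustrative only; RdsHostListProvider and ClientWrapper are assumed to be imported from the wrapper's common library):

// Check whether the given connection currently points at the writer.
async function isConnectedToWriter(
  provider: RdsHostListProvider,
  client: ClientWrapper,
  currentHostId: string
): Promise<boolean> {
  const writerId = await provider.getWriterId(client);
  return writerId !== null && writerId === currentHostId;
}
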
4 changes: 2 additions & 2 deletions common/lib/plugin_service.ts
@@ -175,9 +175,9 @@ export class PluginService implements ErrorHandler, HostListProviderService {
     }
   }
 
-  async initiateTopologyUpdate(shouldVerifyWriter: boolean, timeoutMs: number): Promise<boolean> {
+  async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<boolean> {
     const hostListProvider = this.getHostListProvider();
-    if (!(hostListProvider instanceof MonitoringRdsHostListProvider)) {
+    if (!("forceMonitoringRefresh" in hostListProvider)) {
       throw new AwsWrapperError(Messages.get("PluginService.requiredMonitoringRdsHostListProvider"), typeof hostListProvider);
     }
 
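
The `"forceMonitoringRefresh" in hostListProvider` check is a structural type guard: any provider implementing the method qualifies, not only MonitoringRdsHostListProvider. A minimal standalone sketch of the pattern (simplified shapes, not part of this commit):

interface HostListProvider {
  refresh(): Promise<string[]>;
}

interface BlockingHostListProvider extends HostListProvider {
  forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<string[]>;
}

// Narrow by structure rather than class identity, so test doubles and
// providers from other modules also pass the guard.
function isBlockingHostListProvider(provider: HostListProvider): provider is BlockingHostListProvider {
  return "forceMonitoringRefresh" in provider;
}
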
24 changes: 1 addition & 23 deletions common/lib/plugins/failover/failover_plugin.ts
@@ -71,17 +71,15 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
   private telemetryFailoverAdditionalTopTraceSetting: boolean = false;
   private _rdsUrlType: RdsUrlType | null = null;
   private _isInTransaction: boolean = false;
-  private _closedExplicitly: boolean = false;
   private _lastError: any;
   protected failoverTimeoutMsSetting: number = WrapperProperties.FAILOVER_TIMEOUT_MS.defaultValue;
   protected failoverClusterTopologyRefreshRateMsSetting: number = WrapperProperties.FAILOVER_CLUSTER_TOPOLOGY_REFRESH_RATE_MS.defaultValue;
   protected failoverWriterReconnectIntervalMsSetting: number = WrapperProperties.FAILOVER_WRITER_RECONNECT_INTERVAL_MS.defaultValue;
   protected failoverReaderConnectTimeoutMsSetting: number = WrapperProperties.FAILOVER_READER_CONNECT_TIMEOUT_MS.defaultValue;
-  protected isClosed: boolean = false;
   failoverMode: FailoverMode | null = null;
 
   private hostListProviderService?: HostListProviderService;
-  private pluginService: PluginService;
+  private readonly pluginService: PluginService;
   protected enableFailoverSetting: boolean = WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.defaultValue;
 
   constructor(pluginService: PluginService, properties: Map<string, any>, rdsHelper: RdsUtils);
@@ -229,18 +227,6 @@
     this.telemetryFailoverAdditionalTopTraceSetting = WrapperProperties.TELEMETRY_FAILOVER_ADDITIONAL_TOP_TRACE.get(this._properties);
   }
 
-  private async invalidInvocationOnClosedConnection() {
-    if (!this._closedExplicitly) {
-      this.isClosed = false;
-      await this.pickNewConnection();
-
-      // "The active SQL connection has changed. Please re-configure session state if required."
-      logger.debug(Messages.get("Failover.connectionChangedError"));
-      throw new FailoverSuccessError(Messages.get("Failover.connectionChangedError"));
-    }
-    throw new AwsWrapperError(Messages.get("Failover.noOperationsAfterConnectionClosed"));
-  }
-
   private getCurrentWriter(): HostInfo | null {
     const topology = this.pluginService.getHosts();
     if (topology.length == 0) {
@@ -311,10 +297,6 @@
       return await methodFunc();
     }
 
-    if (this.isClosed) {
-      await this.invalidInvocationOnClosedConnection();
-    }
-
     if (this.canUpdateTopology(methodName)) {
       await this.updateTopology(false);
     }
@@ -485,10 +467,6 @@
   }
 
   async pickNewConnection() {
-    if (this.isClosed && this._closedExplicitly) {
-      logger.debug(Messages.get("Failover.transactionResolutionUnknownError"));
-      return;
-    }
     const currentClient = this.pluginService.getCurrentClient();
     const currentWriter = this.getCurrentWriter();
     if (currentWriter && currentClient.targetClient == null && !this.shouldAttemptReaderConnection()) {
4 changes: 2 additions & 2 deletions common/lib/plugins/failover/reader_failover_handler.ts
@@ -83,7 +83,7 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
     const timer: any = {};
     const endTime = Date.now() + this.maxFailoverTimeoutMs;
 
-    const timeoutTask = getTimeoutTask(timer, "Internal failover task timed out.", this.maxFailoverTimeoutMs);
+    const timeoutTask = getTimeoutTask(timer, Messages.get("Failover.timeoutError"), this.maxFailoverTimeoutMs);
     const failoverTask = this.internalFailoverTask(hosts, currentHost, endTime);
 
     return await Promise.race([timeoutTask, failoverTask])
@@ -130,7 +130,7 @@
         await sleep(1000);
       }
     }
-    throw new InternalQueryTimeoutError("Internal failover task has timed out.");
+    throw new InternalQueryTimeoutError(Messages.get("Failover.timeoutError"));
   }
 
   async failoverInternal(hosts: HostInfo[], currentHost: HostInfo | null): Promise<ReaderFailoverResult> {
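
For reference, the timeout-versus-task race used above follows this shape (simplified sketch with assumed helper signatures; not the wrapper's actual implementation):

interface TimerHolder {
  timeoutId?: ReturnType<typeof setTimeout>;
}

// Never resolves; rejects with `message` after timeoutMs. The shared holder
// lets the caller cancel the timer once the real task wins the race.
function getTimeoutTask(timer: TimerHolder, message: string, timeoutMs: number): Promise<never> {
  return new Promise((_, reject) => {
    timer.timeoutId = setTimeout(() => reject(new Error(message)), timeoutMs);
  });
}

async function raceWithTimeout<T>(task: Promise<T>, timeoutMs: number): Promise<T> {
  const timer: TimerHolder = {};
  try {
    return await Promise.race([getTimeoutTask(timer, "Internal failover task timed out.", timeoutMs), task]);
  } finally {
    // Always clear the timer so a pending timeout cannot keep the process alive.
    clearTimeout(timer.timeoutId);
  }
}
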
(The remaining changed files in this commit are not shown in this view.)