Skip to content

Commit

Permalink
refactor: untracked promises and timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Jan 7, 2025
1 parent d168815 commit c4a4778
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 57 deletions.
124 changes: 79 additions & 45 deletions common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,43 @@
import { HostInfo } from "../../host_info";
import { CacheMap } from "../../utils/cache_map";
import { PluginService } from "../../plugin_service";
import { HostListProviderService } from "../../host_list_provider_service";
import { HostAvailability } from "../../host_availability/host_availability";
import { logTopology, sleep } from "../../utils/utils";
import { logger } from "../../../logutils";
import { HostRole } from "../../host_role";
import { ClientWrapper } from "../../client_wrapper";
import { AwsWrapperError } from "../../utils/errors";
import { MonitoringRdsHostListProvider } from "./monitoring_host_list_provider";
import { error } from "winston";
import { clearTimeout } from "node:timers";

/**
 * Contract for a cluster topology monitor. ClusterToplogyMonitorImpl (declared right after this
 * interface) is the implementation visible in this file.
 */
export interface ClusterToplogyMonitor {
// Resolves to a list of hosts. Presumably forces a topology refresh through the given client,
// bounded by timeoutMs; the implementing body is not visible here, so confirm before relying on it.
forceRefresh(client: any, timeoutMs: number): Promise<HostInfo[]>;

// Sets the cluster identifier used by the monitor.
// NOTE(review): the implementation declares clusterId as `private readonly` in the same change set,
// which conflicts with a setter. Confirm whether this member is still part of the contract.
setClusterId(clusterId: string): void;

// Stops the monitor.
// NOTE(review): the implementation's close() is async and returns Promise<void>, so callers going
// through this interface cannot await shutdown. Confirm whether this should be Promise<void>.
close(): void;

// Resolves to a list of hosts. The implementation first consults the cached topology for the
// cluster; the remainder of its behavior is not visible here.
forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]>;
}

export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
public clusterId: string;
public topologyMap: CacheMap<string, HostInfo[]>;
private topologyCacheExpirationNanos: number = 60000000000; // 1 minute // TODO: investigate values and set in constructor.
initialHostInfo: HostInfo;
public properties: Map<string, any>;
public pluginService: PluginService;
protected hostListProviderService: HostListProviderService;
hostListProvider: MonitoringRdsHostListProvider;
protected refreshRate: number = 100;
private highRefreshRate: number = 100;
static readonly TOPOLOGY_CACHE_EXPIRATION_MS: number = 300000;

private readonly clusterId: string;
private readonly initialHostInfo: HostInfo;
private readonly _properties: Map<string, any>;
private readonly _pluginService: PluginService;
private readonly _hostListProvider: MonitoringRdsHostListProvider;
private readonly refreshRateMs: number;
private readonly highRefreshRateMs: number;

private topologyMap: CacheMap<string, HostInfo[]>;
private writerHostInfo: HostInfo = null;
private isVerifiedWriterConnection: boolean = false;
private monitoringClient: ClientWrapper = null;
private highRefreshRateEndTime: any = 0;
private highRefreshPeriodAfterPanic: number = 30000; // 30 seconds.
private ignoreTopologyRequest: number = 1000; // 10 seconds.
private highRefreshRateEndTime: number = 0;
private highRefreshPeriodAfterPanicMs: number = 30000; // 30 seconds.
private ignoreNewTopologyRequestsEndTime: number = 0;
private ignoreTopologyRequestMs: number = 10000; // 10 seconds.

// Controls for stopping the ClusterTopologyMonitor run.
private stopMonitoring: boolean = false;
Expand Down Expand Up @@ -85,21 +84,31 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
initialHostSpec: HostInfo,
props,
pluginService: PluginService,
hostListProviderService: HostListProviderService,
hostListProvider: MonitoringRdsHostListProvider,
refreshRateNano
refreshRateMs: number,
highRefreshRateMs: number
) {
this.clusterId = clusterId;
this.topologyMap = topologyMap;
this.initialHostInfo = initialHostSpec;
this.pluginService = pluginService;
this.hostListProviderService = hostListProviderService;
this.hostListProvider = hostListProvider;
this.properties = props;
//this.refreshRateNano = refreshRateNano; // TODO: coordinate timeouts for bigint or number.
this._pluginService = pluginService;
this._hostListProvider = hostListProvider;
this._properties = props;
this.refreshRateMs = refreshRateMs;
this.highRefreshRateMs = highRefreshRateMs;
this.runPromise = this.run();
}

/** Read-only access to the host list provider supplied to the constructor. */
get hostListProvider(): MonitoringRdsHostListProvider {
return this._hostListProvider;
}
/** Read-only access to the plugin service supplied to the constructor. */
get pluginService(): PluginService {
return this._pluginService;
}
/** Read-only access to the connection properties map supplied to the constructor. */
get properties(): Map<string, any> {
return this._properties;
}

async close(): Promise<void> {
this.stopMonitoring = true;
this.hostMonitorsStop = true;
Expand All @@ -108,10 +117,6 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
this.hostMonitors.clear();
}

setClusterId(clusterId: string): void {
this.clusterId = clusterId;
}

async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
const currentHosts = this.topologyMap.get(this.clusterId);
if (currentHosts) {
Expand Down Expand Up @@ -177,7 +182,7 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
}

try {
const hosts: HostInfo[] = await this.hostListProvider.sqlQueryForTopology(client);
const hosts: HostInfo[] = await this._hostListProvider.sqlQueryForTopology(client);
if (hosts) {
this.updateTopologyCache(hosts);
}
Expand All @@ -188,13 +193,46 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
return null;
}

private openAnyClientAndUpdateTopology() {
// TODO: implement method.
return [];
/**
 * Ensures a monitoring connection exists, then refreshes the cached topology through it.
 *
 * When no monitoring client is set, exactly one connection to the initial host is opened.
 * If no monitoring client was installed while that connect was pending, the new connection
 * becomes the monitoring client and, when getWriterHostId() reports a writer for it, it is
 * recorded as the verified writer connection. Otherwise the redundant connection is closed
 * without awaiting (its close promise is added to untrackedPromises).
 *
 * If the topology fetch yields no hosts, the monitoring client is cleared and closed the same
 * way so that a later call opens a fresh connection.
 *
 * @returns The hosts produced by fetchTopologyAndUpdateCache(), or null when the connection
 * attempt failed.
 */
private async openAnyClientAndUpdateTopology() {
  if (!this.monitoringClient) {
    let client;
    try {
      // Open a single connection. Opening a second one after installing the first would make
      // the check below always fail, skipping writer verification and wasting a connection.
      client = await this._pluginService.forceConnect(this.initialHostInfo, this._properties);
    } catch {
      logger.debug(`Could not connect to: ${this.initialHostInfo.host}`);
      return null;
    }

    // Re-check after the await: the monitoring client may have been set while connecting.
    if (client && !this.monitoringClient) {
      this.monitoringClient = client;
      logger.debug(`Opened monitoring connection to: ${this.initialHostInfo.host}`);
      if (this.getWriterHostId(this.monitoringClient) !== null) {
        this.isVerifiedWriterConnection = true;
        this.writerHostInfo = this.initialHostInfo;
      }
    } else {
      // Monitoring connection already set by another task, close the new connection.
      this.untrackedPromises.push(this.closeConnection(client));
    }
  }

  const hosts: HostInfo[] = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
  if (!hosts) {
    // The connection could not produce a topology; drop it so the next call reconnects.
    const clientToClose = this.monitoringClient;
    this.monitoringClient = null;
    this.isVerifiedWriterConnection = false;

    this.untrackedPromises.push(this.closeConnection(clientToClose));
  }
  return hosts;
}

updateTopologyCache(hosts: HostInfo[]) {
this.topologyMap.put(this.clusterId, hosts, this.topologyCacheExpirationNanos);
this.topologyMap.put(this.clusterId, hosts, ClusterToplogyMonitorImpl.TOPOLOGY_CACHE_EXPIRATION_MS);
this.releaseTopologyUpdate();
this.topologyUpdated = new Promise<void>((done) => {
this.releaseTopologyUpdate = () => {
Expand All @@ -204,14 +242,14 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
}

getWriterHostId(client: ClientWrapper) {
return client.hostInfo.role === HostRole.WRITER ? client.id : null;
return client && client.hostInfo.role === HostRole.WRITER ? client.id : null;
}

async closeConnection(client: any) {
async closeConnection(client: ClientWrapper) {
if (!client) {
return;
}
await this.pluginService.abortTargetClient(client);
await this._pluginService.abortTargetClient(client);
}

private isInPanicMode() {
Expand All @@ -236,7 +274,7 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
// Use any client to gather topology information.
let hosts: HostInfo[] = this.topologyMap.get(this.clusterId);
if (!hosts) {
hosts = this.openAnyClientAndUpdateTopology();
hosts = await this.openAnyClientAndUpdateTopology();
}

// Set up host monitors.
Expand All @@ -259,15 +297,13 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
// Writer detected, update monitoringClient.
const client = this.monitoringClient;
this.monitoringClient = writerClient;
await this.closeConnection(client);
this.untrackedPromises.push(this.closeConnection(client));
this.isVerifiedWriterConnection = true;
this.highRefreshRateEndTime = Date.now() + this.highRefreshPeriodAfterPanic;
this.ignoreNewTopologyRequestsEndTime = Date.now() - this.ignoreTopologyRequest;
this.highRefreshRateEndTime = Date.now() + this.highRefreshPeriodAfterPanicMs;
this.ignoreNewTopologyRequestsEndTime = Date.now() + this.ignoreTopologyRequestMs;

// Stop monitoring of each host, writer detected.
this.hostMonitorsStop = true;
await Promise.all(this.untrackedPromises);
this.untrackedPromises = [];
this.hostMonitors.clear();
continue;
} else {
Expand All @@ -292,8 +328,6 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
if (this.hostMonitors.size !== 0) {
// Stop host monitors.
this.hostMonitorsStop = true;
await Promise.all(this.untrackedPromises);
this.untrackedPromises = [];
this.hostMonitors.clear();
}
const hosts = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
Expand All @@ -302,7 +336,7 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
const client = this.monitoringClient;
this.monitoringClient = null;
this.isVerifiedWriterConnection = false;
await this.closeConnection(client);
this.untrackedPromises.push(this.closeConnection(client));
continue;
}
if (this.highRefreshRateEndTime > 0 && Date.now() > this.highRefreshRateEndTime) {
Expand Down Expand Up @@ -331,7 +365,7 @@ export class ClusterToplogyMonitorImpl implements ClusterToplogyMonitor {
}

private async delay(useHighRefreshRate: boolean) {
const endTime = Date.now() + (useHighRefreshRate ? this.highRefreshRate : this.refreshRate);
const endTime = Date.now() + (useHighRefreshRate ? this.highRefreshRateMs : this.refreshRateMs);
while (Date.now() < endTime && !this.requestToUpdateTopology) {
await sleep(50);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ import { ClientWrapper } from "../../client_wrapper";
import { DatabaseDialect } from "../../database_dialect/database_dialect";
import { AwsWrapperError } from "../../utils/errors";
import { Messages } from "../../utils/messages";
import { WrapperProperties } from "../../wrapper_property";

export class MonitoringRdsHostListProvider extends RdsHostListProvider {
static CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 1 minute
static MONITOR_EXPIRATION_NANO: bigint = BigInt(15 * 60_000_000_000); // 15 minutes
static TOPOLOGY_QUERY_TIMEOUT = 5000;
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes.
static readonly MONITOR_EXPIRATION_NANOS: bigint = BigInt(15 * 60_000_000_000); // 15 minutes.
static readonly DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS = 5000; // 5 seconds.

pluginService: PluginService;

protected static monitors: SlidingExpirationCache<string, ClusterToplogyMonitorImpl> = new SlidingExpirationCache(
private static monitors: SlidingExpirationCache<string, ClusterToplogyMonitorImpl> = new SlidingExpirationCache(
MonitoringRdsHostListProvider.CACHE_CLEANUP_NANOS,
() => true,
async (monitor: ClusterToplogyMonitorImpl) => {
Expand All @@ -44,6 +43,8 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
}
);

private readonly pluginService: PluginService;

constructor(properties: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService, pluginService: PluginService) {
super(properties, originalUrl, hostListProviderService);
this.pluginService = pluginService;
Expand All @@ -59,14 +60,14 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
let monitor: ClusterToplogyMonitor = MonitoringRdsHostListProvider.monitors.get(
this.clusterId,
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANO
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
);
if (!monitor) {
monitor = this.initMonitor();
}

try {
return await monitor.forceRefresh(targetClient, MonitoringRdsHostListProvider.TOPOLOGY_QUERY_TIMEOUT);
return await monitor.forceRefresh(targetClient, MonitoringRdsHostListProvider.DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS);
} catch {
return null;
}
Expand All @@ -84,7 +85,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
let monitor: ClusterToplogyMonitor = MonitoringRdsHostListProvider.monitors.get(
this.clusterId,
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANO
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
);
if (!monitor) {
monitor = this.initMonitor();
Expand All @@ -93,6 +94,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
if (!monitor) {
throw new AwsWrapperError(Messages.get("MonitoringHostListProvider.requiresMonitor"));
}

return await monitor.forceMonitoringRefresh(shouldVerifyWriter, timeoutMs);
}

Expand All @@ -103,14 +105,15 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider {
this.initialHost,
this.properties,
this.pluginService,
this.hostListProviderService,
this,
this.clusterInstanceTemplate
WrapperProperties.CLUSTER_TOPOLOGY_REFRESH_RATE_MS.get(this.properties),
WrapperProperties.FAILOVER_CLUSTER_TOPOLOGY_REFRESH_RATE_MS.get(this.properties)
);

return MonitoringRdsHostListProvider.monitors.computeIfAbsent(
this.clusterId,
(x) => monitor,
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANO
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
);
}
}

0 comments on commit c4a4778

Please sign in to comment.