Skip to content

Commit

Permalink
feat: efm2 (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq authored Feb 12, 2025
1 parent 068e0c0 commit 64227d1
Show file tree
Hide file tree
Showing 27 changed files with 853 additions and 83 deletions.
3 changes: 3 additions & 0 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { LimitlessConnectionPluginFactory } from "./plugins/limitless/limitless_
import { FastestResponseStrategyPluginFactory } from "./plugins/strategy/fastest_response/fastest_respose_strategy_plugin_factory";
import { CustomEndpointPluginFactory } from "./plugins/custom_endpoint/custom_endpoint_plugin_factory";
import { ConfigurationProfile } from "./profile/configuration_profile";
import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_plugin_factory";

/*
Type alias used for plugin factory sorting. It holds a reference to a plugin
Expand All @@ -63,6 +64,7 @@ export class ConnectionPluginChainBuilder {
["failover", { factory: FailoverPluginFactory, weight: 700 }],
["failover2", { factory: Failover2PluginFactory, weight: 710 }],
["efm", { factory: HostMonitoringPluginFactory, weight: 800 }],
["efm2", { factory: HostMonitoring2PluginFactory, weight: 810 }],
["fastestResponseStrategy", { factory: FastestResponseStrategyPluginFactory, weight: 900 }],
["limitless", { factory: LimitlessConnectionPluginFactory, weight: 950 }],
["iam", { factory: IamAuthenticationPluginFactory, weight: 1000 }],
Expand All @@ -82,6 +84,7 @@ export class ConnectionPluginChainBuilder {
[FailoverPluginFactory, 700],
[Failover2PluginFactory, 710],
[HostMonitoringPluginFactory, 800],
[HostMonitoring2PluginFactory, 810],
[LimitlessConnectionPluginFactory, 950],
[IamAuthenticationPluginFactory, 1000],
[AwsSecretsManagerPluginFactory, 1100],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
}

private async openAnyClientAndUpdateTopology(): Promise<HostInfo[] | null> {
let writerVerifiedByThisThread = false;
let writerVerifiedByThisTask = false;
if (!this.monitoringClient) {
let client: ClientWrapper;
try {
Expand All @@ -215,7 +215,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
this.isVerifiedWriterConnection = true;
this.writerHostInfo = this.initialHostInfo;
logger.info(Messages.get("ClusterTopologyMonitor.writerMonitoringConnection", this.initialHostInfo.hostId));
writerVerifiedByThisThread = true;
writerVerifiedByThisTask = true;
}
} catch (error) {
// Do nothing.
Expand All @@ -228,7 +228,7 @@ export class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
}

const hosts: HostInfo[] = await this.fetchTopologyAndUpdateCache(this.monitoringClient);
if (writerVerifiedByThisThread) {
if (writerVerifiedByThisTask) {
if (this.ignoreNewTopologyRequestsEndTimeMs === -1) {
this.ignoreNewTopologyRequestsEndTimeMs = 0;
} else {
Expand Down
47 changes: 24 additions & 23 deletions common/lib/plugins/efm/host_monitoring_connection_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
} finally {
if (monitorContext != null) {
await this.monitorService.stopMonitoring(monitorContext);
logger.debug(Messages.get("HostMonitoringConnectionPlugin.monitoringDeactivated", methodName));

if (monitorContext.isHostUnhealthy) {
const monitoringHostInfo = await this.getMonitoringHostInfo();
Expand Down Expand Up @@ -148,7 +149,8 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
return result;
}

private throwUnableToIdentifyConnection(host: HostInfo | null, provider: HostListProvider | null): never {
private throwUnableToIdentifyConnection(host: HostInfo | null): never {
const provider: HostListProvider | null = this.pluginService.getHostListProvider();
throw new AwsWrapperError(
Messages.get(
"HostMonitoringConnectionPlugin.unableToIdentifyConnection",
Expand All @@ -159,32 +161,31 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
}

async getMonitoringHostInfo(): Promise<HostInfo> {
if (this.monitoringHostInfo) {
return this.monitoringHostInfo;
}
this.monitoringHostInfo = this.pluginService.getCurrentHostInfo();
if (this.monitoringHostInfo == null) {
this.monitoringHostInfo = this.pluginService.getCurrentHostInfo();
const provider: HostListProvider | null = this.pluginService.getHostListProvider();
if (this.monitoringHostInfo == null) {
this.throwUnableToIdentifyConnection(null, provider);
}
const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url);

try {
if (rdsUrlType.isRdsCluster) {
logger.debug("Monitoring host info is associated with a cluster endpoint, plugin needs to identify the cluster connection");
this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!);
if (this.monitoringHostInfo == null) {
const host: HostInfo | null = this.pluginService.getCurrentHostInfo();
this.throwUnableToIdentifyConnection(host, provider);
}
await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo);
}
} catch (error: any) {
if (!(error instanceof AwsWrapperError)) {
logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message));
this.throwUnableToIdentifyConnection(null);
}
const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url);

try {
if (rdsUrlType.isRdsCluster) {
logger.debug(Messages.get("HostMonitoringConnectionPlugin.identifyClusterConnection"));
this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!);
if (this.monitoringHostInfo == null) {
const host: HostInfo | null = this.pluginService.getCurrentHostInfo();
this.throwUnableToIdentifyConnection(host);
}
throw error;
await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo);
}
} catch (error: any) {
if (!(error instanceof AwsWrapperError)) {
logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message));
}
throw error;
}

return this.monitoringHostInfo;
}

Expand Down
36 changes: 15 additions & 21 deletions common/lib/plugins/efm/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import { PluginService } from "../../plugin_service";
import { logger } from "../../../logutils";
import { Messages } from "../../utils/messages";
import { ClientWrapper } from "../../client_wrapper";
import { sleep } from "../../utils/utils";
import { WrapperProperties } from "../../wrapper_property";
import { getCurrentTimeNano, sleep } from "../../utils/utils";
import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory";
import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor";
import { WrapperProperties } from "../../wrapper_property";

export interface Monitor {
startMonitoring(context: MonitorConnectionContext): void;
Expand Down Expand Up @@ -79,7 +80,7 @@ export class MonitorImpl implements Monitor {
this.properties = properties;
this.hostInfo = hostInfo;
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
this.contextLastUsedTimestampNanos = getCurrentTimeNano();
const instanceId = this.hostInfo.hostId ?? this.hostInfo.host;
this.instanceInvalidCounter = this.telemetryFactory.createCounter(`efm.hostUnhealthy.count.${instanceId}`);
}
Expand All @@ -94,7 +95,7 @@ export class MonitorImpl implements Monitor {
logger.warn(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host));
}

const currentTimeNanos: number = this.getCurrentTimeNano();
const currentTimeNanos: number = getCurrentTimeNano();
context.startMonitorTimeNano = currentTimeNanos;
this.contextLastUsedTimestampNanos = currentTimeNanos;
this.newContexts.push(context);
Expand All @@ -110,7 +111,7 @@ export class MonitorImpl implements Monitor {
}

context.isActiveContext = false;
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
this.contextLastUsedTimestampNanos = getCurrentTimeNano();
}

async run(): Promise<void> {
Expand All @@ -121,7 +122,7 @@ export class MonitorImpl implements Monitor {
try {
let newMonitorContext: MonitorConnectionContext | undefined;
let firstAddedNewMonitorContext: MonitorConnectionContext | null = null;
const currentTimeNano: number = this.getCurrentTimeNano();
const currentTimeNano: number = getCurrentTimeNano();

while ((newMonitorContext = this.newContexts.shift()) != null) {
if (firstAddedNewMonitorContext === newMonitorContext) {
Expand All @@ -140,9 +141,9 @@ export class MonitorImpl implements Monitor {
}

if (this.activeContexts.length > 0) {
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
this.contextLastUsedTimestampNanos = getCurrentTimeNano();

const statusCheckStartTimeNanos: number = this.getCurrentTimeNano();
const statusCheckStartTimeNanos: number = getCurrentTimeNano();
this.contextLastUsedTimestampNanos = statusCheckStartTimeNanos;

const status: ConnectionStatus = await this.checkConnectionStatus();
Expand Down Expand Up @@ -199,7 +200,7 @@ export class MonitorImpl implements Monitor {
this.delayMillisTimeoutId = setTimeout(resolve, delayMillis);
});
} else {
if (this.getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) {
if (getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) {
break;
}
await new Promise((resolve) => {
Expand Down Expand Up @@ -229,36 +230,33 @@ export class MonitorImpl implements Monitor {
const connectContext = this.telemetryFactory.openTelemetryContext("Connection status check", TelemetryTraceLevel.FORCE_TOP_LEVEL);
connectContext.setAttribute("url", this.hostInfo.host);
return await connectContext.start(async () => {
const startNanos = this.getCurrentTimeNano();
const startNanos = getCurrentTimeNano();
try {
const clientIsValid = this.monitoringClient && (await this.pluginService.isClientValid(this.monitoringClient));

if (this.monitoringClient !== null && clientIsValid) {
return Promise.resolve(new ConnectionStatus(clientIsValid, this.getCurrentTimeNano() - startNanos));
return Promise.resolve(new ConnectionStatus(clientIsValid, getCurrentTimeNano() - startNanos));
}

await this.endMonitoringClient();

// Open a new connection.
const monitoringConnProperties: Map<string, any> = new Map(this.properties);

for (const key of this.properties.keys()) {
for (const key of monitoringConnProperties.keys()) {
if (!key.startsWith(WrapperProperties.MONITORING_PROPERTY_PREFIX)) {
continue;
}

monitoringConnProperties.set(key.substring(WrapperProperties.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key));
monitoringConnProperties.delete(key);
}

logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`);
this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties);
logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`);
return Promise.resolve(new ConnectionStatus(true, this.getCurrentTimeNano() - startNanos));
return Promise.resolve(new ConnectionStatus(true, getCurrentTimeNano() - startNanos));
} catch (error: any) {
this.instanceInvalidCounter.inc();
await this.endMonitoringClient();
return Promise.resolve(new ConnectionStatus(false, this.getCurrentTimeNano() - startNanos));
return Promise.resolve(new ConnectionStatus(false, getCurrentTimeNano() - startNanos));
}
});
}
Expand All @@ -272,10 +270,6 @@ export class MonitorImpl implements Monitor {
return this.stopped || this.cancelled;
}

protected getCurrentTimeNano() {
return Number(process.hrtime.bigint());
}

async releaseResources() {
this.cancelled = true;
clearTimeout(this.delayMillisTimeoutId);
Expand Down
5 changes: 2 additions & 3 deletions common/lib/plugins/efm/monitor_connection_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class MonitorConnectionContext {

constructor(
monitor: Monitor,
clientToAbort: any,
clientToAbort: ClientWrapper,
failureDetectionTimeMillis: number,
failureDetectionIntervalMillis: number,
failureDetectionCount: number,
Expand Down Expand Up @@ -127,15 +127,14 @@ export class MonitorConnectionContext {

const invalidHostDurationNano: number = statusCheckEndNano - this.invalidHostStartTimeNano;
const maxInvalidHostDurationMillis: number = this.failureDetectionIntervalMillis * Math.max(0, this.failureDetectionCount);

if (this.failureCount >= this.failureDetectionCount || invalidHostDurationNano >= maxInvalidHostDurationMillis * 1_000_000) {
logger.debug(Messages.get("MonitorConnectionContext.hostDead", hostName));
this.isHostUnhealthy = true;
await this.abortConnection();
return;
}

logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", hostName, String(this.failureCount)));
logger.debug(Messages.get("MonitorConnectionContext.hostNotResponding", hostName));
return;
}

Expand Down
7 changes: 4 additions & 3 deletions common/lib/plugins/efm/monitor_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import { WrapperProperties } from "../../wrapper_property";
import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache";
import { PluginService } from "../../plugin_service";
import { Messages } from "../../utils/messages";
import { ClientWrapper } from "../../client_wrapper";

export interface MonitorService {
startMonitoring(
clientToAbort: any,
clientToAbort: ClientWrapper,
hostKeys: Set<string>,
hostInfo: HostInfo,
properties: Map<string, any>,
Expand Down Expand Up @@ -59,7 +60,7 @@ export class MonitorServiceImpl implements MonitorService {
}

async startMonitoring(
clientToAbort: any,
clientToAbort: ClientWrapper,
hostKeys: Set<string>,
hostInfo: HostInfo,
properties: Map<string, any>,
Expand Down Expand Up @@ -158,7 +159,7 @@ export class MonitorServiceImpl implements MonitorService {
}

async releaseResources() {
for (const [key, monitor] of MonitorServiceImpl.monitors.entries) {
for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) {
if (monitor.item) {
await monitor.item.releaseResources();
}
Expand Down
Loading

0 comments on commit 64227d1

Please sign in to comment.