Skip to content

Commit

Permalink
feat: efm2
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq committed Feb 7, 2025
1 parent c6478cb commit 7a3108b
Show file tree
Hide file tree
Showing 22 changed files with 930 additions and 169 deletions.
3 changes: 3 additions & 0 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { ConnectionPluginFactory } from "./plugin_factory";
import { LimitlessConnectionPluginFactory } from "./plugins/limitless/limitless_connection_plugin_factory";
import { FastestResponseStrategyPluginFactory } from "./plugins/strategy/fastest_response/fastest_respose_strategy_plugin_factory";
import { ConfigurationProfile } from "./profile/configuration_profile";
import { HostMonitoring2PluginFactory } from "./plugins/efm2/host_monitoring2_plugin_factory";

/*
Type alias used for plugin factory sorting. It holds a reference to a plugin
Expand All @@ -61,6 +62,7 @@ export class ConnectionPluginChainBuilder {
["failover", { factory: FailoverPluginFactory, weight: 700 }],
["failover2", { factory: Failover2PluginFactory, weight: 710 }],
["efm", { factory: HostMonitoringPluginFactory, weight: 800 }],
["efm2", { factory: HostMonitoring2PluginFactory, weight: 810 }],
["fastestResponseStrategy", { factory: FastestResponseStrategyPluginFactory, weight: 900 }],
["limitless", { factory: LimitlessConnectionPluginFactory, weight: 950 }],
["iam", { factory: IamAuthenticationPluginFactory, weight: 1000 }],
Expand All @@ -80,6 +82,7 @@ export class ConnectionPluginChainBuilder {
[FailoverPluginFactory, 700],
[Failover2PluginFactory, 710],
[HostMonitoringPluginFactory, 800],
[HostMonitoring2PluginFactory, 810],
[LimitlessConnectionPluginFactory, 950],
[IamAuthenticationPluginFactory, 1000],
[AwsSecretsManagerPluginFactory, 1100],
Expand Down
47 changes: 24 additions & 23 deletions common/lib/plugins/efm/host_monitoring_connection_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
} finally {
if (monitorContext != null) {
await this.monitorService.stopMonitoring(monitorContext);
logger.debug(Messages.get("HostMonitoringConnectionPlugin.monitoringDeactivated", methodName));

if (monitorContext.isHostUnhealthy) {
const monitoringHostInfo = await this.getMonitoringHostInfo();
Expand Down Expand Up @@ -148,7 +149,8 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
return result;
}

private throwUnableToIdentifyConnection(host: HostInfo | null, provider: HostListProvider | null): never {
private throwUnableToIdentifyConnection(host: HostInfo | null): never {
const provider: HostListProvider | null = this.pluginService.getHostListProvider();
throw new AwsWrapperError(
Messages.get(
"HostMonitoringConnectionPlugin.unableToIdentifyConnection",
Expand All @@ -159,32 +161,31 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
}

async getMonitoringHostInfo(): Promise<HostInfo> {
if (this.monitoringHostInfo) {
return this.monitoringHostInfo;
}
this.monitoringHostInfo = this.pluginService.getCurrentHostInfo();
if (this.monitoringHostInfo == null) {
this.monitoringHostInfo = this.pluginService.getCurrentHostInfo();
const provider: HostListProvider | null = this.pluginService.getHostListProvider();
if (this.monitoringHostInfo == null) {
this.throwUnableToIdentifyConnection(null, provider);
}
const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url);

try {
if (rdsUrlType.isRdsCluster) {
logger.debug("Monitoring host info is associated with a cluster endpoint, plugin needs to identify the cluster connection");
this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!);
if (this.monitoringHostInfo == null) {
const host: HostInfo | null = this.pluginService.getCurrentHostInfo();
this.throwUnableToIdentifyConnection(host, provider);
}
await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo);
}
} catch (error: any) {
if (!(error instanceof AwsWrapperError)) {
logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message));
this.throwUnableToIdentifyConnection(null);
}
const rdsUrlType: RdsUrlType = this.rdsUtils.identifyRdsType(this.monitoringHostInfo.url);

try {
if (rdsUrlType.isRdsCluster) {
logger.debug(Messages.get("HostMonitoringConnectionPlugin.identifyClusterConnection"));
this.monitoringHostInfo = await this.pluginService.identifyConnection(this.pluginService.getCurrentClient().targetClient!);
if (this.monitoringHostInfo == null) {
const host: HostInfo | null = this.pluginService.getCurrentHostInfo();
this.throwUnableToIdentifyConnection(host);
}
throw error;
await this.pluginService.fillAliases(this.pluginService.getCurrentClient().targetClient!, this.monitoringHostInfo);
}
} catch (error: any) {
if (!(error instanceof AwsWrapperError)) {
logger.debug(Messages.get("HostMonitoringConnectionPlugin.errorIdentifyingConnection", error.message));
}
throw error;
}

return this.monitoringHostInfo;
}

Expand Down
39 changes: 16 additions & 23 deletions common/lib/plugins/efm/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import { PluginService } from "../../plugin_service";
import { logger } from "../../../logutils";
import { Messages } from "../../utils/messages";
import { ClientWrapper } from "../../client_wrapper";
import { sleep } from "../../utils/utils";
import { WrapperProperties } from "../../wrapper_property";
import { getCurrentTimeNano, sleep } from "../../utils/utils";
import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory";
import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor";

export interface Monitor {
startMonitoring(context: MonitorConnectionContext): void;
Expand Down Expand Up @@ -79,7 +79,7 @@ export class MonitorImpl implements Monitor {
this.properties = properties;
this.hostInfo = hostInfo;
this.monitorDisposalTimeMillis = monitorDisposalTimeMillis;
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
this.contextLastUsedTimestampNanos = getCurrentTimeNano();
const instanceId = this.hostInfo.hostId ?? this.hostInfo.host;
this.instanceInvalidCounter = this.telemetryFactory.createCounter(`efm.hostUnhealthy.count.${instanceId}`);
}
Expand All @@ -94,7 +94,7 @@ export class MonitorImpl implements Monitor {
logger.warn(Messages.get("MonitorImpl.monitorIsStopped", this.hostInfo.host));
}

const currentTimeNanos: number = this.getCurrentTimeNano();
const currentTimeNanos: number = getCurrentTimeNano();
context.startMonitorTimeNano = currentTimeNanos;
this.contextLastUsedTimestampNanos = currentTimeNanos;
this.newContexts.push(context);
Expand All @@ -110,7 +110,7 @@ export class MonitorImpl implements Monitor {
}

context.isActiveContext = false;
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
this.contextLastUsedTimestampNanos = getCurrentTimeNano();
}

async run(): Promise<void> {
Expand All @@ -121,7 +121,7 @@ export class MonitorImpl implements Monitor {
try {
let newMonitorContext: MonitorConnectionContext | undefined;
let firstAddedNewMonitorContext: MonitorConnectionContext | null = null;
const currentTimeNano: number = this.getCurrentTimeNano();
const currentTimeNano: number = getCurrentTimeNano();

while ((newMonitorContext = this.newContexts.shift()) != null) {
if (firstAddedNewMonitorContext === newMonitorContext) {
Expand All @@ -140,9 +140,9 @@ export class MonitorImpl implements Monitor {
}

if (this.activeContexts.length > 0) {
this.contextLastUsedTimestampNanos = this.getCurrentTimeNano();
this.contextLastUsedTimestampNanos = getCurrentTimeNano();

const statusCheckStartTimeNanos: number = this.getCurrentTimeNano();
const statusCheckStartTimeNanos: number = getCurrentTimeNano();
this.contextLastUsedTimestampNanos = statusCheckStartTimeNanos;

const status: ConnectionStatus = await this.checkConnectionStatus();
Expand Down Expand Up @@ -199,7 +199,7 @@ export class MonitorImpl implements Monitor {
this.delayMillisTimeoutId = setTimeout(resolve, delayMillis);
});
} else {
if (this.getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) {
if (getCurrentTimeNano() - this.contextLastUsedTimestampNanos >= this.monitorDisposalTimeMillis * 1_000_000) {
break;
}
await new Promise((resolve) => {
Expand Down Expand Up @@ -229,36 +229,33 @@ export class MonitorImpl implements Monitor {
const connectContext = this.telemetryFactory.openTelemetryContext("Connection status check", TelemetryTraceLevel.FORCE_TOP_LEVEL);
connectContext.setAttribute("url", this.hostInfo.host);
return await connectContext.start(async () => {
const startNanos = this.getCurrentTimeNano();
const startNanos = getCurrentTimeNano();
try {
const clientIsValid = this.monitoringClient && (await this.pluginService.isClientValid(this.monitoringClient));

if (this.monitoringClient !== null && clientIsValid) {
return Promise.resolve(new ConnectionStatus(clientIsValid, this.getCurrentTimeNano() - startNanos));
return Promise.resolve(new ConnectionStatus(clientIsValid, getCurrentTimeNano() - startNanos));
}

await this.endMonitoringClient();

// Open a new connection.
const monitoringConnProperties: Map<string, any> = new Map(this.properties);

for (const key of this.properties.keys()) {
if (!key.startsWith(WrapperProperties.MONITORING_PROPERTY_PREFIX)) {
for (const key of monitoringConnProperties.keys()) {
if (!key.startsWith(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX)) {
continue;
}

monitoringConnProperties.set(key.substring(WrapperProperties.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key));
monitoringConnProperties.set(key.substring(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key));
monitoringConnProperties.delete(key);
}

logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`);
this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties);
logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`);
return Promise.resolve(new ConnectionStatus(true, this.getCurrentTimeNano() - startNanos));
return Promise.resolve(new ConnectionStatus(true, getCurrentTimeNano() - startNanos));
} catch (error: any) {
this.instanceInvalidCounter.inc();
await this.endMonitoringClient();
return Promise.resolve(new ConnectionStatus(false, this.getCurrentTimeNano() - startNanos));
return Promise.resolve(new ConnectionStatus(false, getCurrentTimeNano() - startNanos));
}
});
}
Expand All @@ -272,10 +269,6 @@ export class MonitorImpl implements Monitor {
return this.stopped || this.cancelled;
}

protected getCurrentTimeNano() {
return Number(process.hrtime.bigint());
}

async releaseResources() {
this.cancelled = true;
clearTimeout(this.delayMillisTimeoutId);
Expand Down
1 change: 0 additions & 1 deletion common/lib/plugins/efm/monitor_connection_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ export class MonitorConnectionContext {

const invalidHostDurationNano: number = statusCheckEndNano - this.invalidHostStartTimeNano;
const maxInvalidHostDurationMillis: number = this.failureDetectionIntervalMillis * Math.max(0, this.failureDetectionCount);

if (this.failureCount >= this.failureDetectionCount || invalidHostDurationNano >= maxInvalidHostDurationMillis * 1_000_000) {
logger.debug(Messages.get("MonitorConnectionContext.hostDead", hostName));
this.isHostUnhealthy = true;
Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugins/efm/monitor_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ export class MonitorServiceImpl implements MonitorService {
}

async releaseResources() {
for (const [key, monitor] of MonitorServiceImpl.monitors.entries) {
for (const [_key, monitor] of MonitorServiceImpl.monitors.entries) {
if (monitor.item) {
await monitor.item.releaseResources();
}
Expand Down
Loading

0 comments on commit 7a3108b

Please sign in to comment.