diff --git a/common/lib/connection_plugin_chain_builder.ts b/common/lib/connection_plugin_chain_builder.ts index dd172cad..6c192d66 100644 --- a/common/lib/connection_plugin_chain_builder.ts +++ b/common/lib/connection_plugin_chain_builder.ts @@ -82,6 +82,7 @@ export class ConnectionPluginChainBuilder { [FailoverPluginFactory, 700], [Failover2PluginFactory, 710], [HostMonitoringPluginFactory, 800], + [HostMonitoring2PluginFactory, 810], [LimitlessConnectionPluginFactory, 950], [IamAuthenticationPluginFactory, 1000], [AwsSecretsManagerPluginFactory, 1100], diff --git a/common/lib/plugins/efm/monitor.ts b/common/lib/plugins/efm/monitor.ts index 24ceb41e..7746b6f7 100644 --- a/common/lib/plugins/efm/monitor.ts +++ b/common/lib/plugins/efm/monitor.ts @@ -24,6 +24,7 @@ import { getCurrentTimeNano, sleep } from "../../utils/utils"; import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory"; import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter"; import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level"; +import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor"; export interface Monitor { startMonitoring(context: MonitorConnectionContext): void; @@ -238,8 +239,17 @@ export class MonitorImpl implements Monitor { await this.endMonitoringClient(); // Open a new connection. + const monitoringConnProperties: Map = new Map(this.properties); + for (const key of monitoringConnProperties.keys()) { + if (!key.startsWith(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX)) { + continue; + } + monitoringConnProperties.set(key.substring(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key)); + monitoringConnProperties.delete(key); + } + logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`); - this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, this.properties); + this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties); logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`); return Promise.resolve(new ConnectionStatus(true, getCurrentTimeNano() - startNanos)); } catch (error: any) { diff --git a/common/lib/plugins/efm2/monitor.ts b/common/lib/plugins/efm2/monitor.ts index 43bc713c..a7294574 100644 --- a/common/lib/plugins/efm2/monitor.ts +++ b/common/lib/plugins/efm2/monitor.ts @@ -26,6 +26,7 @@ import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter"; import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level"; import { HostAvailability } from "../../host_availability/host_availability"; import { MapUtils } from "../../utils/map_utils"; +import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor"; export interface Monitor { startMonitoring(context: MonitorConnectionContext): void; @@ -42,7 +43,7 @@ export interface Monitor { } export class MonitorImpl implements Monitor { - private static readonly TASK_SLEEP_NANOS: number = 100 * 1000000; + private static readonly TASK_SLEEP_MILLIS: number = 100; private activeContexts: WeakRef[] = []; static newContexts: Map>> = new Map>>(); private readonly pluginService: PluginService; @@ -89,9 +90,8 @@ export class MonitorImpl implements Monitor { this.telemetryFactory.createGauge(`efm2.activeContexts.size.${hostId}`, () => this.activeContexts.length === Number.MAX_SAFE_INTEGER); this.telemetryFactory.createGauge(`efm2.hostHealthy.${hostId}`, () => (this.hostUnhealthy ? 0 : 1)); - const task1 = this.newContextRun(); - const task2 = this.run(); - Promise.race([task1, task2]).finally(() => { + + Promise.race([this.newContextRun(), this.run()]).finally(() => { this.stopped = true; }); } @@ -130,9 +130,9 @@ export class MonitorImpl implements Monitor { // Add all contexts to an active monitoring contexts queue. // Ignore disposed contexts. let monitorContextRef: WeakRef | undefined; - while ((monitorContextRef = queue.shift()) != null) { + while ((monitorContextRef = queue.shift())) { const monitorContext: MonitorConnectionContext = monitorContextRef.deref(); - if (monitorContext !== null && monitorContext.isActive()) { + if (monitorContext && monitorContext.isActive()) { this.activeContexts.push(monitorContextRef); } } @@ -158,7 +158,7 @@ export class MonitorImpl implements Monitor { try { if (this.activeContexts.length === 0 && !this.hostUnhealthy) { await new Promise((resolve) => { - this.delayMillisTimeoutId = setTimeout(resolve, MonitorImpl.TASK_SLEEP_NANOS / 1000000); + this.delayMillisTimeoutId = setTimeout(resolve, MonitorImpl.TASK_SLEEP_MILLIS); }); } @@ -201,12 +201,12 @@ export class MonitorImpl implements Monitor { // Add active contexts back to the queue. this.activeContexts.push(...tmpActiveContexts); - const delayNanos = this.failureDetectionIntervalNanos - (statusCheckEndTimeNanos - statusCheckStartTimeNanos); + const delayMillis = this.failureDetectionIntervalNanos - (statusCheckEndTimeNanos - statusCheckStartTimeNanos) / 1000000; await new Promise((resolve) => { this.delayMillisTimeoutId = setTimeout( resolve, - delayNanos < MonitorImpl.TASK_SLEEP_NANOS ? MonitorImpl.TASK_SLEEP_NANOS / 1000000 : delayNanos / 1000000 + delayMillis < MonitorImpl.TASK_SLEEP_MILLIS ? MonitorImpl.TASK_SLEEP_MILLIS : delayMillis ); }); } @@ -235,6 +235,15 @@ export class MonitorImpl implements Monitor { try { if (!(await this.pluginService.isClientValid(this.monitoringClient))) { // Open a new connection. + const monitoringConnProperties: Map = new Map(this.properties); + for (const key of monitoringConnProperties.keys()) { + if (!key.startsWith(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX)) { + continue; + } + monitoringConnProperties.set(key.substring(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key)); + monitoringConnProperties.delete(key); + } + logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`); this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, this.properties); logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`); @@ -259,7 +268,7 @@ export class MonitorImpl implements Monitor { updateHostHealthStatus(connectionValid: boolean, statusCheckStartNano: number, statusCheckEndNano: number): Promise { if (!connectionValid) { - this.failureCount += 1; + this.failureCount++; if (this.invalidHostStartTimeNano === 0) { this.invalidHostStartTimeNano = statusCheckStartNano;