Skip to content

Commit

Permalink
monitoring properties
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq committed Feb 7, 2025
1 parent 60ee14e commit 455a943
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
1 change: 1 addition & 0 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export class ConnectionPluginChainBuilder {
[FailoverPluginFactory, 700],
[Failover2PluginFactory, 710],
[HostMonitoringPluginFactory, 800],
[HostMonitoring2PluginFactory, 810],
[LimitlessConnectionPluginFactory, 950],
[IamAuthenticationPluginFactory, 1000],
[AwsSecretsManagerPluginFactory, 1100],
Expand Down
12 changes: 11 additions & 1 deletion common/lib/plugins/efm/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { getCurrentTimeNano, sleep } from "../../utils/utils";
import { TelemetryFactory } from "../../utils/telemetry/telemetry_factory";
import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor";

export interface Monitor {
startMonitoring(context: MonitorConnectionContext): void;
Expand Down Expand Up @@ -238,8 +239,17 @@ export class MonitorImpl implements Monitor {

await this.endMonitoringClient();
// Open a new connection.
const monitoringConnProperties: Map<string, any> = new Map(this.properties);
for (const key of monitoringConnProperties.keys()) {
if (!key.startsWith(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX)) {
continue;
}
monitoringConnProperties.set(key.substring(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key));
monitoringConnProperties.delete(key);
}

logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`);
this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, this.properties);
this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, monitoringConnProperties);
logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`);
return Promise.resolve(new ConnectionStatus(true, getCurrentTimeNano() - startNanos));
} catch (error: any) {
Expand Down
29 changes: 19 additions & 10 deletions common/lib/plugins/efm2/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { TelemetryCounter } from "../../utils/telemetry/telemetry_counter";
import { TelemetryTraceLevel } from "../../utils/telemetry/telemetry_trace_level";
import { HostAvailability } from "../../host_availability/host_availability";
import { MapUtils } from "../../utils/map_utils";
import { HostResponseTimeMonitor } from "../strategy/fastest_response/host_response_time_monitor";

export interface Monitor {
startMonitoring(context: MonitorConnectionContext): void;
Expand All @@ -42,7 +43,7 @@ export interface Monitor {
}

export class MonitorImpl implements Monitor {
private static readonly TASK_SLEEP_NANOS: number = 100 * 1000000;
private static readonly TASK_SLEEP_MILLIS: number = 100;
private activeContexts: WeakRef<MonitorConnectionContext>[] = [];
static newContexts: Map<number, Array<WeakRef<MonitorConnectionContext>>> = new Map<number, Array<WeakRef<MonitorConnectionContext>>>();
private readonly pluginService: PluginService;
Expand Down Expand Up @@ -89,9 +90,8 @@ export class MonitorImpl implements Monitor {
this.telemetryFactory.createGauge(`efm2.activeContexts.size.${hostId}`, () => this.activeContexts.length === Number.MAX_SAFE_INTEGER);

this.telemetryFactory.createGauge(`efm2.hostHealthy.${hostId}`, () => (this.hostUnhealthy ? 0 : 1));
const task1 = this.newContextRun();
const task2 = this.run();
Promise.race([task1, task2]).finally(() => {

Promise.race([this.newContextRun(), this.run()]).finally(() => {
this.stopped = true;
});
}
Expand Down Expand Up @@ -130,9 +130,9 @@ export class MonitorImpl implements Monitor {
// Add all contexts to an active monitoring contexts queue.
// Ignore disposed contexts.
let monitorContextRef: WeakRef<MonitorConnectionContext> | undefined;
while ((monitorContextRef = queue.shift()) != null) {
while ((monitorContextRef = queue.shift())) {
const monitorContext: MonitorConnectionContext = monitorContextRef.deref();
if (monitorContext !== null && monitorContext.isActive()) {
if (monitorContext && monitorContext.isActive()) {
this.activeContexts.push(monitorContextRef);
}
}
Expand All @@ -158,7 +158,7 @@ export class MonitorImpl implements Monitor {
try {
if (this.activeContexts.length === 0 && !this.hostUnhealthy) {
await new Promise((resolve) => {
this.delayMillisTimeoutId = setTimeout(resolve, MonitorImpl.TASK_SLEEP_NANOS / 1000000);
this.delayMillisTimeoutId = setTimeout(resolve, MonitorImpl.TASK_SLEEP_MILLIS);
});
}

Expand Down Expand Up @@ -201,12 +201,12 @@ export class MonitorImpl implements Monitor {
// Add active contexts back to the queue.
this.activeContexts.push(...tmpActiveContexts);

const delayNanos = this.failureDetectionIntervalNanos - (statusCheckEndTimeNanos - statusCheckStartTimeNanos);
const delayMillis = this.failureDetectionIntervalNanos - (statusCheckEndTimeNanos - statusCheckStartTimeNanos) / 1000000;

await new Promise((resolve) => {
this.delayMillisTimeoutId = setTimeout(
resolve,
delayNanos < MonitorImpl.TASK_SLEEP_NANOS ? MonitorImpl.TASK_SLEEP_NANOS / 1000000 : delayNanos / 1000000
delayMillis < MonitorImpl.TASK_SLEEP_MILLIS ? MonitorImpl.TASK_SLEEP_MILLIS : delayMillis
);
});
}
Expand Down Expand Up @@ -235,6 +235,15 @@ export class MonitorImpl implements Monitor {
try {
if (!(await this.pluginService.isClientValid(this.monitoringClient))) {
// Open a new connection.
const monitoringConnProperties: Map<string, any> = new Map(this.properties);
for (const key of monitoringConnProperties.keys()) {
if (!key.startsWith(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX)) {
continue;
}
monitoringConnProperties.set(key.substring(HostResponseTimeMonitor.MONITORING_PROPERTY_PREFIX.length), this.properties.get(key));
monitoringConnProperties.delete(key);
}

logger.debug(`Opening a monitoring connection to ${this.hostInfo.url}`);
this.monitoringClient = await this.pluginService.forceConnect(this.hostInfo, this.properties);
logger.debug(`Successfully opened monitoring connection to ${this.monitoringClient.id} - ${this.hostInfo.url}`);
Expand All @@ -259,7 +268,7 @@ export class MonitorImpl implements Monitor {

updateHostHealthStatus(connectionValid: boolean, statusCheckStartNano: number, statusCheckEndNano: number): Promise<void> {
if (!connectionValid) {
this.failureCount += 1;
this.failureCount++;

if (this.invalidHostStartTimeNano === 0) {
this.invalidHostStartTimeNano = statusCheckStartNano;
Expand Down

0 comments on commit 455a943

Please sign in to comment.