Skip to content

Commit

Permalink
refactor: sliding expiration cache with cleanup task
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Feb 4, 2025
1 parent 98fcda0 commit 083d56f
Show file tree
Hide file tree
Showing 16 changed files with 292 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,19 @@ import { Messages } from "../../utils/messages";
import { WrapperProperties } from "../../wrapper_property";
import { BlockingHostListProvider } from "../host_list_provider";
import { logger } from "../../../logutils";
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";

export class MonitoringRdsHostListProvider extends RdsHostListProvider implements BlockingHostListProvider {
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 1 minute.
static readonly MONITOR_EXPIRATION_NANOS: bigint = BigInt(15 * 60_000_000_000); // 15 minutes.
static readonly DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS = 5000; // 5 seconds.

private static monitors: SlidingExpirationCache<string, ClusterTopologyMonitor> = new SlidingExpirationCache(
private static monitors: SlidingExpirationCacheWithCleanupTask<string, ClusterTopologyMonitor> = new SlidingExpirationCacheWithCleanupTask(
MonitoringRdsHostListProvider.CACHE_CLEANUP_NANOS,
() => true,
async (monitor: ClusterTopologyMonitor) => {
async (item: ClusterTopologyMonitor) => {
try {
await monitor.close();
await item.close();
} catch {
// Ignore.
}
Expand All @@ -54,13 +55,7 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement

async clearAll(): Promise<void> {
RdsHostListProvider.clearAll();
// TODO: refactor when sliding-expiration-cache refactoring is merged.
for (const [key, monitor] of MonitoringRdsHostListProvider.monitors.entries) {
if (monitor !== undefined) {
await monitor.item.close();
}
}
MonitoringRdsHostListProvider.monitors.clear();
await MonitoringRdsHostListProvider.monitors.clear();
}

async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
Expand Down
21 changes: 6 additions & 15 deletions common/lib/internal_pooled_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import { PluginService } from "./plugin_service";
import { WrapperProperties } from "./wrapper_property";
import { CanReleaseResources } from "./can_release_resources";
import { SlidingExpirationCache } from "./utils/sliding_expiration_cache";
import { PoolKey } from "./utils/pool_key";
import { PooledConnectionProvider } from "./pooled_connection_provider";
import { HostInfo } from "./host_info";
Expand All @@ -39,14 +38,15 @@ import { AwsPoolConfig } from "./aws_pool_config";
import { LeastConnectionsHostSelector } from "./least_connections_host_selector";
import { PoolClientWrapper } from "./pool_client_wrapper";
import { logger } from "../logutils";
import { SlidingExpirationCacheWithCleanupTask } from "./utils/sliding_expiration_cache_with_cleanup_task";

export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
static readonly POOL_EXPIRATION_NANOS: bigint = BigInt(30 * 60_000_000_000); // 30 minutes
protected static databasePools: SlidingExpirationCache<string, any> = new SlidingExpirationCache(
protected static databasePools: SlidingExpirationCacheWithCleanupTask<string, AwsPoolClient> = new SlidingExpirationCacheWithCleanupTask(
InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS,
(pool: any) => pool.getActiveCount() === 0,
(pool: any) => pool.end()
(pool: AwsPoolClient) => pool.getActiveCount() === 0,
async (pool: AwsPoolClient) => await pool.end()
);

private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
Expand Down Expand Up @@ -122,16 +122,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
}

async releaseResources() {
for (const [_key, value] of InternalPooledConnectionProvider.databasePools.entries) {
if (value.item) {
await value.item.releaseResources();
}
}
InternalPooledConnectionProvider.clearDatabasePools();
}

static clearDatabasePools() {
InternalPooledConnectionProvider.databasePools.clear();
await InternalPooledConnectionProvider.databasePools.clear();
}

getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
Expand Down Expand Up @@ -177,7 +168,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide
}

// for testing only
setDatabasePools(connectionPools: SlidingExpirationCache<string, any>): void {
setDatabasePools(connectionPools: SlidingExpirationCacheWithCleanupTask<string, any>): void {
InternalPooledConnectionProvider.databasePools = connectionPools;
LeastConnectionsHostSelector.setDatabasePools(connectionPools);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,6 @@ export class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin imp
}

async releaseResources(): Promise<void> {
return this.monitorService.releaseResources();
await this.monitorService.releaseResources();
}
}
25 changes: 12 additions & 13 deletions common/lib/plugins/efm/monitor_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import { HostInfo } from "../../host_info";
import { AwsWrapperError, IllegalArgumentError } from "../../utils/errors";
import { Monitor, MonitorImpl } from "./monitor";
import { WrapperProperties } from "../../wrapper_property";
import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache";
import { PluginService } from "../../plugin_service";
import { Messages } from "../../utils/messages";
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";

export interface MonitorService {
startMonitoring(
Expand All @@ -43,10 +43,12 @@ export interface MonitorService {

export class MonitorServiceImpl implements MonitorService {
private static readonly CACHE_CLEANUP_NANOS = BigInt(60_000_000_000);
protected static readonly monitors: SlidingExpirationCache<string, Monitor> = new SlidingExpirationCache(
protected static readonly monitors: SlidingExpirationCacheWithCleanupTask<string, Monitor> = new SlidingExpirationCacheWithCleanupTask(
MonitorServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
() => {}
async (monitor: Monitor) => {
await monitor.releaseResources();
}
);
private readonly pluginService: PluginService;
private cachedMonitorHostKeys: Set<string> | undefined;
Expand Down Expand Up @@ -107,7 +109,7 @@ export class MonitorServiceImpl implements MonitorService {
}

stopMonitoringForAllConnections(hostKeys: Set<string>) {
let monitor;
let monitor: Monitor;
for (const hostKey of hostKeys) {
monitor = MonitorServiceImpl.monitors.get(hostKey);
if (monitor) {
Expand All @@ -118,8 +120,8 @@ export class MonitorServiceImpl implements MonitorService {
}

async getMonitor(hostKeys: Set<string>, hostInfo: HostInfo, properties: Map<string, any>): Promise<Monitor | null> {
let monitor;
let anyHostKey;
let monitor: Monitor;
let anyHostKey: string;
for (const hostKey of hostKeys) {
monitor = MonitorServiceImpl.monitors.get(hostKey);
anyHostKey = hostKey;
Expand Down Expand Up @@ -158,16 +160,13 @@ export class MonitorServiceImpl implements MonitorService {
}

async releaseResources() {
for (const [key, monitor] of MonitorServiceImpl.monitors.entries) {
if (monitor.item) {
await monitor.item.releaseResources();
}
}
await MonitorServiceImpl.monitors.clear();
this.cachedMonitorHostKeys = undefined;
this.cachedMonitorRef = undefined;
}

static clearMonitors() {
MonitorServiceImpl.monitors.clear();
// Used for performance testing.
static async clearMonitors() {
await MonitorServiceImpl.monitors.clear();
}
}
16 changes: 9 additions & 7 deletions common/lib/plugins/limitless/limitless_router_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { HostRole } from "../../host_role";
import { HostAvailability } from "../../host_availability/host_availability";
import { HighestWeightHostSelector } from "../../highest_weight_host_selector";
import { sleep } from "../../utils/utils";
import { SlidingExpirationCacheWithCleanupTask } from "../../utils/sliding_expiration_cache_with_cleanup_task";

export interface LimitlessRouterService {
startMonitor(hostInfo: HostInfo, properties: Map<string, any>): void;
Expand All @@ -40,11 +41,12 @@ export interface LimitlessRouterService {

export class LimitlessRouterServiceImpl implements LimitlessRouterService {
protected static readonly CACHE_CLEANUP_NANOS = BigInt(60_000_000_000); // 1 min
protected static readonly monitors: SlidingExpirationCache<string, LimitlessRouterMonitor> = new SlidingExpirationCache(
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
async (monitor) => await monitor.close()
);
protected static readonly monitors: SlidingExpirationCacheWithCleanupTask<string, LimitlessRouterMonitor> =
new SlidingExpirationCacheWithCleanupTask(
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
async (monitor: LimitlessRouterMonitor) => await monitor.close()
);
protected static readonly limitlessRouterCache: SlidingExpirationCache<string, HostInfo[]> = new SlidingExpirationCache(
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
Expand Down Expand Up @@ -254,7 +256,7 @@ export class LimitlessRouterServiceImpl implements LimitlessRouterService {
}
}

static clearMonitors() {
LimitlessRouterServiceImpl.monitors.clear();
static async clearMonitors() {
await LimitlessRouterServiceImpl.monitors.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class HostResponseTimeServiceImpl implements HostResponseTimeService {
readonly intervalMs: number;
protected hosts: HostInfo[];
private readonly telemetryFactory: TelemetryFactory;
protected static monitoringHosts: SlidingExpirationCache<string, any> = new SlidingExpirationCache(
protected static monitoringHosts: SlidingExpirationCache<string, HostResponseTimeMonitor> = new SlidingExpirationCache(
HostResponseTimeServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
async (monitor: HostResponseTimeMonitor) => {
Expand Down
4 changes: 3 additions & 1 deletion common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -224,5 +224,7 @@
"HostMonitor.startMonitoring": "Host monitor '%s' started.",
"HostMonitor.detectedWriter": "Detected writer: '%s'.",
"HostMonitor.endMonitoring": "Host monitor '%s' completed in '%s'.",
"HostMonitor.writerHostChanged": "Writer host has changed from '%s' to '%s'."
"HostMonitor.writerHostChanged": "Writer host has changed from '%s' to '%s'.",
"SlidingExpirationCacheWithCleanupTask.cleaningUp": "Cleanup interval of '%s' minutes has passed, cleaning up sliding expiration cache.",
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskInterrupted": "Cleanup task has been interrupted and is exiting."
}
4 changes: 2 additions & 2 deletions common/lib/utils/sliding_expiration_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CacheItem<V> {
}

export class SlidingExpirationCache<K, V> {
private _cleanupIntervalNanos: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
protected _cleanupIntervalNanos: bigint = BigInt(10 * 60_000_000_000); // 10 minutes.
private readonly _shouldDisposeFunc?: (item: V) => boolean;
private readonly _itemDisposalFunc?: (item: V) => void;
map: Map<K, CacheItem<V>> = new Map<K, CacheItem<V>>();
Expand Down Expand Up @@ -116,7 +116,7 @@ export class SlidingExpirationCache<K, V> {
return cacheItem;
});

if (item != undefined && item != null && this._itemDisposalFunc != null) {
if (item !== undefined && item !== null && this._itemDisposalFunc !== null) {
this._itemDisposalFunc(item);
}
}
Expand Down
106 changes: 106 additions & 0 deletions common/lib/utils/sliding_expiration_cache_with_cleanup_task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { convertNanosToMinutes, convertNanosToMs, sleepWithAbort } from "./utils";
import { MapUtils } from "./map_utils";
import { SlidingExpirationCache } from "./sliding_expiration_cache";
import { Messages } from "./messages";
import { logger } from "../../logutils";

export class SlidingExpirationCacheWithCleanupTask<K, V> extends SlidingExpirationCache<K, V> {
private readonly _asyncItemDisposalFunc?: (item: V) => Promise<void>;
private stopCleanupTask: boolean = false;
private cleanupTask: Promise<void>;
private interruptCleanupTask: () => void;
private isInitialized: boolean = false;

constructor(cleanupIntervalNanos: bigint, shouldDisposeFunc?: (item: V) => boolean, asyncItemDisposalFunc?: (item: V) => Promise<void>) {
super(cleanupIntervalNanos, shouldDisposeFunc);
this._asyncItemDisposalFunc = asyncItemDisposalFunc;
}

async clear(): Promise<void> {
this.stopCleanupTask = true;
// If the cleanup task is currently sleeping this will interrupt it.
this.interruptCleanupTask();
await this.cleanupTask;

for (const [key, val] of this.map.entries()) {
if (val !== undefined && this._asyncItemDisposalFunc !== undefined) {
await this._asyncItemDisposalFunc(val.item);
}
}
this.map.clear();
}

computeIfAbsent(key: K, mappingFunc: (key: K) => V, itemExpirationNanos: bigint): V | null {
if (!this.isInitialized) {
this.cleanupTask = this.initCleanupTask();
}
return super.computeIfAbsent(key, mappingFunc, itemExpirationNanos);
}

putIfAbsent(key: K, value: V, itemExpirationNanos: bigint): V | null {
if (!this.isInitialized) {
this.cleanupTask = this.initCleanupTask();
}
return super.putIfAbsent(key, value, itemExpirationNanos);
}

put(key: K, value: V, itemExpirationNanos: bigint): V | null {
if (!this.isInitialized) {
this.cleanupTask = this.initCleanupTask();
}
return super.put(key, value, itemExpirationNanos);
}

protected cleanUp(): void {
// Intentionally does nothing, cleanup task performs this job.
}

async initCleanupTask(): Promise<void> {
this.isInitialized = true;
while (!this.stopCleanupTask) {
const [sleepPromise, temp] = sleepWithAbort(
convertNanosToMs(this._cleanupIntervalNanos),
Messages.get("SlidingExpirationCacheWithCleanupTask.cleanUpTaskInterrupted")
);
this.interruptCleanupTask = temp;
try {
await sleepPromise;
} catch (error) {
// Sleep has been interrupted, exit cleanup task.
logger.info(error.message);
return;
}

logger.info(Messages.get("SlidingExpirationCacheWithCleanupTask.cleaningUp", convertNanosToMinutes(this._cleanupIntervalNanos).toString()));

const itemsToRemove = [];
for (const [key, val] of this.map.entries()) {
if (val !== undefined && this._asyncItemDisposalFunc !== undefined && this.shouldCleanupItem(val)) {
MapUtils.remove(this.map, key);
itemsToRemove.push(this._asyncItemDisposalFunc(val.item));
}
}
try {
await Promise.all(itemsToRemove);
} catch (error) {
// Ignore.
}
}
}
}
20 changes: 20 additions & 0 deletions common/lib/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ export function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

export function sleepWithAbort(ms: number, message?: string) {
let abortSleep;
const promise = new Promise((resolve, reject) => {
const timeout = setTimeout(resolve, ms);
abortSleep = () => {
clearTimeout(timeout);
reject(new AwsWrapperError(message));
};
});
return [promise, abortSleep];
}

export function getTimeoutTask(timer: any, message: string, timeoutValue: number): Promise<void> {
return new Promise((_resolve, reject) => {
timer.timeoutId = setTimeout(() => {
Expand Down Expand Up @@ -52,6 +64,14 @@ export function getTimeInNanos() {
return process.hrtime.bigint();
}

export function convertNanosToMs(nanos: bigint) {
return Number(nanos) / 1000000;
}

export function convertNanosToMinutes(nanos: bigint) {
return Number(nanos) / 60000000000;
}

export function maskProperties(props: Map<string, any>) {
const maskedProperties = new Map(props);
if (maskedProperties.has(WrapperProperties.PASSWORD.name)) {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/container/tests/performance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async function doMeasurePerformance(sleepDelayMillis: number, repeatTimes: numbe
try {
await ProxyHelper.enableAllConnectivity();
await client.end();
MonitorServiceImpl.clearMonitors();
await MonitorServiceImpl.clearMonitors();
} catch (error: any) {
// ignore
}
Expand Down
Loading

0 comments on commit 083d56f

Please sign in to comment.