Skip to content

Commit

Permalink
apply review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Feb 5, 2025
1 parent 1544535 commit f0c5c4b
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import { RdsHostListProvider } from "../rds_host_list_provider";
import { HostInfo } from "../../host_info";
import { SlidingExpirationCache } from "../../utils/sliding_expiration_cache";
import { ClusterTopologyMonitor, ClusterTopologyMonitorImpl } from "./cluster_topology_monitor";
import { PluginService } from "../../plugin_service";
import { HostListProviderService } from "../../host_list_provider_service";
Expand All @@ -43,7 +42,8 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
} catch {
// Ignore.
}
}
},
"MonitoringRdsHostListProvider.monitors"
);

private readonly pluginService: PluginService;
Expand Down
7 changes: 4 additions & 3 deletions common/lib/internal_pooled_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ import { SlidingExpirationCacheWithCleanupTask } from "./utils/sliding_expiratio
export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources {
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes
static readonly POOL_EXPIRATION_NANOS: bigint = BigInt(30 * 60_000_000_000); // 30 minutes
protected static databasePools: SlidingExpirationCacheWithCleanupTask<string, AwsPoolClient> = new SlidingExpirationCacheWithCleanupTask(
protected static databasePools: SlidingExpirationCacheWithCleanupTask<string, any> = new SlidingExpirationCacheWithCleanupTask(
InternalPooledConnectionProvider.CACHE_CLEANUP_NANOS,
(pool: AwsPoolClient) => pool.getActiveCount() === 0,
async (pool: AwsPoolClient) => await pool.end()
(pool: any) => pool.getActiveCount() === 0,
async (pool: any) => await pool.end(),
"InternalPooledConnectionProvider.databasePools"
);

private static readonly acceptedStrategies: Map<string, HostSelector> = new Map([
Expand Down
3 changes: 2 additions & 1 deletion common/lib/plugins/efm/monitor_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ export class MonitorServiceImpl implements MonitorService {
undefined,
async (monitor: Monitor) => {
await monitor.releaseResources();
}
},
"MonitorServiceImpl.monitors"
);
private readonly pluginService: PluginService;
private cachedMonitorHostKeys: Set<string> | undefined;
Expand Down
3 changes: 2 additions & 1 deletion common/lib/plugins/limitless/limitless_router_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ export class LimitlessRouterServiceImpl implements LimitlessRouterService {
new SlidingExpirationCacheWithCleanupTask(
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
undefined,
async (monitor: LimitlessRouterMonitor) => await monitor.close()
async (monitor: LimitlessRouterMonitor) => await monitor.close(),
"LimitlessRouterServiceImpl.monitors"
);
protected static readonly limitlessRouterCache: SlidingExpirationCache<string, HostInfo[]> = new SlidingExpirationCache(
LimitlessRouterServiceImpl.CACHE_CLEANUP_NANOS,
Expand Down
7 changes: 5 additions & 2 deletions common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@
"HostMonitor.detectedWriter": "Detected writer: '%s'.",
"HostMonitor.endMonitoring": "Host monitor '%s' completed in '%s'.",
"HostMonitor.writerHostChanged": "Writer host has changed from '%s' to '%s'.",
"SlidingExpirationCacheWithCleanupTask.cleaningUp": "Cleanup interval of '%s' minutes has passed, cleaning up sliding expiration cache.",
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskInterrupted": "Cleanup task has been interrupted and is exiting."
"SlidingExpirationCacheWithCleanupTask.cleaningUp": "Cleanup interval of '%s' minutes has passed, cleaning up sliding expiration cache '%s'.",
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskInterrupted": "Sliding expiration cache '%s' cleanup task has been interrupted and is exiting.",
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskStopped": "Sliding expiration cache '%s' cleanup task has been stopped and is exiting.",
"SlidingExpirationCacheWithCleanupTask.clear": "Sliding expiration cache '%s' is being cleared, all resources will be released.",
"SlidingExpirationCacheWithCleanupTask.cleanUpTaskInitialized": "Sliding expiration cache '%s' cleanup task has been initialized."
}
25 changes: 18 additions & 7 deletions common/lib/utils/sliding_expiration_cache_with_cleanup_task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,29 @@ import { logger } from "../../logutils";

export class SlidingExpirationCacheWithCleanupTask<K, V> extends SlidingExpirationCache<K, V> {
private readonly _asyncItemDisposalFunc?: (item: V) => Promise<void>;
private readonly cacheId: string;
private stopCleanupTask: boolean = false;
private cleanupTask: Promise<void>;
private interruptCleanupTask: () => void;
private isInitialized: boolean = false;

constructor(cleanupIntervalNanos: bigint, shouldDisposeFunc?: (item: V) => boolean, asyncItemDisposalFunc?: (item: V) => Promise<void>) {
constructor(
cleanupIntervalNanos: bigint,
shouldDisposeFunc?: (item: V) => boolean,
asyncItemDisposalFunc?: (item: V) => Promise<void>,
cacheId?: string
) {
super(cleanupIntervalNanos, shouldDisposeFunc);
this._asyncItemDisposalFunc = asyncItemDisposalFunc;
this.cacheId = cacheId;
}

async clear(): Promise<void> {
logger.debug(Messages.get("SlidingExpirationCacheWithCleanupTask.clear", this.cacheId));
this.stopCleanupTask = true;
// If the cleanup task is currently sleeping this will interrupt it.
this.interruptCleanupTask();
await this.cleanupTask;

for (const [key, val] of this.map.entries()) {
if (val !== undefined && this._asyncItemDisposalFunc !== undefined) {
await this._asyncItemDisposalFunc(val.item);
Expand Down Expand Up @@ -73,21 +80,24 @@ export class SlidingExpirationCacheWithCleanupTask<K, V> extends SlidingExpirati

async initCleanupTask(): Promise<void> {
this.isInitialized = true;
logger.debug(Messages.get("SlidingExpirationCacheWithCleanupTask.cleanUpTaskInitialized", this.cacheId));
while (!this.stopCleanupTask) {
const [sleepPromise, temp] = sleepWithAbort(
const [sleepPromise, abortSleepFunc] = sleepWithAbort(
convertNanosToMs(this._cleanupIntervalNanos),
Messages.get("SlidingExpirationCacheWithCleanupTask.cleanUpTaskInterrupted")
Messages.get("SlidingExpirationCacheWithCleanupTask.cleanUpTaskInterrupted", this.cacheId)
);
this.interruptCleanupTask = temp;
this.interruptCleanupTask = abortSleepFunc;
try {
await sleepPromise;
} catch (error) {
// Sleep has been interrupted, exit cleanup task.
logger.info(error.message);
logger.debug(error.message);
return;
}

logger.info(Messages.get("SlidingExpirationCacheWithCleanupTask.cleaningUp", convertNanosToMinutes(this._cleanupIntervalNanos).toString()));
logger.debug(
Messages.get("SlidingExpirationCacheWithCleanupTask.cleaningUp", convertNanosToMinutes(this._cleanupIntervalNanos).toString(), this.cacheId)
);

const itemsToRemove = [];
for (const [key, val] of this.map.entries()) {
Expand All @@ -102,5 +112,6 @@ export class SlidingExpirationCacheWithCleanupTask<K, V> extends SlidingExpirati
// Ignore.
}
}
logger.debug(Messages.get("SlidingExpirationCacheWithCleanupTask.cleanUpTaskStopped", this.cacheId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ describe("aurora read write splitting", () => {
await auroraTestUtility.queryInstanceId(client);
}).rejects.toThrow(FailoverFailedError);
await ProxyHelper.enableAllConnectivity();
await client.end();
await client.connect();
await TestEnvironment.verifyClusterStatus();

Expand Down
6 changes: 6 additions & 0 deletions tests/unit/sliding_expiration_cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ describe("test_sliding_expiration_cache", () => {
expect(result3).toEqual("b");
expect(target.get(1)).toEqual("b");
});

it("test remove", async () => {
const target = new SlidingExpirationCache(
BigInt(50_000_000),
Expand Down Expand Up @@ -100,6 +101,7 @@ describe("test_sliding_expiration_cache", () => {
expect(target.get("nonExpiredItem")).toEqual(nonExpiredItem);
expect(nonExpiredItem.disposed).toEqual(false);
});

it("test clear", async () => {
const target = new SlidingExpirationCache(
BigInt(50_000_000),
Expand All @@ -124,11 +126,13 @@ describe("test_sliding_expiration_cache", () => {
expect(item1.disposed).toEqual(true);
expect(item2.disposed).toEqual(true);
});

it("test async cleanup thread", async () => {
const cleanupIntervalNanos = BigInt(300_000_000); // .3 seconds
const disposeMs = 1000;
const target = new SlidingExpirationCacheWithCleanupTask(
cleanupIntervalNanos,
"slidingExpirationCache.test",
(item: AsyncDisposableItem) => item.shouldDispose,
async (item) => await item.dispose(disposeMs)
);
Expand Down Expand Up @@ -161,9 +165,11 @@ describe("test_sliding_expiration_cache", () => {
expect(target.get(2)).toEqual(undefined);
expect(item2.disposed).toEqual(true);
});

it("test async clear", async () => {
const target = new SlidingExpirationCacheWithCleanupTask(
BigInt(50_000_000),
"slidingExpirationCache.test",
(item: AsyncDisposableItem) => item.shouldDispose,
async (item) => await item.dispose(1000)
);
Expand Down

0 comments on commit f0c5c4b

Please sign in to comment.