Skip to content

Commit

Permalink
apply review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
sophia-bq committed Feb 1, 2025
1 parent af933bb commit 31b702f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { AwsWrapperError } from "../../utils/errors";
import { Messages } from "../../utils/messages";
import { WrapperProperties } from "../../wrapper_property";
import { BlockingHostListProvider } from "../host_list_provider";
import { logger } from "../../../logutils";

export class MonitoringRdsHostListProvider extends RdsHostListProvider implements BlockingHostListProvider {
static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(60_000_000_000); // 1 minute.
Expand Down Expand Up @@ -63,17 +64,12 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
}

async queryForTopology(targetClient: ClientWrapper, dialect: DatabaseDialect): Promise<HostInfo[]> {
let monitor: ClusterTopologyMonitor = MonitoringRdsHostListProvider.monitors.get(
this.clusterId,
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
);
if (!monitor) {
monitor = this.initMonitor();
}
const monitor: ClusterTopologyMonitor = this.initMonitor();

try {
return await monitor.forceRefresh(targetClient, MonitoringRdsHostListProvider.DEFAULT_TOPOLOGY_QUERY_TIMEOUT_MS);
} catch {
} catch (error) {
logger.info(Messages.get("MonitoringHostListProvider.errorForceRefresh", error.message));
return null;
}
}
Expand All @@ -87,36 +83,31 @@ export class MonitoringRdsHostListProvider extends RdsHostListProvider implement
}

async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<HostInfo[]> {
let monitor: ClusterTopologyMonitor = MonitoringRdsHostListProvider.monitors.get(
this.clusterId,
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
);
if (!monitor) {
monitor = this.initMonitor();
}
const monitor: ClusterTopologyMonitor = this.initMonitor();

if (!monitor) {
throw new AwsWrapperError(Messages.get("MonitoringHostListProvider.requiresMonitor"));
}
return await monitor.forceMonitoringRefresh(shouldVerifyWriter, timeoutMs);
}

protected initMonitor(): ClusterTopologyMonitor {
const monitor = new ClusterTopologyMonitorImpl(
this.clusterId,
MonitoringRdsHostListProvider.topologyCache,
this.initialHost,
this.properties,
this.pluginService,
this,
WrapperProperties.CLUSTER_TOPOLOGY_REFRESH_RATE_MS.get(this.properties),
WrapperProperties.CLUSTER_TOPOLOGY_HIGH_REFRESH_RATE_MS.get(this.properties)
);

return MonitoringRdsHostListProvider.monitors.computeIfAbsent(
const monitor: ClusterTopologyMonitor = MonitoringRdsHostListProvider.monitors.computeIfAbsent(
this.clusterId,
(x) => monitor,
() =>
new ClusterTopologyMonitorImpl(
this.clusterId,
MonitoringRdsHostListProvider.topologyCache,
this.initialHost,
this.properties,
this.pluginService,
this,
WrapperProperties.CLUSTER_TOPOLOGY_REFRESH_RATE_MS.get(this.properties),
WrapperProperties.CLUSTER_TOPOLOGY_HIGH_REFRESH_RATE_MS.get(this.properties)
),
MonitoringRdsHostListProvider.MONITOR_EXPIRATION_NANOS
);

if (monitor === null) {
throw new AwsWrapperError(Messages.get("MonitoringHostListProvider.requiresMonitor"));
}
return monitor;
}
}
10 changes: 3 additions & 7 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ import { DatabaseDialectCodes } from "./database_dialect/database_dialect_codes"
import { getWriter } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { MonitoringRdsHostListProvider } from "./host_list_provider/monitoring/monitoring_host_list_provider";
import { TopologyAwareDatabaseDialect } from "./topology_aware_database_dialect";

export class PluginService implements ErrorHandler, HostListProviderService {
private readonly _currentClient: AwsClient;
Expand Down Expand Up @@ -179,14 +177,12 @@ export class PluginService implements ErrorHandler, HostListProviderService {
async forceMonitoringRefresh(shouldVerifyWriter: boolean, timeoutMs: number): Promise<boolean> {
const hostListProvider: HostListProvider = this.getHostListProvider();
if (!this.isBlockingHostListProvider(hostListProvider)) {
throw new AwsWrapperError(Messages.get("PluginService.requiredMonitoringRdsHostListProvider"), typeof hostListProvider);
logger.info(Messages.get("PluginService.requiredBlockingHostListProvider", typeof hostListProvider));
throw new AwsWrapperError(Messages.get("PluginService.requiredBlockingHostListProvider", typeof hostListProvider));
}

try {
const updatedHostList: HostInfo[] = await (hostListProvider as MonitoringRdsHostListProvider).forceMonitoringRefresh(
shouldVerifyWriter,
timeoutMs
);
const updatedHostList: HostInfo[] = await hostListProvider.forceMonitoringRefresh(shouldVerifyWriter, timeoutMs);
if (updatedHostList) {
if (updatedHostList !== this.hosts) {
this.updateHostAvailability(updatedHostList);
Expand Down
45 changes: 17 additions & 28 deletions common/lib/plugins/failover2/failover2_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,19 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
);
}

async updateTopology() {
const client = this.pluginService.getCurrentClient();
if (!this.isFailoverEnabled() || !(await client.isValid())) {
return;
}

await this.pluginService.refreshHostList();
}

override async connect(
hostInfo: HostInfo,
props: Map<string, any>,
isInitialConnection: boolean,
connectFunc: () => Promise<ClientWrapper>
): Promise<ClientWrapper> {
// Call was initiated by Failover2 Plugin, does not require additional processing.
if (props.has(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME)) {
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);
}

// Failover is not enabled, does not require additional processing.
if (!this.enableFailoverSetting || !WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.get(props)) {
if (
// Call was initiated by Failover2 Plugin, does not require additional processing.
props.has(Failover2Plugin.INTERNAL_CONNECT_PROPERTY_NAME) ||
// Failover is not enabled, does not require additional processing.
!this.enableFailoverSetting ||
!WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.get(props)
) {
return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc);
}

Expand Down Expand Up @@ -388,9 +379,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
await telemetryContext.start(async () => {
if (!(await this.pluginService.forceMonitoringRefresh(true, this.failoverTimeoutSettingMs))) {
// Unable to establish SQL connection to writer node.
this.failoverWriterFailedCounter.inc();
logger.error(Messages.get("Failover2.unableToFetchTopology"));
throw new FailoverFailedError(Messages.get("Failover.unableToConnectToWriter"));
this.logAndThrowError(Messages.get("Failover2.unableToFetchTopology"));
}

const hosts: HostInfo[] = this.pluginService.getHosts();
Expand All @@ -402,16 +391,12 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
try {
writerCandidateClient = await this.createConnectionForHost(writerCandidateHostInfo);
} catch (err) {
logger.error(Messages.get("Failover.unableToConnectToWriter"));
this.failoverWriterFailedCounter.inc();
throw new FailoverFailedError(Messages.get("Failover.unableToConnectToWriter"));
this.logAndThrowError("Failover.unableToConnectToWriter");
}
}

if (!writerCandidateClient) {
logger.error(Messages.get("Failover.unableToConnectToWriter"));
this.failoverWriterFailedCounter.inc();
throw new FailoverFailedError(Messages.get("Failover.unableToConnectToWriter"));
this.logAndThrowError("Failover.unableToConnectToWriter");
}

if ((await this.pluginService.getHostRole(writerCandidateClient)) !== HostRole.WRITER) {
Expand All @@ -420,9 +405,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
} catch (error) {
// Do nothing.
}
logger.error(Messages.get("Failover2.failoverWriterConnectedToReader"));
this.failoverWriterFailedCounter.inc();
throw new FailoverFailedError(Messages.get("Failover2.failoverWriterConnectedToReader"));
this.logAndThrowError(Messages.get("Failover2.failoverWriterConnectedToReader"));
}

await this.pluginService.abortCurrentClient();
Expand Down Expand Up @@ -491,6 +474,12 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele
return false;
}

private logAndThrowError(errorMessage: string) {
logger.error(errorMessage);
this.failoverWriterFailedCounter.inc();
throw new FailoverFailedError(errorMessage);
}

async releaseResources(): Promise<void> {
await (this.pluginService.getHostListProvider() as MonitoringRdsHostListProvider).clearAll();
}
Expand Down
3 changes: 2 additions & 1 deletion common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@
"PluginService.failedToRetrieveHostPort": "Could not retrieve Host:Port for connection.",
"PluginService.nonEmptyAliases": "fillAliases called when HostInfo already contains the following aliases: '%s'.",
"PluginService.forceMonitoringRefreshTimeout": "A timeout exception occurred after waiting '%s' ms for refreshed topology.",
"PluginService.requiredMonitoringRdsHostListProvider": "The detected host list provider is not a MonitoringRdsHostListProvider. A MonitoringRdsHostListProvider is required to force refresh the host list. Detected host list provider: '%s'.",
"PluginService.requiredBlockingHostListProvider": "The detected host list provider is not a BlockingHostListProvider. A BlockingHostListProvider is required to force refresh the host list. Detected host list provider: '%s'.",
"MonitoringHostListProvider.requiresMonitor": "The MonitoringRdsHostListProvider could not retrieve or initialize a ClusterTopologyMonitor for refreshing the topology.",
"MonitoringHostListProvider.errorForceRefresh": "The MonitoringRdsHostListProvider could not refresh the topology, caught error: '%s'",
"HostMonitoringConnectionPlugin.activatedMonitoring": "Executing method '%s', monitoring is activated.",
"HostMonitoringConnectionPlugin.unableToIdentifyConnection": "Unable to identify the given connection: '%s', please ensure the correct host list provider is specified. The host list provider in use is: '%s'.",
"HostMonitoringConnectionPlugin.errorIdentifyingConnection": "Error occurred while identifying connection: '%s'.",
Expand Down
11 changes: 10 additions & 1 deletion pg/lib/dialect/aurora_pg_database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,16 @@ export class AuroraPgDatabaseDialect extends PgDatabaseDialect implements Topolo

async getWriterId(targetClient: ClientWrapper): Promise<string | null> {
const res = await targetClient.query(AuroraPgDatabaseDialect.IS_WRITER_QUERY);
return Promise.resolve(res.rows[0]["server_id"] ? res.rows[0]["server_id"] : null);
try {
const writerId: string = res.rows[0]["server_id"];
return writerId ? writerId : null;
} catch (e) {
if (e.message.includes("Cannot read properties of undefined")) {
// Query returned no result, targetClient is not connected to a writer.
return null;
}
throw e;
}
}

async isDialect(targetClient: ClientWrapper): Promise<boolean> {
Expand Down
36 changes: 0 additions & 36 deletions tests/unit/failover2_plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,39 +73,6 @@ describe("reader failover handler", () => {
reset(mockRdsHostListProvider);
});

it("test update topology", async () => {
when(mockAwsClient.isValid()).thenResolve(true);

// Test updateTopology with failover disabled.
WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.set(properties, false);
initializePlugin(instance(mockPluginService));
await plugin.updateTopology();
verify(mockPluginService.forceRefreshHostList()).never();
verify(mockPluginService.refreshHostList()).never();

// Test updateTopology with no connection.
WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.set(properties, true);
initializePlugin(instance(mockPluginService));
when(mockPluginService.getCurrentHostInfo()).thenReturn(null);
await plugin.updateTopology();
verify(mockPluginService.forceRefreshHostList()).never();
verify(mockPluginService.refreshHostList()).never();

// Test updateTopology with closed connection.
when(mockAwsClient.isValid()).thenResolve(true);
await plugin.updateTopology();
verify(mockPluginService.forceRefreshHostList()).never();
verify(mockPluginService.refreshHostList()).never();

// Test with hosts.
when(mockPluginService.getHosts()).thenReturn([builder.withHost("host").build()]);

// Test updateTopology.
await plugin.updateTopology();
verify(mockPluginService.forceRefreshHostList()).never();
verify(mockPluginService.refreshHostList()).once();
});

it("test failover - failover reader", async () => {
when(mockPluginService.isInTransaction()).thenReturn(true);
initializePlugin(instance(mockPluginService));
Expand Down Expand Up @@ -153,9 +120,6 @@ describe("reader failover handler", () => {
initializePlugin(mockPluginServiceInstance);
plugin.initHostProvider(mockHostInfoInstance, properties, mockPluginServiceInstance, () => {});

const spyPlugin: Failover2Plugin = spy(plugin);
when(spyPlugin.updateTopology()).thenReturn();

await plugin.failoverReader();

verify(mockPluginService.setCurrentClient(mockClientWrapper, anything())).once();
Expand Down

0 comments on commit 31b702f

Please sign in to comment.