Skip to content

Commit

Permalink
feat: notification pipelines framework (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonlamz authored May 15, 2024
1 parent 12d8213 commit 9abcfbe
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 39 deletions.
2 changes: 1 addition & 1 deletion common/lib/abstract_connection_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export abstract class AbstractConnectionPlugin implements ConnectionPlugin {
return OldConnectionSuggestionAction.NO_OPINION;
}

notifyNodeListChanged(changes: Map<string, Set<HostChangeOptions>>): void {}
notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): void {}

acceptsStrategy(role: HostRole, strategy: string): boolean {
return false;
Expand Down
2 changes: 1 addition & 1 deletion common/lib/connection_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export interface ConnectionPlugin {

notifyConnectionChanged(changes: Set<HostChangeOptions>): OldConnectionSuggestionAction;

notifyNodeListChanged(changes: Map<string, Set<HostChangeOptions>>): void;
notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): void;

acceptsStrategy(role: HostRole, strategy: string): boolean;

Expand Down
49 changes: 49 additions & 0 deletions common/lib/plugin_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { PluginServiceManagerContainer } from "./plugin_service_manager_containe
import { HostListProviderService } from "./host_list_provider_service";
import { AwsClient } from "./aws_client";
import { PluginService } from "./plugin_service";
import { HostChangeOptions } from "./host_change_options";
import { OldConnectionSuggestionAction } from "./old_connection_suggestion_action";
import { HostRole } from "./host_role";
import { ConnectionProvider } from "./connection_provider";

Expand Down Expand Up @@ -64,6 +66,8 @@ class PluginChain<T> {
export class PluginManager {
private static readonly PLUGIN_CHAIN_CACHE = new Map<[string, HostInfo], PluginChain<any>>();
private static readonly ALL_METHODS: string = "*";
private static readonly NOTIFY_HOST_LIST_CHANGED_METHOD: string = "notifyHostListChanged";
private static readonly NOTIFY_CONNECTION_CHANGED_METHOD: string = "notifyConnectionChanged";
private static readonly ACCEPTS_STRATEGY_METHOD: string = "acceptsStrategy";
private static readonly GET_HOST_INFO_BY_STRATEGY_METHOD: string = "getHostInfoByStrategy";
private readonly _plugins: ConnectionPlugin[] = [];
Expand Down Expand Up @@ -170,6 +174,51 @@ export class PluginManager {
);
}

protected async notifySubscribedPlugins(
methodName: string,
pluginFunc: PluginFunc<void>,
skipNotificationForThisPlugin: ConnectionPlugin | null
): Promise<void> {
if (pluginFunc === null) {
throw new AwsWrapperError("pluginFunc not found.");
}
for (const plugin of this._plugins) {
if (plugin === skipNotificationForThisPlugin) {
continue;
}
if (plugin.getSubscribedMethods().has(PluginManager.ALL_METHODS) || plugin.getSubscribedMethods().has(methodName)) {
await pluginFunc(plugin, () => Promise.resolve());
}
}
}

async notifyConnectionChanged(
changes: Set<HostChangeOptions>,
skipNotificationForThisPlugin: ConnectionPlugin | null
): Promise<Set<OldConnectionSuggestionAction>> {
const result = new Set<OldConnectionSuggestionAction>();
await this.notifySubscribedPlugins(
PluginManager.NOTIFY_CONNECTION_CHANGED_METHOD,
(plugin, func) => {
result.add(plugin.notifyConnectionChanged(changes));
return Promise.resolve();
},
skipNotificationForThisPlugin
);
return result;
}

async notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): Promise<void> {
await this.notifySubscribedPlugins(
PluginManager.NOTIFY_HOST_LIST_CHANGED_METHOD,
(plugin, func) => {
plugin.notifyHostListChanged(changes);
return Promise.resolve();
},
null
);
}

acceptsStrategy(role: HostRole, strategy: string) {
for (const plugin of this._plugins) {
const pluginSubscribedMethods = plugin.getSubscribedMethods();
Expand Down
51 changes: 46 additions & 5 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import { CacheMap } from "./utils/cache_map";
import { HostChangeOptions } from "./host_change_options";
import { HostRole } from "./host_role";
import { WrapperProperties } from "./wrapper_property";
import { PluginManager } from "./plugin_manager";
import { OldConnectionSuggestionAction } from "./old_connection_suggestion_action";
import { logger } from "../logutils";

export class PluginService implements ErrorHandler, HostListProviderService {
private _currentHostInfo?: HostInfo;
Expand Down Expand Up @@ -198,8 +201,11 @@ export class PluginService implements ErrorHandler, HostListProviderService {

if (changes.size > 0) {
this.hosts = newHosts ? newHosts : [];
// TODO: uncomment once notifyHostListChanged is implemented.
// this.pluginServiceManagerContainer.pluginManager.notifyHostListChanged(changes);
if (this.pluginServiceManagerContainer.pluginManager) {
this.pluginServiceManagerContainer.pluginManager.notifyHostListChanged(changes);
} else {
throw new AwsWrapperError("Connection Plugin Manager was not detected."); // This should not be reached
}
}
}

Expand Down Expand Up @@ -236,9 +242,44 @@ export class PluginService implements ErrorHandler, HostListProviderService {
throw new AwsWrapperError("AwsClient is missing target client connect function."); // This should not be reached
}

setCurrentClient(newClient: any, hostInfo: HostInfo) {
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
// TODO: Add session state changes
async setCurrentClient(newClient: any, hostInfo: HostInfo): Promise<Set<HostChangeOptions>> {
if (this.getCurrentClient().targetClient === null) {
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
const changes = new Set<HostChangeOptions>([HostChangeOptions.INITIAL_CONNECTION]);
if (this.pluginServiceManagerContainer.pluginManager) {
this.pluginServiceManagerContainer.pluginManager.notifyConnectionChanged(changes, null);
}
return changes;
} else {
if (this._currentHostInfo) {
const changes: Set<HostChangeOptions> = this.compare(this._currentHostInfo, hostInfo);
if (changes.size > 0) {
const oldClient: any = this.getCurrentClient().targetClient;
const isInTransaction = this.isInTransaction;
try {
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
this.setInTransaction(false);

if (this.pluginServiceManagerContainer.pluginManager) {
const pluginOpinions: Set<OldConnectionSuggestionAction> =
await this.pluginServiceManagerContainer.pluginManager.notifyConnectionChanged(changes, null);

const shouldCloseConnection =
changes.has(HostChangeOptions.CONNECTION_OBJECT_CHANGED) &&
!pluginOpinions.has(OldConnectionSuggestionAction.PRESERVE) &&
oldClient.isValid();
}
} finally {
/* empty */
}
}
return changes;
}
throw new AwsWrapperError("HostInfo not found"); // Should not be reached
}
}

async isClientValid(targetClient: any): Promise<boolean> {
Expand Down
10 changes: 10 additions & 0 deletions common/lib/plugins/default_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { Messages } from "../utils/messages";
import { HostListProviderService } from "../host_list_provider_service";
import { HostInfo } from "../host_info";
import { AbstractConnectionPlugin } from "../abstract_connection_plugin";
import { HostChangeOptions } from "../host_change_options";
import { OldConnectionSuggestionAction } from "../old_connection_suggestion_action";
import { HostRole } from "../host_role";
import { PluginService } from "../plugin_service";
import { ConnectionProviderManager } from "../connection_provider_manager";
Expand Down Expand Up @@ -109,6 +111,14 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
return await methodFunc();
}

override notifyConnectionChanged(changes: Set<HostChangeOptions>): OldConnectionSuggestionAction {
return OldConnectionSuggestionAction.NO_OPINION;
}

override notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): void {
// do nothing
}

override acceptsStrategy(role: HostRole, strategy: string): boolean {
if (role === HostRole.UNKNOWN) {
// Users must request either a writer or a reader role.
Expand Down
55 changes: 49 additions & 6 deletions common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
"forceConnect",
"query",
"notifyConnectionChanged",
"notifyNodeListChanged"
"notifyHostListChanged"
]);
private readonly _staleDnsHelper: StaleDnsHelper;
private readonly _properties: Map<string, any>;
Expand Down Expand Up @@ -140,10 +140,53 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
}

override notifyConnectionChanged(changes: Set<HostChangeOptions>): OldConnectionSuggestionAction {
throw OldConnectionSuggestionAction.NO_OPINION;
return OldConnectionSuggestionAction.NO_OPINION;
}

override notifyNodeListChanged(changes: Map<string, Set<HostChangeOptions>>): void {}
override notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): void {
if (!this.enableFailoverSetting) {
return;
}

// Log changes
if (logger.level === "debug") {
let str = "Changes:";
for (const [key, values] of changes.entries()) {
str = str.concat("\n");
// Convert from int back into enum
const valStr = Array.from(values)
.map((x) => HostChangeOptions[x])
.join(", ");
str = str.concat(`\tHost '${key}': ${valStr}`);
}
logger.debug(str);
}

const currentHost = this.pluginService.getCurrentHostInfo();
if (currentHost) {
const url = currentHost.url;
if (this.isHostStillValid(url, changes)) {
return;
}

for (const alias of currentHost.allAliases) {
if (this.isHostStillValid(alias + "/", changes)) {
return;
}
}
}
logger.info(Messages.get("Failover.invalidNode"), currentHost);
}

private isHostStillValid(host: string, changes: Map<string, Set<HostChangeOptions>>): boolean {
if (changes.has(host)) {
const options = changes.get(host);
if (options) {
return !options.has(HostChangeOptions.HOST_DELETED) && !options.has(HostChangeOptions.WENT_DOWN);
}
}
return true;
}

isFailoverEnabled(): boolean {
return (
Expand Down Expand Up @@ -336,7 +379,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {

this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.pluginService.tryClosingTargetClient();
this.pluginService.setCurrentClient(result.client, result.newHost);
await this.pluginService.setCurrentClient(result.client, result.newHost);
await this.updateTopology(true);
}

Expand All @@ -363,7 +406,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
}

await this.pluginService.tryClosingTargetClient();
this.pluginService.setCurrentClient(result.client, writerHostInfo);
await this.pluginService.setCurrentClient(result.client, writerHostInfo);
logger.debug(Messages.get("Failover.establishedConnection", this.pluginService.getCurrentHostInfo()?.host ?? ""));
await this.pluginService.refreshHostList();
}
Expand Down Expand Up @@ -461,7 +504,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
const client = this.pluginService.createTargetClient(props);
try {
await this.pluginService.connect(baseHostInfo, this._properties, this.pluginService.getDialect().getConnectFunc(client));
this.pluginService.setCurrentClient(client, baseHostInfo);
await this.pluginService.setCurrentClient(client, baseHostInfo);
} catch (error) {
await this.pluginService.tryClosingTargetClient(client);
throw error;
Expand Down
12 changes: 6 additions & 6 deletions common/lib/plugins/stale_dns/stale_dns_helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class StaleDnsHelper {
logger.debug(Messages.get("StaleDnsHelper.clusterEndpointDns", hostInetAddress));

if (!clusterInetAddress) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand All @@ -95,7 +95,7 @@ export class StaleDnsHelper {
if (!this.writerHostInfo) {
const writerCandidate = this.getWriter();
if (writerCandidate && this.rdsUtils.isRdsClusterDns(writerCandidate.host)) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}
this.writerHostInfo = writerCandidate;
Expand All @@ -104,7 +104,7 @@ export class StaleDnsHelper {
logger.debug(Messages.get("StaleDnsHelper.writerHostInfo", this.writerHostInfo?.host ?? ""));

if (!this.writerHostInfo) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand All @@ -120,7 +120,7 @@ export class StaleDnsHelper {
logger.debug(Messages.get("StaleDnsHelper.writerInetAddress", this.writerHostAddress));

if (!this.writerHostAddress) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand All @@ -133,7 +133,7 @@ export class StaleDnsHelper {
try {
result = await this.pluginService.connect(this.writerHostInfo, props, this.pluginService.getDialect().getConnectFunc(targetClient));
await this.pluginService.tryClosingTargetClient(currentTargetClient);
this.pluginService.setCurrentClient(targetClient, this.writerHostInfo);
await this.pluginService.setCurrentClient(targetClient, this.writerHostInfo);
return result;
} catch (error: any) {
await this.pluginService.tryClosingTargetClient(targetClient);
Expand All @@ -144,7 +144,7 @@ export class StaleDnsHelper {
}
}

this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand Down
39 changes: 19 additions & 20 deletions tests/unit/failover_plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { WrapperProperties } from "aws-wrapper-common-lib/lib/wrapper_property";
import { AwsMySQLClient } from "../../mysql";
import { anything, instance, mock, reset, resetCalls, spy, verify, when } from "ts-mockito";
import { Messages } from "aws-wrapper-common-lib/lib/utils/messages";
import { HostChangeOptions } from "aws-wrapper-common-lib/lib/host_change_options";

const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() });

Expand Down Expand Up @@ -97,32 +98,30 @@ describe("reader failover handler", () => {
reset(mockWriterResult);
});

// // TODO: enable when notifyNodeListChanged is implemented
// it("test notify list changed with failover disabled", async () => {
// properties.set(WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.name, false);
// const changes: Map<string, Set<NodeChangeOptions>> = new Map();
it("test notify list changed with failover disabled", async () => {
properties.set(WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.name, false);
const changes: Map<string, Set<HostChangeOptions>> = new Map();

// initializePlugin();
// plugin.notifyNodeListChanged(changes);
initializePlugin(instance(mockPluginService));
plugin.notifyHostListChanged(changes);

// verify(mockPluginService.getCurrentHostInfo()).never();
// });
verify(mockPluginService.getCurrentHostInfo()).never();
});

// // TODO: enable when notifyNodeListChanged is implemented
// it("test notify list changed with valid connection not in topology", async () => {
// const changes: Map<string, Set<NodeChangeOptions>> = new Map();
// changes.set("cluster-host/", new Set<NodeChangeOptions>([NodeChangeOptions.NODE_DELETED]));
// changes.set("instance/", new Set<NodeChangeOptions>([NodeChangeOptions.NODE_ADDED]));
it("test notify list changed with valid connection not in topology", async () => {
const changes: Map<string, Set<HostChangeOptions>> = new Map();
changes.set("cluster-host/", new Set<HostChangeOptions>([HostChangeOptions.HOST_DELETED]));
changes.set("instance/", new Set<HostChangeOptions>([HostChangeOptions.HOST_ADDED]));

// initializePlugin();
// plugin.notifyNodeListChanged(changes);
initializePlugin(instance(mockPluginService));
plugin.notifyHostListChanged(changes);

// when(mockHostInfo.url).thenReturn("cluster-url/");
// when(mockHostInfo.allAliases).thenReturn(new Set<string>(["instance"]));
when(mockHostInfo.url).thenReturn("cluster-url/");
when(mockHostInfo.allAliases).thenReturn(new Set<string>(["instance"]));

// verify(mockPluginService.getCurrentHostInfo()).once();
// verify(mockHostInfo.allAliases).never();
// });
verify(mockPluginService.getCurrentHostInfo()).once();
verify(mockHostInfo.allAliases).never();
});

it("test update topology", async () => {
when(mockAwsClient.isValid()).thenResolve(true);
Expand Down
Loading

0 comments on commit 9abcfbe

Please sign in to comment.