Skip to content

Commit

Permalink
Update connect functions to return target client (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszek-bq authored May 31, 2024
1 parent 7c78843 commit ff2cae9
Show file tree
Hide file tree
Showing 24 changed files with 161 additions and 150 deletions.
2 changes: 1 addition & 1 deletion common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export abstract class AwsClient extends EventEmitter {
protected _schema: string = "";
protected _isolationLevel: number = 0;
private readonly _properties: Map<string, any>;
private _targetClient: any;
private _targetClient: any = null;
protected _errorHandler: ErrorHandler;
protected _createClientFunc?: (config: any) => any;
protected _connectFunc?: () => Promise<any>;
Expand Down
2 changes: 1 addition & 1 deletion common/lib/connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { HostInfo } from "./host_info";
import { PluginService } from "./plugin_service";

export interface ConnectionProvider {
connect<T>(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>, connectFunc: () => Promise<T>): Promise<T>;
connect<T>(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<T>;
acceptsUrl(hostInfo: HostInfo, props: Map<string, any>): boolean;
acceptsStrategy(role: HostRole, strategy: string): boolean;
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo;
Expand Down
19 changes: 13 additions & 6 deletions common/lib/driver_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ export class DriverConnectionProvider implements ConnectionProvider {
return true;
}

async connect<T>(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>, connectFunc: () => Promise<T>): Promise<T> {
async connect<T>(hostInfo: HostInfo, pluginService: PluginService, props: Map<string, any>): Promise<T> {
let result;
try {
result = await connectFunc();
const targetClient: any = pluginService.createTargetClient(props);
const connFunc = pluginService.getConnectFunc(targetClient);
await connFunc();
result = targetClient;
} catch (e) {
await pluginService.tryClosingTargetClient();

if (!WrapperProperties.ENABLE_GREEN_NODE_REPLACEMENT.get(props)) {
throw e;
}
Expand Down Expand Up @@ -96,8 +97,14 @@ export class DriverConnectionProvider implements ConnectionProvider {

const newTargetClient = pluginService.createTargetClient(props);
const fixedConnFunc = pluginService.getConnectFunc(newTargetClient);
result = await fixedConnFunc();
await pluginService.setCurrentClient(newTargetClient, connectionHostInfo);
await fixedConnFunc();
result = newTargetClient;

// Note keeping this here temporarily for current functionality.
// TODO revisit - Follow the paths that the driver_connection_provider.connect is called from
// and make sure pluginService.tryClosingTargetClient() and pluginService.setCurrentClient are called appropriately.
// The driver_connection_provider should have no knowledge of setting/closing clients
await pluginService.tryClosingTargetClient();
}

return result;
Expand Down
18 changes: 12 additions & 6 deletions common/lib/plugin_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class PluginChain<T> {
export class PluginManager {
private static readonly PLUGIN_CHAIN_CACHE = new Map<[string, HostInfo], PluginChain<any>>();
private static readonly ALL_METHODS: string = "*";
private static readonly CONNECT_METHOD = "connect";
private static readonly FORCE_CONNECT_METHOD = "forceConnect";
private static readonly NOTIFY_HOST_LIST_CHANGED_METHOD: string = "notifyHostListChanged";
private static readonly NOTIFY_CONNECTION_CHANGED_METHOD: string = "notifyConnectionChanged";
private static readonly ACCEPTS_STRATEGY_METHOD: string = "acceptsStrategy";
Expand Down Expand Up @@ -108,29 +110,33 @@ export class PluginManager {
);
}

connect<T>(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean, methodFunc: () => Promise<T>): Promise<T> {
connect<T>(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean): Promise<T> {
if (hostInfo == null) {
throw new AwsWrapperError("HostInfo was not provided.");
}
return this.executeWithSubscribedPlugins(
hostInfo,
props,
"connect",
PluginManager.CONNECT_METHOD,
(plugin, nextPluginFunc) => plugin.connect(hostInfo, props, isInitialConnection, nextPluginFunc),
methodFunc
async () => {
throw new AwsWrapperError("Shouldn't be called.");
}
);
}

forceConnect<T>(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean, methodFunc: () => Promise<T>): Promise<T> {
forceConnect<T>(hostInfo: HostInfo | null, props: Map<string, any>, isInitialConnection: boolean): Promise<T> {
if (hostInfo == null) {
throw new AwsWrapperError("HostInfo was not provided.");
}
return this.executeWithSubscribedPlugins(
hostInfo,
props,
"forceConnect",
PluginManager.FORCE_CONNECT_METHOD,
(plugin, nextPluginFunc) => plugin.forceConnect(hostInfo, props, isInitialConnection, nextPluginFunc),
methodFunc
async () => {
throw new AwsWrapperError("Shouldn't be called.");
}
);
}

Expand Down
15 changes: 5 additions & 10 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,12 @@ export class PluginService implements ErrorHandler, HostListProviderService {
throw new AwsWrapperError("AwsClient is missing create target client function."); // This should not be reached
}

connect<T>(hostInfo: HostInfo, props: Map<string, any>, connectFunc: () => Promise<T>) {
if (connectFunc) {
return this.pluginServiceManagerContainer.pluginManager?.connect(hostInfo, props, false, connectFunc);
}
throw new AwsWrapperError("AwsClient is missing target client connect function."); // This should not be reached
connect<T>(hostInfo: HostInfo, props: Map<string, any>): any {
return this.pluginServiceManagerContainer.pluginManager?.connect(hostInfo, props, false);
}

forceConnect<T>(hostInfo: HostInfo, props: Map<string, any>, connectFunc: () => Promise<T>) {
if (connectFunc) {
return this.pluginServiceManagerContainer.pluginManager?.forceConnect(hostInfo, props, false, connectFunc);
}
throw new AwsWrapperError("AwsClient is missing target client connect function."); // This should not be reached
forceConnect<T>(hostInfo: HostInfo, props: Map<string, any>): any {
return this.pluginServiceManagerContainer.pluginManager?.forceConnect(hostInfo, props, false);
}

async setCurrentClient(newClient: any, hostInfo: HostInfo): Promise<Set<HostChangeOptions>> {
Expand Down Expand Up @@ -298,6 +292,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
changes.has(HostChangeOptions.CONNECTION_OBJECT_CHANGED) &&
!pluginOpinions.has(OldConnectionSuggestionAction.PRESERVE) &&
(await oldClient.isValid());
// TODO: Review should tryClosingTargetClient(oldClient) be called here, or at some point in this setCurrentClient method?
}
} finally {
this.sessionStateService.complete();
Expand Down
17 changes: 7 additions & 10 deletions common/lib/plugins/default_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
isInitialConnection: boolean,
forceConnectFunc: () => Promise<Type>
): Promise<Type> {
return await this.connectInternal(hostInfo, props, forceConnectFunc, this.defaultConnProvider);
return await this.connectInternal(hostInfo, props, this.defaultConnProvider);
}

override initHostProvider(
Expand Down Expand Up @@ -91,18 +91,15 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
connProvider = this.connProviderManager.getConnectionProvider(hostInfo, props);
}

return this.connectInternal(hostInfo, props, connectFunc, connProvider);
return this.connectInternal(hostInfo, props, connProvider);
}

private async connectInternal<Type>(
hostInfo: HostInfo,
props: Map<string, any>,
connectFunc: () => Promise<Type>,
connProvider: ConnectionProvider
): Promise<Type> {
const result = await connProvider.connect(hostInfo, this.pluginService, props, connectFunc);
private async connectInternal<Type>(hostInfo: HostInfo, props: Map<string, any>, connProvider: ConnectionProvider): Promise<Type> {
const result: any = await connProvider.connect(hostInfo, this.pluginService, props);
this.pluginService.setAvailability(hostInfo.allAliases, HostAvailability.AVAILABLE);
await this.pluginService.updateDialect(this.pluginService.getCurrentClient().targetClient);
// TODO: review this probably should not be called here, but probably in pluginService.setCurrentClient
// as the pluginService.setCurrentClient has not been called yet
await this.pluginService.updateDialect(result);
return result;
}

Expand Down
13 changes: 8 additions & 5 deletions common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
throw new AwsWrapperError("Host list provider service not found."); // this should not be reached
}

const result = await this._staleDnsHelper.getVerifiedConnection(
const targetClient = await this._staleDnsHelper.getVerifiedConnection(
hostInfo.host,
isInitialConnection,
this.hostListProviderService,
Expand All @@ -290,10 +290,11 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
);

if (isInitialConnection) {
await this.pluginService.refreshHostList();
// Todo review: should this be called here or later when the target client is set in plugin service?
await this.pluginService.refreshHostList(targetClient);
}

return result;
return targetClient;
}

override async execute<T>(methodName: string, methodFunc: () => Promise<T>): Promise<T> {
Expand Down Expand Up @@ -500,9 +501,11 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
private async createConnectionForHost(baseHostInfo: HostInfo) {
const props = new Map(this._properties);
props.set(WrapperProperties.HOST.name, baseHostInfo.host);
const client = this.pluginService.createTargetClient(props);

let client;
try {
await this.pluginService.connect(baseHostInfo, this._properties, this.pluginService.getDialect().getConnectFunc(client));
client = await this.pluginService.connect(baseHostInfo, props);
// TODO: review usage of the createConnectionForHost, should we be calling pluginService.setCurrentClient here?
await this.pluginService.setCurrentClient(client, baseHostInfo);
} catch (error) {
await this.pluginService.tryClosingTargetClient(client);
Expand Down
9 changes: 6 additions & 3 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,7 @@ class ConnectionAttemptTask {
const copy = new Map(this.initialConnectionProps);
copy.set(WrapperProperties.HOST.name, this.newHost.host);
try {
this.targetClient = await this.pluginService.createTargetClient(copy);
const connectFunc = this.pluginService.getConnectFunc(this.targetClient);
await this.pluginService.forceConnect(this.newHost, this.initialConnectionProps, connectFunc);
this.targetClient = await this.pluginService.forceConnect(this.newHost, copy);
this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.AVAILABLE);
logger.info(Messages.get("ClusterAwareReaderFailoverHandler.successfulReaderConnection", this.newHost.host));
return new ReaderFailoverResult(this.targetClient, this.newHost, true);
Expand All @@ -338,6 +336,11 @@ class ConnectionAttemptTask {
}
}

// TODO review
// We probably don't need the class member this.targetClient
// it is only created in async call() above and if successful passed to
// ReaderFailoverResult and eventually may get it's way to pluginService.setCurrentClient
// In this case we should not be clearing it here. And again don't need to store it in class member.
async performFinalCleanup() {
await this.pluginService.tryClosingTargetClient(this.targetClient);
}
Expand Down
3 changes: 1 addition & 2 deletions common/lib/plugins/failover/reader_failover_result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
limitations under the License.
*/

import { AwsClient } from "../../aws_client";
import { HostInfo } from "../../host_info";

export class ReaderFailoverResult {
Expand All @@ -30,7 +29,7 @@ export class ReaderFailoverResult {
this._exception = exception;
}

get client(): AwsClient | null {
get client(): any | null {
return this._client;
}

Expand Down
13 changes: 5 additions & 8 deletions common/lib/plugins/failover/writer_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import { HostInfo } from "../../host_info";
import { WriterFailoverResult } from "./writer_failover_result";
import { ClusterAwareReaderFailoverHandler } from "./reader_failover_handler";
import { AwsClient } from "../../aws_client";
import { PluginService } from "../../plugin_service";
import { HostAvailability } from "../../host_availability/host_availability";
import { AwsWrapperError } from "../../utils/errors";
Expand Down Expand Up @@ -183,7 +182,7 @@ class ReconnectToWriterHandlerTask {
originalWriterHost: HostInfo | null;
initialConnectionProps: Map<string, any>;
reconnectionWriterIntervalMs: number;
currentClient?: AwsClient;
currentClient?: any;
endTime: number;
failoverCompleted: boolean = false;
timeoutId: any = -1;
Expand Down Expand Up @@ -231,9 +230,7 @@ class ReconnectToWriterHandlerTask {
try {
const props = new Map(this.initialConnectionProps);
props.set(WrapperProperties.HOST.name, this.originalWriterHost.host);
this.currentClient = await this.pluginService.createTargetClient(props);
const connectFunc = this.pluginService.getConnectFunc(this.currentClient);
await this.pluginService.forceConnect(this.originalWriterHost, this.initialConnectionProps, connectFunc);
this.currentClient = await this.pluginService.forceConnect(this.originalWriterHost, props);
await this.pluginService.forceRefreshHostList(this.currentClient);
latestTopology = this.pluginService.getHosts();
} catch (error) {
Expand Down Expand Up @@ -421,11 +418,11 @@ class WaitForNewWriterHandlerTask {
// connect to the new writer
const props = new Map(this.initialConnectionProps);
props.set(WrapperProperties.HOST.name, writerCandidate.host);
const targetClient = await this.pluginService.createTargetClient(props);

let targetClient;
try {
this.pluginService.setAvailability(writerCandidate.allAliases, HostAvailability.AVAILABLE);
const connectFunc = this.pluginService.getConnectFunc(targetClient);
await this.pluginService.forceConnect(writerCandidate, props, connectFunc);
targetClient = await this.pluginService.forceConnect(writerCandidate, props);
await this.pluginService.tryClosingTargetClient(this.currentReaderTargetClient);
await this.pluginService.tryClosingTargetClient(this.currentClient);
this.currentClient = targetClient;
Expand Down
Loading

0 comments on commit ff2cae9

Please sign in to comment.