Skip to content

Commit

Permalink
connectFunc changes
Browse files Browse the repository at this point in the history
  • Loading branch information
joyc-bq committed May 11, 2024
1 parent da2bc0a commit d8079e5
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 28 deletions.
4 changes: 4 additions & 0 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return pluginManager?.getHostInfoByStrategy(role, strategy);
}

async getHostRole(client: AwsClient): Promise<HostRole> {
return await this._hostListProvider?.getHostRole(client) ?? HostRole.UNKNOWN;
}

connect<T>(hostInfo: HostInfo, props: Map<string, any>, connectFunc: () => Promise<T>) {
if (connectFunc) {
return this.pluginServiceManagerContainer.pluginManager?.connect(hostInfo, props, false, connectFunc);
Expand Down
59 changes: 31 additions & 28 deletions common/lib/plugins/read_write_splitting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin {
id: string = uniqueId("_readWriteSplittingPlugin");
_hostListProviderService: HostListProviderService | undefined;
private pluginService: PluginService;
_writerClient?: AwsClient | undefined;
_readerClient?: AwsClient | undefined;
_writerClient?: any | undefined;
_readerClient?: any | undefined;
private readonly _properties: Map<string, any>;
private _readerHostInfo?: HostInfo = undefined;
private _inReadWriteSplit = false;
Expand Down Expand Up @@ -85,8 +85,8 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin {
}

updateInternalConnectionInfo(): void {
const currentHost = this.pluginService.getCurrentHostInfo();
const currentClient = this.pluginService.getCurrentClient();
const currentHost = this.pluginService.getCurrentHostInfo();
if (currentHost === null || currentClient === null) {
return;
}
Expand Down Expand Up @@ -116,34 +116,32 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin {
isInitialConnection: boolean,
connectFunc: () => Promise<T>
): Promise<T> {
const currentClient = await connectFunc();
const result = await connectFunc();
if (!isInitialConnection || this._hostListProviderService?.isStaticHostListProvider()) {
return currentClient;
return result;
}
const currentClient = this.pluginService.getCurrentClient();
const currentRole = await this.pluginService.getHostRole(currentClient);

const currentHostInfo = this.pluginService.getCurrentHostInfo();
if (!currentHostInfo) {
throw new AwsWrapperError("Could not find current hostInfo");
}
const currentRole = currentHostInfo.role;
if (currentRole === undefined || currentRole === HostRole.UNKNOWN) {
this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorVerifyingInitialHostSpecRole"));
}
const currentHost = this.pluginService.getInitialConnectionHostInfo();
if (currentHost != null) {
const currentHost: HostInfo | null = this.pluginService.getInitialConnectionHostInfo();
if (currentHost !== null) {
if (currentRole === currentHost.role) {
return currentClient;
return result;
}
const updatedHost = Object.assign({}, currentHost);
updatedHost.role = currentRole;
const updatedHost: HostInfo = Object.assign({}, currentHost);
updatedHost.role = currentRole ?? HostRole.UNKNOWN;
this._hostListProviderService?.setInitialConnectionHostInfo(updatedHost);
}
return currentClient;
return result;
}

override async execute<T>(methodName: string, executeFunc: () => Promise<T>, methodArgs: any[]): Promise<T> {
override async execute<T>(methodName: string, executeFunc: () => Promise<T>, methodArgs: any): Promise<T> {
if (methodName === ReadWriteSplittingPlugin.READ_ONLY) {
await this.switchConnectionIfRequired(methodArgs[0]);
const readOnly: boolean = (methodArgs.includes("SET SESSION TRANSACTION READ ONLY") ? true : false)
await this.switchConnectionIfRequired(readOnly);
}
try {
return await executeFunc();
Expand All @@ -158,12 +156,12 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin {
}
}

setWriterClient(writerClient: AwsClient, writerHostInfo: HostInfo): void {
setWriterClient(writerClient: any, writerHostInfo: HostInfo): void {
this._writerClient = writerClient;
logger.debug(Messages.get("ReadWriteSplittingPlugin.setWriterClient", writerHostInfo.url));
}

setReaderConnection(readerClient: AwsClient, readerHost: HostInfo): void {
setReaderConnection(readerClient: any, readerHost: HostInfo): void {
this._readerClient = readerClient;
this._readerHostInfo = readerHost;
logger.debug(Messages.get("ReadWriteSplittingPlugin.setReaderConnection", readerHost.url));
Expand All @@ -173,10 +171,14 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin {
const props = new Map(this._properties);
props.set("host", writerHost.host);
const client = this.pluginService.createTargetClient(props);
await this.pluginService.connect(writerHost, props, this.pluginService.getDialect().getConnectFunc(client));
this.setWriterClient(client, writerHost);
this.switchCurrentClientTo(client, writerHost);

try {
await this.pluginService.connect(writerHost, props, this.pluginService.getDialect().getConnectFunc(client));
this.setWriterClient(client, writerHost);
this.switchCurrentClientTo(client, writerHost);
} catch (any) {
logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToWriter", writerHost.url));
await this.pluginService.tryClosingTargetClient(client);
}
}

async switchConnectionIfRequired(readOnly: boolean) {
Expand Down Expand Up @@ -210,7 +212,6 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin {
logger.warn("ReadWriteSplittingPlugin.fallbackToWriter", currentHost.url);
}
}

} else if (currentHost.role != HostRole.WRITER) {
if (this.pluginService.isInTransaction()) {
this.logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction"));
Expand Down Expand Up @@ -250,15 +251,17 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin {
for (let i = 0; i < connectAttempts; i++) {
const host = this.pluginService.getHostInfoByStrategy(HostRole.READER, this.readerSelectorStrategy);
if (host) {
const props = new Map(this._properties);
props.set("host", host.host);
client = this.pluginService.createTargetClient(props);

try {
const props = new Map(this._properties);
props.set("host", host.host);
client = this.pluginService.createTargetClient(props);
await this.pluginService.connect(host, props, this.pluginService.getDialect().getConnectFunc(client));
readerHost = host;
break;
} catch (any) {
logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToReader", host.url));
await this.pluginService.tryClosingTargetClient(client);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions common/lib/utils/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"ReadWriteSplittingPlugin.exceptionWhileExecutingCommand": "Detected an exception while executing a command: %s",
"ReadWriteSplittingPlugin.executingAgainstOldConnection": "Executing method against old connection: %s",
"ReadWriteSplittingPlugin.failedToConnectToReader": "Failed to connect to reader host: %s",
"ReadWriteSplittingPlugin.failedToConnectToWriter": "Failed to connect to writer host: %s",
"ReadWriteSplittingPlugin.failoverExceptionWhileExecutingCommand": "Detected a failover exception while executing a command: %s",
"ReadWriteSplittingPlugin.fallbackToWriter": "Failed to switch to a reader; the current writer will be used as a fallback: %s",
"ReadWriteSplittingPlugin.noReadersAvailable": "The plugin was unable to establish a reader connection to any reader instance.",
Expand Down

0 comments on commit d8079e5

Please sign in to comment.