Skip to content

Commit

Permalink
feat: session state transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill committed May 10, 2024
1 parent 77ea4d0 commit a6183c6
Show file tree
Hide file tree
Showing 21 changed files with 1,671 additions and 37 deletions.
26 changes: 24 additions & 2 deletions common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ export abstract class AwsClient extends EventEmitter {
private _config: any;
protected isConnected: boolean = false;
protected _isReadOnly: boolean = false;
protected _isAutoCommit: boolean = true;
protected _catalog: string = "";
protected _schema: string = "";
protected _isolationLevel: number = 0;
private readonly _properties: Map<string, any>;
private _targetClient: any;
protected _errorHandler: ErrorHandler;
Expand Down Expand Up @@ -123,13 +127,31 @@ export abstract class AwsClient extends EventEmitter {

abstract executeQuery(props: Map<string, any>, sql: string): Promise<any>;

abstract setReadOnly(readOnly: boolean): Promise<any | void>;
abstract setReadOnly(readOnly: boolean, forceUpdate?: boolean): Promise<any | void>;

abstract isReadOnly(): boolean;

abstract setAutoCommit(autoCommit: boolean, forceUpdate?: boolean): Promise<any | void>;

abstract getAutoCommit(): boolean;

abstract setTransactionIsolation(transactionIsolation: number, forceUpdate?: boolean): Promise<any | void>;

abstract getTransactionIsolation(): number;

abstract setSchema(schema: any, forceUpdate?: boolean): Promise<any | void>;

abstract getSchema(): string;

abstract setCatalog(catalog: string, forceUpdate?: boolean): Promise<any | void>;

abstract getCatalog(): string;

abstract end(): Promise<any>;

abstract rollback(): void;
abstract rollback(): Promise<any>;

abstract resetState(): void;

async isValid(): Promise<boolean> {
if (!this.targetClient) {
Expand Down
5 changes: 5 additions & 0 deletions common/lib/database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ export interface DatabaseDialect {
getDialectUpdateCandidates(): string[];
isDialect<T>(conn: T): boolean;
getHostListProvider(props: Map<string, any>, originalUrl: string, hostListProviderService: HostListProviderService): HostListProvider;
doesStatementSetReadOnly(statement: string): boolean | undefined;
doesStatementSetTransactionIsolation(statement: string): number | undefined;
doesStatementSetAutoCommit(statement: string): boolean | undefined;
doesStatementSetSchema(statement: string): string | undefined;
doesStatementSetCatalog(statement: string): string | undefined;
}
4 changes: 2 additions & 2 deletions common/lib/driver_connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
const newTargetClient = pluginService.createTargetClient(props);
const fixedConnFunc = pluginService.getConnectFunc(newTargetClient);
result = await fixedConnFunc();
pluginService.setCurrentClient(newTargetClient, connectionHostInfo);
await pluginService.setCurrentClient(newTargetClient, connectionHostInfo);
}

return result;
Expand All @@ -106,7 +106,7 @@ export class DriverConnectionProvider implements ConnectionProvider {
getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map<string, any>): HostInfo {
const acceptedStrategy = DriverConnectionProvider.acceptedStrategies.get(strategy);
if (!acceptedStrategy) {
throw new AwsWrapperError(Messages.get("ConnectionProvider.unsupportedHostSpecSelectorStrategy", strategy, "DriverConnectionProvider")); // TODO
throw new AwsWrapperError(Messages.get("ConnectionProvider.unsupportedHostSpecSelectorStrategy", strategy, "DriverConnectionProvider"));
}
return acceptedStrategy.getHost(hosts, role, props);
}
Expand Down
125 changes: 113 additions & 12 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,27 @@ import { CacheMap } from "./utils/cache_map";
import { HostChangeOptions } from "./host_change_options";
import { HostRole } from "./host_role";
import { WrapperProperties } from "./wrapper_property";
import { SqlMethodUtils } from "./utils/sql_method_utils";
import { SessionStateService } from "./session_state_service";
import { SessionStateServiceImpl } from "./session_state_service_impl";

export class PluginService implements ErrorHandler, HostListProviderService {
private _currentHostInfo?: HostInfo;
private _currentClient: AwsClient;
private _hostListProvider?: HostListProvider;
private _initialConnectionHostInfo?: HostInfo;
private _isInTransaction: boolean = false;
private readonly _props: Map<string, any>;
private pluginServiceManagerContainer: PluginServiceManagerContainer;
private _props: Map<string, any>;
protected hosts: HostInfo[] = [];
protected readonly sessionStateService: SessionStateService;
protected static readonly hostAvailabilityExpiringCache: CacheMap<string, HostAvailability> = new CacheMap<string, HostAvailability>();

constructor(container: PluginServiceManagerContainer, client: AwsClient, props: Map<string, any>) {
this._currentClient = client;
this.pluginServiceManagerContainer = container;
this._props = props;
this.sessionStateService = new SessionStateServiceImpl(this, this._props);
container.pluginService = this;
}

Expand Down Expand Up @@ -236,9 +241,61 @@ export class PluginService implements ErrorHandler, HostListProviderService {
throw new AwsWrapperError("AwsClient is missing target client connect function."); // This should not be reached
}

setCurrentClient(newClient: any, hostInfo: HostInfo) {
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
async setCurrentClient(newClient: any, hostInfo: HostInfo) {
if (this.getCurrentClient().targetClient === null) {
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
this.sessionStateService.reset();
const changes = new Set<HostChangeOptions>([HostChangeOptions.INITIAL_CONNECTION]);

if (this.pluginServiceManagerContainer.pluginManager) {
// this.pluginServiceManagerContainer.pluginManager.notifyConnectionChanged(changes, null);
}

return changes;
} else {
if (this._currentHostInfo) {
const changes: Set<HostChangeOptions> = this.compare(this._currentHostInfo, hostInfo);

if (changes.size > 0) {
const oldClient: any = this.getCurrentClient().targetClient;
const isInTransaction = this.isInTransaction;
this.sessionStateService.begin();

try {
this.getCurrentClient().resetState();
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
await this.sessionStateService.applyCurrentSessionState(this.getCurrentClient());
this.setInTransaction(false);

if (this.pluginServiceManagerContainer.pluginManager) {
// const pluginOpinions: Set<OldConnectionSuggestionAction> =
// await this.pluginServiceManagerContainer.pluginManager.notifyConnectionChanged(changes, null);

const shouldCloseConnection =
changes.has(HostChangeOptions.CONNECTION_OBJECT_CHANGED) &&
// !pluginOpinions.has(OldConnectionSuggestionAction.PRESERVE) &&
(await oldClient.isValid());

if (shouldCloseConnection) {
await this.sessionStateService.applyPristineSessionState(this.getCurrentClient());

try {
await this.tryClosingTargetClient(oldClient);
} catch (e: any) {
// ignore
}
}
}
} finally {
this.sessionStateService.complete();
}
}
return changes;
}
throw new AwsWrapperError("HostInfo not found"); // Should not be reached
}
}

async isClientValid(targetClient: any): Promise<boolean> {
Expand All @@ -255,17 +312,61 @@ export class PluginService implements ErrorHandler, HostListProviderService {
return this.getDialect().getConnectFunc(targetClient);
}

getSessionStateService() {
return this.sessionStateService;
}

async updateState(sql: string) {
this.updateInTransaction(sql);

const statements = SqlMethodUtils.parseMultiStatementQueries(sql);
await this.updateReadOnly(statements);
await this.updateAutoCommit(statements);
await this.updateCatalog(statements);
await this.updateSchema(statements);
await this.updateTransactionIsolation(statements);
}

updateInTransaction(sql: string) {
// TODO: revise with session state transfer
if (sql.toLowerCase().startsWith("start transaction") || sql.toLowerCase().startsWith("begin")) {
if (SqlMethodUtils.doesOpenTransaction(sql)) {
this.setInTransaction(true);
} else if (
sql.toLowerCase().startsWith("commit") ||
sql.toLowerCase().startsWith("rollback") ||
sql.toLowerCase().startsWith("end") ||
sql.toLowerCase().startsWith("abort")
) {
} else if (SqlMethodUtils.doesCloseTransaction(sql)) {
this.setInTransaction(false);
}
}

private async updateReadOnly(statements: string[]) {
const updateReadOnly = SqlMethodUtils.doesSetReadOnly(statements, this.getDialect());
if (updateReadOnly !== undefined) {
await this.getCurrentClient().setReadOnly(updateReadOnly);
}
}

private async updateAutoCommit(statements: string[]) {
const updateAutoCommit = SqlMethodUtils.doesSetAutoCommit(statements, this.getDialect());
if (updateAutoCommit !== undefined) {
await this.getCurrentClient().setAutoCommit(updateAutoCommit);
}
}

private async updateCatalog(statements: string[]) {
const updateCatalog = SqlMethodUtils.doesSetCatalog(statements, this.getDialect());
if (updateCatalog !== undefined) {
await this.getCurrentClient().setCatalog(updateCatalog);
}
}

private async updateSchema(statements: string[]) {
const updateSchema = SqlMethodUtils.doesSetSchema(statements, this.getDialect());
if (updateSchema !== undefined) {
await this.getCurrentClient().setSchema(updateSchema);
}
}

private async updateTransactionIsolation(statements: string[]) {
const updateTransactionIsolation = SqlMethodUtils.doesSetTransactionIsolation(statements, this.getDialect());
if (updateTransactionIsolation !== undefined) {
await this.getCurrentClient().setTransactionIsolation(updateTransactionIsolation);
}
}
}
9 changes: 4 additions & 5 deletions common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {

this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.pluginService.tryClosingTargetClient();
this.pluginService.setCurrentClient(result.client, result.newHost);
await this.pluginService.setCurrentClient(result.client, result.newHost);
await this.updateTopology(true);
}

Expand All @@ -363,7 +363,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
}

await this.pluginService.tryClosingTargetClient();
this.pluginService.setCurrentClient(result.client, writerHostInfo);
await this.pluginService.setCurrentClient(result.client, writerHostInfo);
logger.debug(Messages.get("Failover.establishedConnection", this.pluginService.getCurrentHostInfo()?.host ?? ""));
await this.pluginService.refreshHostList();
}
Expand All @@ -377,8 +377,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
if (this.pluginService.isInTransaction()) {
this._isInTransaction = this.pluginService.isInTransaction();
try {
// TODO: rollback not implemented
client.rollback();
await client.rollback();
} catch (error) {
// swallow this error
}
Expand Down Expand Up @@ -461,7 +460,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
const client = this.pluginService.createTargetClient(props);
try {
await this.pluginService.connect(baseHostInfo, this._properties, this.pluginService.getDialect().getConnectFunc(client));
this.pluginService.setCurrentClient(client, baseHostInfo);
await this.pluginService.setCurrentClient(client, baseHostInfo);
} catch (error) {
await this.pluginService.tryClosingTargetClient(client);
throw error;
Expand Down
12 changes: 6 additions & 6 deletions common/lib/plugins/stale_dns_helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class StaleDnsHelper {
logger.debug(Messages.get("AuroraStaleDnsHelper.clusterEndpointDns", hostInetAddress));

if (!clusterInetAddress) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand All @@ -94,7 +94,7 @@ export class StaleDnsHelper {
if (!this.writerHostInfo) {
const writerCandidate = this.getWriter();
if (writerCandidate && this.rdsUtils.isRdsClusterDns(writerCandidate.host)) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}
this.writerHostInfo = writerCandidate;
Expand All @@ -103,7 +103,7 @@ export class StaleDnsHelper {
logger.debug(Messages.get("AuroraStaleDnsHelper.writerHostSpec", this.writerHostInfo?.host ?? ""));

if (!this.writerHostInfo) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand All @@ -119,7 +119,7 @@ export class StaleDnsHelper {
logger.debug(Messages.get("AuroraStaleDnsHelper.writerInetAddress", this.writerHostAddress));

if (!this.writerHostAddress) {
this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand All @@ -133,7 +133,7 @@ export class StaleDnsHelper {
try {
result = await this.pluginService.connect(this.writerHostInfo, props, this.pluginService.getDialect().getConnectFunc(targetClient));
await this.pluginService.tryClosingTargetClient(currentTargetClient);
this.pluginService.setCurrentClient(targetClient, this.writerHostInfo);
await this.pluginService.setCurrentClient(targetClient, this.writerHostInfo);
return result;
} catch (error: any) {
await this.pluginService.tryClosingTargetClient(targetClient);
Expand All @@ -144,7 +144,7 @@ export class StaleDnsHelper {
}
}

this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
await this.pluginService.setCurrentClient(currentTargetClient, currentHostInfo);
return result;
}

Expand Down
Loading

0 comments on commit a6183c6

Please sign in to comment.