Skip to content

Commit

Permalink
fix: reader failover connection attempt batch not cleaned up
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill committed Jan 30, 2025
1 parent 132fd5d commit 21f2ead
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 6 deletions.
17 changes: 13 additions & 4 deletions common/lib/plugins/failover/reader_failover_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,15 @@ export class ClusterAwareReaderFailoverHandler implements ReaderFailoverHandler
tasks.push(secondTask.call());
}

return await Promise.any(tasks);
return await Promise.any(tasks).then((result) => {
if (!result.isConnected) {
firstTask.batchFailed = true;
if (secondTask) {
secondTask.batchFailed = true;
}
}
return result;
});
}

getReaderHostsByPriority(hosts: HostInfo[]): HostInfo[] {
Expand Down Expand Up @@ -285,6 +293,7 @@ class ConnectionAttemptTask {
taskId: number;
taskHandler: ReaderTaskSelectorHandler;
failoverTaskId: string;
batchFailed: boolean = false;

constructor(
initialConnectionProps: Map<string, any>,
Expand Down Expand Up @@ -317,11 +326,10 @@ class ConnectionAttemptTask {

this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.AVAILABLE);
logger.info(Messages.get("ClusterAwareReaderFailoverHandler.successfulReaderConnection", this.newHost.host));
if (this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) === -1) {
if (!this.batchFailed && this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) === -1) {
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, this.taskId);
return new ReaderFailoverResult(this.targetClient, this.newHost, true, undefined, this.taskId);
}
await this.pluginService.abortTargetClient(this.targetClient);
return new ReaderFailoverResult(null, null, false, undefined, this.taskId);
} catch (error) {
this.pluginService.setAvailability(this.newHost.allAliases, HostAvailability.NOT_AVAILABLE);
Expand All @@ -340,8 +348,9 @@ class ConnectionAttemptTask {
}

async performFinalCleanup() {
if (this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) !== this.taskId) {
if (this.batchFailed || this.taskHandler.getSelectedConnectionAttemptTask(this.failoverTaskId) !== this.taskId) {
await this.pluginService.abortTargetClient(this.targetClient);
this.taskHandler.setSelectedConnectionAttemptTask(this.failoverTaskId, -1);
}
}
}
47 changes: 47 additions & 0 deletions tests/unit/reader_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { NodePostgresDriverDialect } from "../../pg/lib/dialect/node_postgres_driver_dialect";
import { PgClientWrapper } from "../../common/lib/pg_client_wrapper";
import { sleep } from "../../common/lib/utils/utils";

const host1 = new HostInfo("writer", 1234, HostRole.WRITER);
const host2 = new HostInfo("reader1", 1234, HostRole.READER);
Expand Down Expand Up @@ -85,6 +86,52 @@ describe("reader failover handler", () => {
expect(result.newHost).toBe(hosts[successHostIndex]);
}, 30000);

it("test failover - batch failure", async () => {
const expectedClients: ClientWrapper[] = [];
const hosts = [...defaultHosts];
const successHostIndex = 4;

when(mockPluginService.getHosts()).thenReturn(hosts);
when(await mockPluginService.getHostRole(anything())).thenReturn(HostRole.READER);

for (let i = 0; i < hosts.length; i++) {
if (i !== 0 && i !== 2) {
const client = new PgClientWrapper({}, hosts[i], properties);
expectedClients.push(client);
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
await sleep(100);
return client;
});
} else {
when(mockPluginService.forceConnect(hosts[i], anything())).thenCall(async () => {
await sleep(100);
throw new AwsWrapperError("Rejecting test");
});
when(mockPluginService.isNetworkError(anything())).thenReturn(true);
expectedClients.push(undefined);
}
}
const mockPluginServiceInstance = instance(mockPluginService);

const target = new ClusterAwareReaderFailoverHandler(
mockPluginServiceInstance,
properties,
ClusterAwareReaderFailoverHandler.DEFAULT_FAILOVER_TIMEOUT,
ClusterAwareReaderFailoverHandler.DEFAULT_READER_CONNECT_TIMEOUT,
false
);
const result = await target.getConnectionFromHostGroup(hosts);
expect(result.isConnected).toBe(true);
expect(result.client).toBe(expectedClients[successHostIndex]);
expect(result.newHost).toBe(hosts[successHostIndex]);
for (let i = 0; i < hosts.length - 1; i++) {
if (i !== 0 && i !== 2 && i !== successHostIndex) {
verify(mockPluginService.abortTargetClient(expectedClients[i])).once();
}
}
verify(mockPluginService.abortTargetClient(undefined)).twice();
}, 30000);

it("test failover timeout", async () => {
// original host list: [active writer, active reader, current connection (reader), active
// reader, down reader, active reader]
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/writer_failover_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { WriterFailoverResult } from "../../common/lib/plugins/failover/writer_f
import { ClientWrapper } from "../../common/lib/client_wrapper";
import { PgDatabaseDialect } from "../../pg/lib/dialect/pg_database_dialect";
import { MySQLClientWrapper } from "../../common/lib/mysql_client_wrapper";
import { MySQL2DriverDialect } from "../../mysql/lib/dialect/mysql2_driver_dialect";

const builder = new HostInfoBuilder({ hostAvailabilityStrategy: new SimpleHostAvailabilityStrategy() });

Expand All @@ -46,10 +47,10 @@ const mockPluginService = mock(PluginService);
const mockReaderFailover = mock(ClusterAwareReaderFailoverHandler);

const mockTargetClient = { client: 123 };
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapper: ClientWrapper = new MySQLClientWrapper(mockTargetClient, builder.withHost("host").build(), new Map<string, any>(), new MySQL2DriverDialect());

const mockTargetClientB = { client: 456 };
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map<string, any>());
const mockClientWrapperB: ClientWrapper = new MySQLClientWrapper(mockTargetClientB, builder.withHost("host").build(), new Map<string, any>(), new MySQL2DriverDialect());

describe("writer failover handler", () => {
beforeEach(() => {
Expand Down

0 comments on commit 21f2ead

Please sign in to comment.