Skip to content

Commit

Permalink
feat: Bin log status and replication lag waiting system (#62)
Browse files Browse the repository at this point in the history
* feat: Bin log status and replication lag waiting system

* MySQLDatabase test

* determine if the connection is master or replication via query rather than readOnly status

* chore: clean up obsolete code and comments
  • Loading branch information
breautek authored Jun 13, 2024
1 parent 612e2c2 commit 9c421ed
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 10 deletions.
62 changes: 62 additions & 0 deletions spec/database/ConnectionReplicationWaiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

import {MockApplication} from '../support/TestApplication';
import {ConnectionReplicationWaiter} from '../../src/private/ConnectionReplicationWaiter';
import {MockConnection} from '../support/MockConnection';
import { TimeoutError } from '../../src/TimeoutError';

describe('ConnectionReplicationWaiter', () => {
let app: MockApplication = null;

beforeAll(async () => {
process.argv = [];
app = new MockApplication();
await app.start();
});

afterAll(async () => {
await app.close();
});

it('will skip on master connections', async () => {
let conn: MockConnection = new MockConnection(false, new Error().stack);

let spy = jest.spyOn(conn, 'isReadOnly');

let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn);
await waiter.wait(null);

expect(spy).toHaveBeenCalled();
});

it('will timeout after 1 second', async () => {
let conn: MockConnection = new MockConnection(true, new Error().stack);
let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn);

await expect(waiter.wait({
page: 2,
position: 1
}, 1000)).rejects.toBeInstanceOf(TimeoutError);
});

it('will sleep for 1 second in between retries', async () => {
let conn: MockConnection = new MockConnection(true, new Error().stack);
let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn, 1000);

let spy = jest.spyOn(<any>waiter, '$sleep');

let p: Promise<void> = waiter.wait({
page: 2,
position: 1
});

await new Promise<void>((resolve, reject) => {
setTimeout(() => {
expect(spy).toHaveBeenCalledWith(1000);
conn.pos.page++;
resolve();
}, 0);
});

await p;
});
});
6 changes: 3 additions & 3 deletions spec/database/Database.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe('Database', () => {
let spy: jasmine.Spy = spyOn(<any>db, '_getConnection');
db.getConnection();

expect(spy).toHaveBeenCalledWith('SLAVE*', false);
expect(spy).toHaveBeenCalledWith('SLAVE*', false, undefined);
});

it('Can get connection via custom query', () => {
Expand All @@ -96,7 +96,7 @@ describe('Database', () => {
let spy: jasmine.Spy = spyOn(<any>db, '_getConnection');
db.getConnection(false, 'test');

expect(spy).toHaveBeenCalledWith('test', false);
expect(spy).toHaveBeenCalledWith('test', false, undefined);
});

it('can get write connection', () => {
Expand All @@ -105,6 +105,6 @@ describe('Database', () => {
let spy: jasmine.Spy = spyOn(<any>db, '_getConnection');
db.getConnection(true);

expect(spy).toHaveBeenCalledWith(MASTER, true);
expect(spy).toHaveBeenCalledWith(MASTER, true, undefined);
});
});
32 changes: 32 additions & 0 deletions spec/database/MySQLDatabase.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {EventEmitter} from 'events';
import { getInstance } from '../../src/instance';
import { IDatabaseConnection } from '../../src/IDatabaseConnection';
import { DatabaseConnection } from '../../src/DatabaseConnection';
import { ConnectionReplicationWaiter } from '../../src/private/ConnectionReplicationWaiter';
import { MySQLConnection } from '../../src/MySQLConnection';

describe('MySQLDatabase', () => {
let app: MockApplication = null;
Expand Down Expand Up @@ -98,4 +100,34 @@ describe('MySQLDatabase', () => {

expect(spy).toHaveBeenCalled();
});

it('will wait for replication lag', async () => {
let spy = jest.spyOn(ConnectionReplicationWaiter.prototype, 'wait').mockImplementation(() => {
return Promise.resolve();
});

let db: MySQLDatabase = new MySQLDatabase();
jest.spyOn(<any>db, '$getConnectionFromPool').mockImplementation(() => {
let c: MySQLConnection = new MySQLConnection(<any>{
config: {}
}, '', true);

jest.spyOn(c, '__internal_init').mockImplementation(() => {
return Promise.resolve();
});

jest.spyOn(c, 'isMaster').mockImplementation(() => {
return false;
});

return Promise.resolve(c);
});

await db.getConnection(false, null, {
page: 2,
position: 2
});

expect(spy).toHaveBeenCalled();
});
});
9 changes: 9 additions & 0 deletions spec/support/MockConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import { RollbackQuery } from '../../src/private/RollbackQuery';
import { StartTransactionQuery } from '../../src/private/StartTransactionQuery';
import { IsolationLevel } from '../../src/IsolationLevel';
import { SetIsolationLevelQuery } from '../../src/private/SetIsolationLevelQuery';
import { IDatabasePosition } from '../../src/IDatabasePosition';

export class MockConnection extends DatabaseConnection<any> {
public transaction: boolean;
public closed: boolean;
public pos: IDatabasePosition = {
page: 1,
position: 1
};

public constructor(readonly: boolean, instantiationStack: string) {
super(null, readonly, instantiationStack);
Expand Down Expand Up @@ -48,6 +53,10 @@ export class MockConnection extends DatabaseConnection<any> {
await this.commit();
}

public async getCurrentDatabasePosition(): Promise<IDatabasePosition> {
return this.pos;
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
protected _query(query: string, params?: any): Promise<any> {
return Promise.resolve({
Expand Down
7 changes: 4 additions & 3 deletions src/Database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import * as UUID from 'uuid';
import {DatabaseConnection} from './DatabaseConnection';
import { getInstance } from './instance';
import { IDatabasePosition } from './IDatabasePosition';

const MASTER_NAME: string = 'MASTER';
const TAG: string = 'Database';
Expand Down Expand Up @@ -61,7 +62,7 @@ export abstract class Database<TDatabaseConfig, TConnectionAPI> {
this._removeNode(slaveID);
}

public getConnection(requireWriteAccess: boolean = false, nodeID?: string): Promise<DatabaseConnection<TConnectionAPI>> {
public getConnection(requireWriteAccess: boolean = false, nodeID?: string, requiredPosition?: IDatabasePosition): Promise<DatabaseConnection<TConnectionAPI>> {
let query: string = 'SLAVE*';

if (nodeID) {
Expand All @@ -71,7 +72,7 @@ export abstract class Database<TDatabaseConfig, TConnectionAPI> {
query = 'MASTER';
}

return this._getConnection(query, requireWriteAccess);
return this._getConnection(query, requireWriteAccess, requiredPosition);
}

public destroy(): Promise<void> {
Expand All @@ -81,6 +82,6 @@ export abstract class Database<TDatabaseConfig, TConnectionAPI> {
protected abstract _destroy(): Promise<void>;
protected abstract _addNode(name: string, config: TDatabaseConfig): void;
protected abstract _removeNode(name: string): void;
protected abstract _getConnection(query: string, requireWriteAccess: boolean): Promise<DatabaseConnection<TConnectionAPI>>;
protected abstract _getConnection(query: string, requireWriteAccess: boolean, requiredPosition?: IDatabasePosition): Promise<DatabaseConnection<TConnectionAPI>>;
public abstract escape(query: string): string;
}
6 changes: 6 additions & 0 deletions src/DatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {IDatabaseConnection} from './IDatabaseConnection';
import {IQueryable} from './IQueryable';
import { IConfig } from './IConfig';
import { IsolationLevel } from './IsolationLevel';
import { IDatabasePosition } from './IDatabasePosition';

export const LINGER_WARNING: number = 10000;
export const DEFAULT_QUERY_TIMEOUT: number = 3600000;
Expand Down Expand Up @@ -260,4 +261,9 @@ export abstract class DatabaseConnection<TAPI> implements IDatabaseConnection {
*/
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
protected abstract _stream(query: string, params?: any, streamOptions?: any): Readable;

/**
* @since 8.1.0
*/
public abstract getCurrentDatabasePosition(): Promise<IDatabasePosition>;
}
6 changes: 6 additions & 0 deletions src/IDatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { Readable } from 'stream';
import { IQueryable } from './IQueryable';
import { IsolationLevel } from './IsolationLevel';
import { IDatabasePosition } from './IDatabasePosition';

export interface IDatabaseConnection {
setInstantiationStack(stack: string): void;
Expand All @@ -33,4 +34,9 @@ export interface IDatabaseConnection {
isTransaction(): boolean;
commit(): Promise<void>;
rollback(): Promise<void>;

/**
* @since 8.1.0
*/
getCurrentDatabasePosition(): Promise<IDatabasePosition>;
}
23 changes: 23 additions & 0 deletions src/IDatabasePosition.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2017-2024 Norman Breau
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/**
* @since 8.1.0
*/
export interface IDatabasePosition {
page: number;
position: number;
}
6 changes: 6 additions & 0 deletions src/ManagedDatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {getInstance} from './instance';
import {Readable} from 'stream';
import { Query } from './Query';
import { IsolationLevel } from './IsolationLevel';
import { IDatabasePosition } from './IDatabasePosition';

const TAG: string = 'ManagedDatabaseConnection';

Expand Down Expand Up @@ -81,6 +82,11 @@ export class ManagedDatabaseConnection implements IDatabaseConnection {
}
}

public async getCurrentDatabasePosition(): Promise<IDatabasePosition> {
let conn: IDatabaseConnection = await this.$getConnection();
return await conn.getCurrentDatabasePosition();
}

public isWriteRequired(): boolean {
return this.$requiresWrite;
}
Expand Down
27 changes: 27 additions & 0 deletions src/MySQLConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import { DeadLockError } from './DeadLockError';
import { IsolationLevel } from './IsolationLevel';
import {SetIsolationLevelQuery} from './private/SetIsolationLevelQuery';
import { LockWaitTimeoutError } from './LockWaitTimeoutError';
import { IDatabasePosition } from './IDatabasePosition';
import { GetBinLogPositionQuery } from './private/GetBinLogPositionQuery';
import { GetSlavePositionQuery } from './private/GetSlavePositionQuery';
import { GetMasterPositionQuery } from './private/GetMasterPositionQuery';

const DEFAULT_HIGH_WATERMARK: number = 512; // in number of result objects
const TAG: string = 'MySQLConnection';
Expand All @@ -57,12 +61,14 @@ let rollbackQuery: Query = new RollbackQuery();
export class MySQLConnection extends DatabaseConnection<MySQL.PoolConnection> {
private $transaction: boolean;
private $opened: boolean;
private $isMasterConnection: boolean;

public constructor(connection: MySQL.PoolConnection, instantiationStack: string, isReadOnly: boolean = true) {
super(connection, isReadOnly, instantiationStack);

this.$opened = true;
this.$transaction = false;
this.$isMasterConnection = null;

connection.config.queryFormat = function(query: string, values: any) {
if (!values) return query;
Expand All @@ -77,6 +83,22 @@ export class MySQLConnection extends DatabaseConnection<MySQL.PoolConnection> {
};
}

/**
* @internal - Do not use in application code
*/
public async __internal_init(): Promise<void> {
let result = await new GetSlavePositionQuery().execute(this);
this.$isMasterConnection = result === null;
}

public isMaster(): boolean {
return this.$isMasterConnection;
}

public isReplication(): boolean {
return !this.isMaster();
}

public isTransaction(): boolean {
return this.$transaction;
}
Expand All @@ -85,6 +107,11 @@ export class MySQLConnection extends DatabaseConnection<MySQL.PoolConnection> {
return this.$opened;
}

public override async getCurrentDatabasePosition(): Promise<IDatabasePosition> {
let statusQuery: GetBinLogPositionQuery = this.isReplication() ? new GetSlavePositionQuery() : new GetMasterPositionQuery();
return await statusQuery.execute(this);
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
protected _query(query: string, params?: any): Promise<any> {
let logger: BaseLogger = getInstance().getLogger();
Expand Down
36 changes: 32 additions & 4 deletions src/MySQLDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {Database} from './Database';
import {MySQLConnection} from './MySQLConnection';
import * as MySQL from 'mysql';
import {getInstance} from './instance';
import { IDatabasePosition } from './IDatabasePosition';
import { ConnectionReplicationWaiter } from './private/ConnectionReplicationWaiter';

const TAG: string = 'MySQLDatabase';

Expand Down Expand Up @@ -69,18 +71,44 @@ export class MySQLDatabase extends Database<MySQL.PoolConfig, MySQL.PoolConnecti
});
}

protected _getConnection(query: string, requireWriteAccess: boolean): Promise<MySQLConnection> {
getInstance().getLogger().trace(TAG, `Querying connection pool for "${query}".`);
private $getConnectionFromPool(query: string, requireWriteAccess: boolean, instantationStack: string): Promise<MySQLConnection> {
return new Promise<MySQLConnection>((resolve, reject) => {
let instantationStack: string = new Error().stack;
this.$cluster.getConnection(query, (error: MySQL.MysqlError, connection: MySQL.PoolConnection) => {
if (error) {
reject(error);
return;
}

resolve(new MySQLConnection(connection, instantationStack, !requireWriteAccess));
});
});
}

protected override async _getConnection(query: string, requireWriteAccess: boolean, position?: IDatabasePosition): Promise<MySQLConnection> {
getInstance().getLogger().trace(TAG, `Querying connection pool for "${query}".`);
let instantationStack: string = new Error().stack;

let conn: MySQLConnection = await this.$getConnectionFromPool(query, requireWriteAccess, instantationStack);
await conn.__internal_init();

if (conn.isReplication()) {
// master connections will not wait on database positions
// they are guarenteed to be at the tip.
// readonly, or otherwise known as replication connections
// may have replication lag. If we have a desired position target,
// then await for the connection to catch up to that target.
if (position && position.page && position.position) {
let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn);
try {
await waiter.wait(position);
}
catch (ex) {
conn.close(true);
throw ex;
}
}
}

return conn;
}
}
Loading

0 comments on commit 9c421ed

Please sign in to comment.