diff --git a/spec/database/ConnectionReplicationWaiter.spec.ts b/spec/database/ConnectionReplicationWaiter.spec.ts new file mode 100644 index 00000000..44718549 --- /dev/null +++ b/spec/database/ConnectionReplicationWaiter.spec.ts @@ -0,0 +1,62 @@ + +import {MockApplication} from '../support/TestApplication'; +import {ConnectionReplicationWaiter} from '../../src/private/ConnectionReplicationWaiter'; +import {MockConnection} from '../support/MockConnection'; +import { TimeoutError } from '../../src/TimeoutError'; + +describe('ConnectionReplicationWaiter', () => { + let app: MockApplication = null; + + beforeAll(async () => { + process.argv = []; + app = new MockApplication(); + await app.start(); + }); + + afterAll(async () => { + await app.close(); + }); + + it('will skip on master connections', async () => { + let conn: MockConnection = new MockConnection(false, new Error().stack); + + let spy = jest.spyOn(conn, 'isReadOnly'); + + let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn); + await waiter.wait(null); + + expect(spy).toHaveBeenCalled(); + }); + + it('will timeout after 1 second', async () => { + let conn: MockConnection = new MockConnection(true, new Error().stack); + let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn); + + await expect(waiter.wait({ + page: 2, + position: 1 + }, 1000)).rejects.toBeInstanceOf(TimeoutError); + }); + + it('will sleep for 1 second in between retries', async () => { + let conn: MockConnection = new MockConnection(true, new Error().stack); + let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn, 1000); + + let spy = jest.spyOn(waiter, '$sleep'); + + let p: Promise = waiter.wait({ + page: 2, + position: 1 + }); + + await new Promise((resolve, reject) => { + setTimeout(() => { + expect(spy).toHaveBeenCalledWith(1000); + conn.pos.page++; + resolve(); + }, 0); + }); + + await p; + }); +}); diff --git a/spec/database/Database.spec.ts b/spec/database/Database.spec.ts index da11dfc2..75b7e2ce 100644 --- a/spec/database/Database.spec.ts +++ b/spec/database/Database.spec.ts @@ -87,7 +87,7 @@ describe('Database', () => { let spy: jasmine.Spy = spyOn(db, '_getConnection'); db.getConnection(); - expect(spy).toHaveBeenCalledWith('SLAVE*', false); + expect(spy).toHaveBeenCalledWith('SLAVE*', false, undefined); }); it('Can get connection via custom query', () => { @@ -96,7 +96,7 @@ describe('Database', () => { let spy: jasmine.Spy = spyOn(db, '_getConnection'); db.getConnection(false, 'test'); - expect(spy).toHaveBeenCalledWith('test', false); + expect(spy).toHaveBeenCalledWith('test', false, undefined); }); it('can get write connection', () => { @@ -105,6 +105,6 @@ describe('Database', () => { let spy: jasmine.Spy = spyOn(db, '_getConnection'); db.getConnection(true); - expect(spy).toHaveBeenCalledWith(MASTER, true); + expect(spy).toHaveBeenCalledWith(MASTER, true, undefined); }); }); diff --git a/spec/database/MySQLDatabase.spec.ts b/spec/database/MySQLDatabase.spec.ts index fd323252..6019a9e8 100644 --- a/spec/database/MySQLDatabase.spec.ts +++ b/spec/database/MySQLDatabase.spec.ts @@ -8,6 +8,8 @@ import {EventEmitter} from 'events'; import { getInstance } from '../../src/instance'; import { IDatabaseConnection } from '../../src/IDatabaseConnection'; import { DatabaseConnection } from '../../src/DatabaseConnection'; +import { ConnectionReplicationWaiter } from '../../src/private/ConnectionReplicationWaiter'; +import { MySQLConnection } from '../../src/MySQLConnection'; describe('MySQLDatabase', () => { let app: MockApplication = null; @@ -98,4 +100,34 @@ describe('MySQLDatabase', () => { expect(spy).toHaveBeenCalled(); }); + + it('will wait for replication lag', async () => { + let spy = jest.spyOn(ConnectionReplicationWaiter.prototype, 'wait').mockImplementation(() => { + return Promise.resolve(); + }); + + let db: MySQLDatabase = new MySQLDatabase(); + jest.spyOn(db, '$getConnectionFromPool').mockImplementation(() => { + let c: MySQLConnection = new MySQLConnection({ + config: {} + }, '', true); + + jest.spyOn(c, '__internal_init').mockImplementation(() => { + return Promise.resolve(); + }); + + jest.spyOn(c, 'isMaster').mockImplementation(() => { + return false; + }); + + return Promise.resolve(c); + }); + + await db.getConnection(false, null, { + page: 2, + position: 2 + }); + + expect(spy).toHaveBeenCalled(); + }); }); diff --git a/spec/support/MockConnection.ts b/spec/support/MockConnection.ts index cc577d93..3888af9b 100755 --- a/spec/support/MockConnection.ts +++ b/spec/support/MockConnection.ts @@ -5,10 +5,15 @@ import { RollbackQuery } from '../../src/private/RollbackQuery'; import { StartTransactionQuery } from '../../src/private/StartTransactionQuery'; import { IsolationLevel } from '../../src/IsolationLevel'; import { SetIsolationLevelQuery } from '../../src/private/SetIsolationLevelQuery'; +import { IDatabasePosition } from '../../src/IDatabasePosition'; export class MockConnection extends DatabaseConnection { public transaction: boolean; public closed: boolean; + public pos: IDatabasePosition = { + page: 1, + position: 1 + }; public constructor(readonly: boolean, instantiationStack: string) { super(null, readonly, instantiationStack); @@ -48,6 +53,10 @@ export class MockConnection extends DatabaseConnection { await this.commit(); } + public async getCurrentDatabasePosition(): Promise { + return this.pos; + } + // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types protected _query(query: string, params?: any): Promise { return Promise.resolve({ diff --git a/src/Database.ts b/src/Database.ts index aeabd55a..ae25be61 100755 --- a/src/Database.ts +++ b/src/Database.ts @@ -17,6 +17,7 @@ import * as UUID from 'uuid'; import {DatabaseConnection} from './DatabaseConnection'; import { getInstance } from './instance'; +import { IDatabasePosition } from './IDatabasePosition'; const MASTER_NAME: string = 'MASTER'; const TAG: string = 'Database'; @@ -61,7 +62,7 @@ export abstract class Database { this._removeNode(slaveID); } - public getConnection(requireWriteAccess: boolean = false, nodeID?: string): Promise> { + public getConnection(requireWriteAccess: boolean = false, nodeID?: string, requiredPosition?: IDatabasePosition): Promise> { let query: string = 'SLAVE*'; if (nodeID) { @@ -71,7 +72,7 @@ export abstract class Database { query = 'MASTER'; } - return this._getConnection(query, requireWriteAccess); + return this._getConnection(query, requireWriteAccess, requiredPosition); } public destroy(): Promise { @@ -81,6 +82,6 @@ export abstract class Database { protected abstract _destroy(): Promise; protected abstract _addNode(name: string, config: TDatabaseConfig): void; protected abstract _removeNode(name: string): void; - protected abstract _getConnection(query: string, requireWriteAccess: boolean): Promise>; + protected abstract _getConnection(query: string, requireWriteAccess: boolean, requiredPosition?: IDatabasePosition): Promise>; public abstract escape(query: string): string; } diff --git a/src/DatabaseConnection.ts b/src/DatabaseConnection.ts index 8dce0eb5..ac3a984f 100755 --- a/src/DatabaseConnection.ts +++ b/src/DatabaseConnection.ts @@ -22,6 +22,7 @@ import {IDatabaseConnection} from './IDatabaseConnection'; import {IQueryable} from './IQueryable'; import { IConfig } from './IConfig'; import { IsolationLevel } from './IsolationLevel'; +import { IDatabasePosition } from './IDatabasePosition'; export const LINGER_WARNING: number = 10000; export const DEFAULT_QUERY_TIMEOUT: number = 3600000; @@ -260,4 +261,9 @@ export abstract class DatabaseConnection implements IDatabaseConnection { */ // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types protected abstract _stream(query: string, params?: any, streamOptions?: any): Readable; + + /** + * @since 8.1.0 + */ + public abstract getCurrentDatabasePosition(): Promise; } diff --git a/src/IDatabaseConnection.ts b/src/IDatabaseConnection.ts index 7aca53e4..c1b82188 100755 --- a/src/IDatabaseConnection.ts +++ b/src/IDatabaseConnection.ts @@ -17,6 +17,7 @@ import { Readable } from 'stream'; import { IQueryable } from './IQueryable'; import { IsolationLevel } from './IsolationLevel'; +import { IDatabasePosition } from './IDatabasePosition'; export interface IDatabaseConnection { setInstantiationStack(stack: string): void; @@ -33,4 +34,9 @@ export interface IDatabaseConnection { isTransaction(): boolean; commit(): Promise; rollback(): Promise; + + /** + * @since 8.1.0 + */ + getCurrentDatabasePosition(): Promise; } diff --git a/src/IDatabasePosition.ts b/src/IDatabasePosition.ts new file mode 100644 index 00000000..f0720d7e --- /dev/null +++ b/src/IDatabasePosition.ts @@ -0,0 +1,23 @@ +/* + Copyright 2017-2024 Norman Breau + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @since 8.1.0 + */ +export interface IDatabasePosition { + page: number; + position: number; +} diff --git a/src/ManagedDatabaseConnection.ts b/src/ManagedDatabaseConnection.ts index 754569bf..ed49e7dd 100755 --- a/src/ManagedDatabaseConnection.ts +++ b/src/ManagedDatabaseConnection.ts @@ -19,6 +19,7 @@ import {getInstance} from './instance'; import {Readable} from 'stream'; import { Query } from './Query'; import { IsolationLevel } from './IsolationLevel'; +import { IDatabasePosition } from './IDatabasePosition'; const TAG: string = 'ManagedDatabaseConnection'; @@ -81,6 +82,11 @@ export class ManagedDatabaseConnection implements IDatabaseConnection { } } + public async getCurrentDatabasePosition(): Promise { + let conn: IDatabaseConnection = await this.$getConnection(); + return await conn.getCurrentDatabasePosition(); + } + public isWriteRequired(): boolean { return this.$requiresWrite; } diff --git a/src/MySQLConnection.ts b/src/MySQLConnection.ts index 6d91542d..41455876 100755 --- a/src/MySQLConnection.ts +++ b/src/MySQLConnection.ts @@ -31,6 +31,10 @@ import { DeadLockError } from './DeadLockError'; import { IsolationLevel } from './IsolationLevel'; import {SetIsolationLevelQuery} from './private/SetIsolationLevelQuery'; import { LockWaitTimeoutError } from './LockWaitTimeoutError'; +import { IDatabasePosition } from './IDatabasePosition'; +import { GetBinLogPositionQuery } from './private/GetBinLogPositionQuery'; +import { GetSlavePositionQuery } from './private/GetSlavePositionQuery'; +import { GetMasterPositionQuery } from './private/GetMasterPositionQuery'; const DEFAULT_HIGH_WATERMARK: number = 512; // in number of result objects const TAG: string = 'MySQLConnection'; @@ -57,12 +61,14 @@ let rollbackQuery: Query = new RollbackQuery(); export class MySQLConnection extends DatabaseConnection { private $transaction: boolean; private $opened: boolean; + private $isMasterConnection: boolean; public constructor(connection: MySQL.PoolConnection, instantiationStack: string, isReadOnly: boolean = true) { super(connection, isReadOnly, instantiationStack); this.$opened = true; this.$transaction = false; + this.$isMasterConnection = null; connection.config.queryFormat = function(query: string, values: any) { if (!values) return query; @@ -77,6 +83,22 @@ export class MySQLConnection extends DatabaseConnection { }; } + /** + * @internal - Do not use in application code + */ + public async __internal_init(): Promise { + let result = await new GetSlavePositionQuery().execute(this); + this.$isMasterConnection = result === null; + } + + public isMaster(): boolean { + return this.$isMasterConnection; + } + + public isReplication(): boolean { + return !this.isMaster(); + } + public isTransaction(): boolean { return this.$transaction; } @@ -85,6 +107,11 @@ export class MySQLConnection extends DatabaseConnection { return this.$opened; } + public override async getCurrentDatabasePosition(): Promise { + let statusQuery: GetBinLogPositionQuery = this.isReplication() ? new GetSlavePositionQuery() : new GetMasterPositionQuery(); + return await statusQuery.execute(this); + } + // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types protected _query(query: string, params?: any): Promise { let logger: BaseLogger = getInstance().getLogger(); diff --git a/src/MySQLDatabase.ts b/src/MySQLDatabase.ts index 43be1522..9cf9badf 100755 --- a/src/MySQLDatabase.ts +++ b/src/MySQLDatabase.ts @@ -18,6 +18,8 @@ import {Database} from './Database'; import {MySQLConnection} from './MySQLConnection'; import * as MySQL from 'mysql'; import {getInstance} from './instance'; +import { IDatabasePosition } from './IDatabasePosition'; +import { ConnectionReplicationWaiter } from './private/ConnectionReplicationWaiter'; const TAG: string = 'MySQLDatabase'; @@ -69,18 +71,44 @@ export class MySQLDatabase extends Database { - getInstance().getLogger().trace(TAG, `Querying connection pool for "${query}".`); + private $getConnectionFromPool(query: string, requireWriteAccess: boolean, instantationStack: string): Promise { return new Promise((resolve, reject) => { - let instantationStack: string = new Error().stack; this.$cluster.getConnection(query, (error: MySQL.MysqlError, connection: MySQL.PoolConnection) => { if (error) { reject(error); return; } - + resolve(new MySQLConnection(connection, instantationStack, !requireWriteAccess)); }); }); } + + protected override async _getConnection(query: string, requireWriteAccess: boolean, position?: IDatabasePosition): Promise { + getInstance().getLogger().trace(TAG, `Querying connection pool for "${query}".`); + let instantationStack: string = new Error().stack; + + let conn: MySQLConnection = await this.$getConnectionFromPool(query, requireWriteAccess, instantationStack); + await conn.__internal_init(); + + if (conn.isReplication()) { + // master connections will not wait on database positions + // they are guarenteed to be at the tip. + // readonly, or otherwise known as replication connections + // may have replication lag. If we have a desired position target, + // then await for the connection to catch up to that target. + if (position && position.page && position.position) { + let waiter: ConnectionReplicationWaiter = new ConnectionReplicationWaiter(conn); + try { + await waiter.wait(position); + } + catch (ex) { + conn.close(true); + throw ex; + } + } + } + + return conn; + } } diff --git a/src/TimeoutError.ts b/src/TimeoutError.ts new file mode 100644 index 00000000..8fff4dc4 --- /dev/null +++ b/src/TimeoutError.ts @@ -0,0 +1,44 @@ +/* + Copyright 2017-2024 Norman Breau + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import {StormError} from './StormError'; +import {ErrorCode} from './ErrorCode'; +import {StatusCode} from './StatusCode'; + +/** + * @since 8.1.0 + */ +export class TimeoutError extends StormError { + public constructor() { + super(); + } + + public getMessage(): string { + return `The requested resource could not be reached in time.`; + } + + public getCode(): ErrorCode { + return ErrorCode.INTERNAL; + } + + public getHTTPCode(): StatusCode { + return StatusCode.ERR_REQUEST_TIMEOUT; + } + + public override getLocaleCode(): string { + return '@breautek/storm/TimeoutError/message'; + } +} diff --git a/src/api.ts b/src/api.ts index 0559e0b7..d9122993 100755 --- a/src/api.ts +++ b/src/api.ts @@ -51,6 +51,7 @@ export { IErrorResponse, IAdditionalErrorDetails } from './StormError'; +export {TimeoutError} from './TimeoutError'; export {JWTError} from './JWTError'; export {MissingParameterError} from './MissingParameterError'; export {InvalidCredentialsError} from './InvalidCredentialsError'; @@ -96,6 +97,7 @@ export {IDatabaseConnection} from './IDatabaseConnection'; export {IServiceHeaders} from './IServiceHeaders'; export {IAuthTokenData} from './IAuthTokenData'; export {IQueryable} from './IQueryable'; +export {IDatabasePosition} from './IDatabasePosition'; // Token export {Token} from './Token'; diff --git a/src/private/ConnectionReplicationWaiter.ts b/src/private/ConnectionReplicationWaiter.ts new file mode 100644 index 00000000..01705263 --- /dev/null +++ b/src/private/ConnectionReplicationWaiter.ts @@ -0,0 +1,110 @@ + +/* + Copyright 2017-2024 Norman Breau + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { IDatabaseConnection } from '../IDatabaseConnection'; +import { IDatabasePosition } from '../IDatabasePosition'; +import { InternalError } from '../InternalError'; +import { TimeoutError } from '../TimeoutError'; + +export class ConnectionReplicationWaiter { + /** + * The default retry delay in milliseconds. + * Defaults to 1 second. + * + * This is the value used in between status query iterations. + */ + public static readonly DEFAULT_RETRY_DELAY: number = 1000; + + /** + * The default timeout delay. Defaults to 120 seconds. + * If the connection could not reach the target position in time, + * then the wait will timeout + * + * Using `Infinity` will disable the timeout + */ + public static readonly DEFAULT_TIMEOUT = 120 * 1000; // 120 seconds in ms + + private $conn: IDatabaseConnection; + private $retryDelay: number; + + public constructor(conn: IDatabaseConnection, retryDelay: number = ConnectionReplicationWaiter.DEFAULT_RETRY_DELAY) { + this.$conn = conn; + this.$retryDelay = retryDelay; + } + + private $sleep(timeout: number): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + resolve(); + }, timeout); + }); + } + + public async wait(target: IDatabasePosition, timeout: number = ConnectionReplicationWaiter.DEFAULT_TIMEOUT): Promise { + if (!this.$conn.isReadOnly()) { + return; + } + + let shouldTimeout: boolean = false; + let didTimeout: boolean = false; + + let timeoutTimer = setTimeout(() => { + shouldTimeout = true; + }, timeout); + + // eslint-disable-next-line no-constant-condition + while (true) { + if (shouldTimeout) { + didTimeout = true; + break; + } + + let currentPos: IDatabasePosition = await this.$conn.getCurrentDatabasePosition(); + + if (currentPos === null || (currentPos && !currentPos.page === null) || (currentPos && !currentPos.position === null)) { + throw new InternalError('Database Position not supported'); + } + + // If the current page is greater than the target page, + // then the position doesn't matter, we are ahead of the + // desired target position. + if (currentPos.page > target.page) { + break; + } + + // If the pages are equal, we need to check the position + // and ensure we are at or greater than the target. + if (currentPos.page === target.page && currentPos.position >= target.position) { + break; + } + + // If we made it here, it means the position state checks failed + // so wait a short while and then re-iterate + await this.$sleep(this.$retryDelay); + } + + if (didTimeout) { + // If the while loop exited due to the timeout, + // then throw a timeout error + throw new TimeoutError(); + } + + // Otherwise we are good to go! + clearTimeout(timeoutTimer); + } +} + diff --git a/src/private/GetBinLogPositionQuery.ts b/src/private/GetBinLogPositionQuery.ts new file mode 100644 index 00000000..82c609e0 --- /dev/null +++ b/src/private/GetBinLogPositionQuery.ts @@ -0,0 +1,59 @@ +/* + Copyright 2017-2024 Norman Breau + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import {Query} from '../Query'; +import {IDatabasePosition} from '../IDatabasePosition'; +import { IDatabaseConnection } from '../IDatabaseConnection'; + +/** + * @since 8.1.0 + */ +export abstract class GetBinLogPositionQuery extends Query { + protected abstract _getQuery(): string; + + protected abstract _getFile(row: TStatusRow): string; + protected abstract _getPosition(row: TStatusRow): string; + + public async onPostProcess(connection: IDatabaseConnection, resultSet: TStatusRow[]): Promise { + let row: TStatusRow = null; + if (resultSet.length > 0) { + row = resultSet[0]; + } + + if (!row) { + return null; + } + + let pageParts: string[] = this._getFile(row).split('.'); + let strPage: string = pageParts[pageParts.length - 1]; + + let page: number = parseInt(strPage); + + if (isNaN(page)) { + return null; + } + + let position: number = parseInt(this._getPosition(row)); + if (isNaN(position)) { + return null; + } + + return { + page: page, + position: position + }; + } +} diff --git a/src/private/GetMasterPositionQuery.ts b/src/private/GetMasterPositionQuery.ts new file mode 100644 index 00000000..c25db680 --- /dev/null +++ b/src/private/GetMasterPositionQuery.ts @@ -0,0 +1,37 @@ +/* + Copyright 2017-2024 Norman Breau + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { GetBinLogPositionQuery } from './GetBinLogPositionQuery'; + +interface IStatus { + File: string; + Position: string; +} + +export class GetMasterPositionQuery extends GetBinLogPositionQuery { + + protected override _getQuery(): string { + return 'SHOW MASTER STATUS'; + } + + protected override _getFile(row: IStatus): string { + return row.File; + } + + protected override _getPosition(row: IStatus): string { + return row.Position; + } +} diff --git a/src/private/GetSlavePositionQuery.ts b/src/private/GetSlavePositionQuery.ts new file mode 100644 index 00000000..0f23c158 --- /dev/null +++ b/src/private/GetSlavePositionQuery.ts @@ -0,0 +1,37 @@ +/* + Copyright 2017-2024 Norman Breau + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import { GetBinLogPositionQuery } from './GetBinLogPositionQuery'; + +interface IStatus { + Master_Log_File: string; + Read_Master_Log_Pos: string; +} + +export class GetSlavePositionQuery extends GetBinLogPositionQuery { + + protected override _getQuery(): string { + return 'SHOW SLAVE STATUS'; + } + + protected override _getFile(row: IStatus): string { + return row.Master_Log_File; + } + + protected override _getPosition(row: IStatus): string { + return row.Read_Master_Log_Pos; + } +}