diff --git a/jest.config.js b/jest.config.js index e42fac6f..8cb50c93 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,8 +1,10 @@ module.exports = { preset: 'ts-jest', testEnvironment: 'node', + collectCoverage: true, coverageDirectory: 'coverage', collectCoverageFrom: ['packages/**/src/**/*.{ts,tsx,js,jsx}', 'packages/**/!src/**/*.d.ts'], + coverageReporters: ['text-summary', 'json', 'lcov'], globals: { 'ts-jest': { tsConfig: 'tsconfig.json', diff --git a/packages/clickhouse-adapter/.mdeprc b/packages/clickhouse-adapter/.mdeprc index b3a825c2..c05739ea 100644 --- a/packages/clickhouse-adapter/.mdeprc +++ b/packages/clickhouse-adapter/.mdeprc @@ -3,8 +3,8 @@ "node": "12.14.0", "nycCoverage": false, "coverage": false, - "test_framework": "jest --verbose -i -b -c ../../jest.config.js --coverageDirectory ", - "tests": "../clickhouse-adapter/test/suites/**/*.ts", + "test_framework": "jest --verbose -i -b -c ../../jest.config.js --runInBand --coverageDirectory ", + "tests": "../clickhouse-adapter/test/suites", "auto_compose": true, "with_local_compose": true, "docker_compose": "./test/docker-compose.yml", diff --git a/packages/clickhouse-adapter/src/client.ts b/packages/clickhouse-adapter/src/client.ts index 2704b360..3b62b38a 100644 --- a/packages/clickhouse-adapter/src/client.ts +++ b/packages/clickhouse-adapter/src/client.ts @@ -1,25 +1,36 @@ import ClickHouse from '@apla/clickhouse' import { Promise } from 'bluebird' import { merge } from 'lodash' + import { InsertData, - ClickhouseOptions, + Options, TableBuilder, ClickhouseClientInterface, - QueryOptions, + QueryStream, + ClientConfig, + Callback, } from './interfaces' export class ClickhouseClient implements ClickhouseClientInterface { - public static readonly defaultOpts: ClickhouseOptions = { + public static readonly defaultOpts: Options = { host: 'clickhouse', } public readonly connection: ClickHouse - public readonly queryAsync: (query: string, options: QueryOptions) => Promise + public readonly queryAsync: (query: string, options?: Options) => Promise + + private readonly database: string + private readonly options: Options + + constructor(config: ClientConfig) { + const { dbName, ...options } = config + + this.options = merge({}, ClickhouseClient.defaultOpts, options) + this.database = dbName - constructor(options: ClickhouseOptions) { - this.connection = new ClickHouse(merge({}, ClickhouseClient.defaultOpts, options)) + this.connection = new ClickHouse(this.options) this.queryAsync = Promise.promisify(this.query, { context: this }) } @@ -32,17 +43,20 @@ export class ClickhouseClient implements ClickhouseClientInterface { public insert( dbName: string, insertData: InsertData, - options: QueryOptions, + options: Options, cb: (err: any, result: any) => void ): void public insert(dbName: string, insertData: InsertData, arg1: any, arg2?: any): void { - const queryOptions: QueryOptions = typeof arg1 === 'object' ? arg1 : {} + const options: Options = typeof arg1 === 'object' ? arg1 : {} const stream = this.connection.query( insertData.query(), { - ...queryOptions, - format: queryOptions.format || 'JSONEachRow', - queryOptions: { database: dbName }, + ...options, + format: options.format || 'JSONEachRow', + queryOptions: { + ...options.queryOptions, + database: dbName, + }, }, typeof arg1 === 'function' ? arg1 : arg2 ) @@ -56,11 +70,41 @@ export class ClickhouseClient implements ClickhouseClientInterface { stream.end() } - public query(query: string, options: QueryOptions, cb: (err: any, result: any) => void): void { - this.connection.query( - query, - { syncParser: true, ...options, format: options.format || 'JSONCompact' }, - cb - ) + public query(query: string, options?: Options, cb?: Callback): void { + // promisify passes callback in 2nd argument if no arg provided + const callback = options.constructor === Function ? options : cb + + const opts = { + ...this.getQueryOptions(cb ? options : {}), + syncParser: true, + } + + this.connection.query(query, opts, callback) + } + + public queryStream(query: string, options?: Options, cb?: Callback): QueryStream { + const opts = { + ...this.getQueryOptions(options), + syncParser: false, + } + + return this.connection.query(query, opts, cb) + } + + private getQueryOptions(options?: Options): Record { + return { + ...options, + syncParser: true, + format: this.getFormat(options, 'JSONCompact'), + queryOptions: { + database: this.database, + ...options?.queryOptions, + ...this.options.queryOptions, + }, + } + } + + private getFormat(options: Options | undefined, defaultFormat: string): string { + return options?.format || this.options.format || defaultFormat } } diff --git a/packages/clickhouse-adapter/src/interfaces/clickhouseClientInterface.ts b/packages/clickhouse-adapter/src/interfaces/clickhouseClientInterface.ts index e6fd626a..7e5f6f31 100644 --- a/packages/clickhouse-adapter/src/interfaces/clickhouseClientInterface.ts +++ b/packages/clickhouse-adapter/src/interfaces/clickhouseClientInterface.ts @@ -1,7 +1,9 @@ import ClickHouse from '@apla/clickhouse' import { TableBuilder } from './tableBuilder' import { InsertData } from './insertData' -import { QueryOptions } from './queryOptions' +import { ClickhouseSettings } from './clickhouseSettings' + +export type Callback = (err: Error | undefined, res: any) => void export interface ClickhouseClientInterface { connection: ClickHouse @@ -10,7 +12,7 @@ export interface ClickhouseClientInterface { insert( dbName: string, insertData: InsertData, - options: QueryOptions, + options: ClickhouseSettings, cb: (err: any, result: any) => void ): void insert(dbName: string, insertData: InsertData, arg1: any, arg2: any): void diff --git a/packages/clickhouse-adapter/src/interfaces/clickhouseOptions.ts b/packages/clickhouse-adapter/src/interfaces/clickhouseOptions.ts index 4c87619c..94dcf35a 100644 --- a/packages/clickhouse-adapter/src/interfaces/clickhouseOptions.ts +++ b/packages/clickhouse-adapter/src/interfaces/clickhouseOptions.ts @@ -1,17 +1,12 @@ import { RequestOptions } from 'http' import { Formats } from '../types' +import { ClickhouseSettings } from './clickhouseSettings' -export interface ClickhouseOptions { - readonly host: string - readonly user?: string - readonly password?: string - readonly dbName?: string - readonly path?: string - readonly port?: number - readonly protocol?: 'https:' | 'http:' +export interface Options extends RequestOptions { readonly dataObjects?: boolean readonly format?: Formats - readonly queryOptions?: any + readonly omitFormat?: boolean + readonly queryOptions?: ClickhouseSettings readonly readonly?: boolean - readonly requestOptions?: RequestOptions + readonly syncParser?: boolean } diff --git a/packages/clickhouse-adapter/src/interfaces/clickhouseSettings.ts b/packages/clickhouse-adapter/src/interfaces/clickhouseSettings.ts new file mode 100644 index 00000000..b4cef5d5 --- /dev/null +++ b/packages/clickhouse-adapter/src/interfaces/clickhouseSettings.ts @@ -0,0 +1,12 @@ +const allowDDlValue = [0, 1] as const + +/** + * See https://clickhouse.tech/docs/en/operations/settings/ + */ +export interface ClickhouseSettings { + readonly database: string + readonly readonly?: boolean + readonly allow_ddl?: typeof allowDDlValue[number] + readonly profile?: string + readonly [key: string]: any +} diff --git a/packages/clickhouse-adapter/src/interfaces/clientConfig.ts b/packages/clickhouse-adapter/src/interfaces/clientConfig.ts new file mode 100644 index 00000000..812d78aa --- /dev/null +++ b/packages/clickhouse-adapter/src/interfaces/clientConfig.ts @@ -0,0 +1,11 @@ +import { Options } from './clickhouseOptions' + +export interface ClientConfig extends Options { + readonly host: string + readonly user?: string + readonly password?: string + readonly dbName?: string + readonly path?: string + readonly port?: number + readonly protocol?: 'https:' | 'http:' +} diff --git a/packages/clickhouse-adapter/src/interfaces/index.ts b/packages/clickhouse-adapter/src/interfaces/index.ts index 8b374e8f..36f55aa6 100644 --- a/packages/clickhouse-adapter/src/interfaces/index.ts +++ b/packages/clickhouse-adapter/src/interfaces/index.ts @@ -6,4 +6,6 @@ export * from './table-spec' export * from './clickhouseOptions' export * from './tableBuilder' export * from './clickhouseClientInterface' -export * from './queryOptions' +export * from './clickhouseSettings' +export * from './clientConfig' +export * from './queryStream' diff --git a/packages/clickhouse-adapter/src/interfaces/queryOptions.ts b/packages/clickhouse-adapter/src/interfaces/queryOptions.ts deleted file mode 100644 index 0b0364ba..00000000 --- a/packages/clickhouse-adapter/src/interfaces/queryOptions.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { RequestOptions } from 'http' -import { Formats } from '../types' - -export interface QueryOptions { - readonly host?: string - readonly user?: string - readonly password?: string - readonly dbName?: string - readonly path?: string - readonly port?: number - readonly protocol?: 'https:' | 'http:' - readonly dataObjects?: boolean - readonly format?: Formats - readonly queryOptions?: any - readonly readonly?: boolean - readonly requestOptions?: RequestOptions -} diff --git a/packages/clickhouse-adapter/src/interfaces/queryStream.ts b/packages/clickhouse-adapter/src/interfaces/queryStream.ts new file mode 100644 index 00000000..1278ab4c --- /dev/null +++ b/packages/clickhouse-adapter/src/interfaces/queryStream.ts @@ -0,0 +1,17 @@ +import type { Duplex } from 'stream' + +export interface SupplementalInformation { + rows: number + statistics: { + elapsed: number + rows_read: number + bytes_read: number + } +} + +export interface QueryStream extends Duplex { + /** + * Available only for SELECT queries with JSON* format + */ + supplemental?: SupplementalInformation +} diff --git a/packages/clickhouse-adapter/src/migrators/system.ts b/packages/clickhouse-adapter/src/migrators/system.ts index 9bc7c290..fec0be43 100644 --- a/packages/clickhouse-adapter/src/migrators/system.ts +++ b/packages/clickhouse-adapter/src/migrators/system.ts @@ -12,7 +12,7 @@ export class SystemMigrator { } public async up(dbName: string): Promise { - await this.ch.queryAsync(createDb(dbName), { + await this.ch.connection.querying(createDb(dbName), { format: 'TabSeparated', }) diff --git a/packages/clickhouse-adapter/test/suites/01-create-table.test.ts b/packages/clickhouse-adapter/test/suites/01-create-table.test.ts index c5fab7ed..2b164bf2 100644 --- a/packages/clickhouse-adapter/test/suites/01-create-table.test.ts +++ b/packages/clickhouse-adapter/test/suites/01-create-table.test.ts @@ -31,7 +31,7 @@ describe('create table', () => { host: 'ch1', }) - await client.connection.querying('CREATE DATABASE IF NOT EXISTS db_test') + await client.connection.querying(`CREATE DATABASE IF NOT EXISTS db_test on cluster '{cluster}'`) await client.createTable( new TableMaker('db_test', 'test_replicated_table', `'{cluster}'`, { diff --git a/packages/clickhouse-adapter/test/suites/04-insert-data.test.ts b/packages/clickhouse-adapter/test/suites/04-insert-data.test.ts index 111f8567..3eb9a969 100644 --- a/packages/clickhouse-adapter/test/suites/04-insert-data.test.ts +++ b/packages/clickhouse-adapter/test/suites/04-insert-data.test.ts @@ -4,62 +4,86 @@ import { Migrator, ClickhouseClient, SystemMigrator, TableMaker } from '../../sr import { InsertData } from '../../src/interfaces' const DB_NAME = 'db_test' +const SIMPLE_TABLE = 'insert_simple' +const TSV_TABLE = 'insert_tsv' describe('Clickhouse Adapter', () => { const ch = new ClickhouseClient({ host: 'ch1', dbName: DB_NAME }) const systemMigrator = new SystemMigrator(ch) - beforeAll(() => systemMigrator.up(DB_NAME)) + beforeAll(async () => await systemMigrator.up(DB_NAME)) + describe('insert single row', () => { + beforeAll(async () => { + const migrator = new Migrator(ch) - it('insert single row', async (done: jest.DoneCallback) => { - const migrator = new Migrator(ch) + migrator.addMigration({ + name: `1_${SIMPLE_TABLE}`, + async up(clickhouseClient: ClickhouseClient): Promise { + await clickhouseClient.createTable( + new TableMaker(DB_NAME, SIMPLE_TABLE, `'{cluster}'`, { + columnDefinitions: [ + { name: 'trackDate', type: 'Date' }, + { name: 'trackTimestamp', type: 'DateTime' }, + { name: 'eventType', type: 'String' }, + ], + tableOptions: ['ENGINE = MergeTree(trackDate, (trackTimestamp, eventType), 8192)'], + }) + ) + return true + }, + }) - migrator.addMigration({ - name: '1_event_a', - async up(clickhouseClient: ClickhouseClient): Promise { - await clickhouseClient.createTable( - new TableMaker(DB_NAME, 'event_a', null, { - columnDefinitions: [ - { name: 'trackDate', type: 'Date' }, - { name: 'trackTimestamp', type: 'DateTime' }, - { name: 'eventType', type: 'String' }, - ], - tableOptions: ['ENGINE = MergeTree(trackDate, (trackTimestamp, eventType), 8192)'], - }) - ) - return true - }, + await migrator.up(migrator.migrateAll(DB_NAME)) }) - await migrator.up(migrator.migrateAll(DB_NAME)) - - const now = moment() - const insertData: InsertData = { - query: () => { - return 'INSERT INTO event_a' - }, - data: () => { - return [ + it('insert', async (done: jest.DoneCallback) => { + const now = moment() + const insertData: InsertData = { + query: () => `INSERT INTO ${SIMPLE_TABLE}`, + data: () => [ { trackDate: moment(now).format('YYYY-MM-DD'), trackTimestamp: moment(now).format('YYYY-MM-DD HH:mm:ss'), eventType: 'type_a', }, - ] - }, - } + ], + } - ch.insert(DB_NAME, insertData, () => { - ch.connection.query( - 'SELECT * FROM event_a', - { syncParser: true, queryOptions: { database: DB_NAME } }, - (_: any, result: any) => { - assert(result) - assert(result.data) - assert(result.data.length) - done() - } - ) + ch.insert(DB_NAME, insertData, (err) => { + assert.ifError(err) + ch.connection.query( + `SELECT * FROM ${SIMPLE_TABLE}`, + { syncParser: true, queryOptions: { database: DB_NAME } }, + (_: any, result: any) => { + assert(result) + assert(result.data) + assert(result.data.length) + done() + } + ) + }) + }) + + it('errors on unknown column', async (done: jest.DoneCallback) => { + const now = moment() + const insertData: InsertData = { + query: () => `INSERT INTO ${SIMPLE_TABLE}`, + data: () => [ + { + trackDate: moment(now).format('YYYY-MM-DD'), + trackTimestamp: moment(now).format('YYYY-MM-DD HH:mm:ss'), + eventTypeMissing: 'type_a', + }, + ], + } + + ch.insert(DB_NAME, insertData, (err, result) => { + assert( + (err.message = ~/Unknown field found while parsing JSONEachRow format: eventTypeMissing/) + ) + assert.ifError(result) + done() + }) }) }) @@ -67,10 +91,10 @@ describe('Clickhouse Adapter', () => { const migrator = new Migrator(ch) migrator.addMigration({ - name: '1_event_b', + name: `1_${TSV_TABLE}`, async up(clickhouseClient: ClickhouseClient): Promise { await clickhouseClient.createTable( - new TableMaker(DB_NAME, 'event_b', null, { + new TableMaker(DB_NAME, TSV_TABLE, `'{cluster}'`, { columnDefinitions: [ { name: 'trackDate', type: 'Date' }, { name: 'trackTimestamp', type: 'DateTime' }, @@ -102,16 +126,17 @@ describe('Clickhouse Adapter', () => { const insertData: InsertData = { query: () => { - return `INSERT INTO event_b (${fields.join(',')})` + return `INSERT INTO ${TSV_TABLE} (${fields.join(',')})` }, data: () => { return data.map(getArray) }, } - ch.insert(DB_NAME, insertData, { format: 'TabSeparated' }, () => { + ch.insert(DB_NAME, insertData, { format: 'TabSeparated' }, (err, result) => { + assert.ifError(err) ch.connection.query( - 'SELECT * FROM event_b', + `SELECT * FROM ${TSV_TABLE}`, { syncParser: true, queryOptions: { database: DB_NAME } }, (_: any, result: any) => { assert(result) diff --git a/packages/clickhouse-adapter/test/suites/05-query-data.test.ts b/packages/clickhouse-adapter/test/suites/05-query-data.test.ts index e7200ef3..94da21b3 100644 --- a/packages/clickhouse-adapter/test/suites/05-query-data.test.ts +++ b/packages/clickhouse-adapter/test/suites/05-query-data.test.ts @@ -1,24 +1,44 @@ import assert from 'assert' import moment from 'moment' + import { Migrator, ClickhouseClient, SystemMigrator, TableMaker } from '../../src' import { InsertData } from '../../src/interfaces' const DB_NAME = 'db_test' +const TABLE_NAME = 'test_query_data' describe('Clickhouse Adapter', () => { const ch = new ClickhouseClient({ host: 'ch1', dbName: DB_NAME }) const systemMigrator = new SystemMigrator(ch) - beforeAll(() => systemMigrator.up(DB_NAME)) + const now = moment() + const insertData: InsertData = { + query: () => { + return `INSERT INTO ${TABLE_NAME}` + }, + data: () => { + return [ + { + trackDate: moment(now).format('YYYY-MM-DD'), + trackTimestamp: moment(now).format('YYYY-MM-DD HH:mm:ss'), + eventType: 'type_a', + }, + ] + }, + } + + beforeAll(async () => { + await systemMigrator.up(DB_NAME) + }) - it('query data', async (done: jest.DoneCallback) => { + beforeAll(async () => { const migrator = new Migrator(ch) migrator.addMigration({ - name: '1_event_a', + name: `1_${TABLE_NAME}`, async up(clickhouseClient: ClickhouseClient): Promise { await clickhouseClient.createTable( - new TableMaker(DB_NAME, 'event_a', null, { + new TableMaker(DB_NAME, TABLE_NAME, `'{cluster}'`, { columnDefinitions: [ { name: 'trackDate', type: 'Date' }, { name: 'trackTimestamp', type: 'DateTime' }, @@ -32,32 +52,66 @@ describe('Clickhouse Adapter', () => { }) await migrator.up(migrator.migrateAll(DB_NAME)) + }) - const now = moment() - const insertData: InsertData = { - query: () => { - return 'INSERT INTO event_a' - }, - data: () => { - return [ - { - trackDate: moment(now).format('YYYY-MM-DD'), - trackTimestamp: moment(now).format('YYYY-MM-DD HH:mm:ss'), - eventType: 'type_a', - }, - ] - }, - } + beforeAll((done: jest.DoneCallback) => { + ch.insert(DB_NAME, insertData, (err) => { + if (err) return done.fail(err) + done() + }) + }) - ch.insert(DB_NAME, insertData, async () => { - const result = await ch.queryAsync('SELECT * FROM event_a', { + describe('query data with QueryOptions', () => { + const query = `SELECT * from ${TABLE_NAME}` + + it('simple', async () => { + const result = await ch.queryAsync(query, { queryOptions: { database: DB_NAME }, }) assert(result) assert(result.data) assert(result.data.length) - done() + }) + + it('query data as stream', async () => { + const stream = ch.queryStream(query, { + queryOptions: { database: DB_NAME }, + format: 'JSON', + }) + + const rows = [] + for await (const row of stream) { + rows.push(row) + } + + assert(rows.length === 1) + assert.ok(stream.supplemental) + assert(stream.supplemental.rows === 1) + }) + }) + + describe('query data without QueryOptions', () => { + const query = `SELECT * from ${TABLE_NAME}` + + it('simple', async () => { + const result = await ch.queryAsync(query) + assert(result) + assert(result.data) + assert(result.data.length) + }) + + it('stream', async () => { + const stream = ch.queryStream(query) + + const rows = [] + for await (const row of stream) { + rows.push(row) + } + + assert(rows.length === 1) + assert.ok(stream.supplemental) + assert(stream.supplemental.rows === 1) }) }) })