diff --git a/package-lock.json b/package-lock.json index 9123075..ce9a0ce 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1440,6 +1440,12 @@ "integrity": "sha512-IkVfat549ggtkZUthUzEX49562eGikhSYeVGX97SkMFn+sTZrgRewXjQ4tPKFPCykZHkX1Zfd9OoELGqKU2jJA==", "dev": true }, + "@types/promise-timeout": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/@types/promise-timeout/-/promise-timeout-1.3.0.tgz", + "integrity": "sha512-AtVKSZUtpBoZ4SshXJk5JcTXJllinHKKx615lsRNJUsbbFlI0AI8drlnoiQ+PNvjkeoF9Y8fJUh6UO2khsIBZw==", + "dev": true + }, "@types/ramda": { "version": "0.26.33", "resolved": "https://registry.npmjs.org/@types/ramda/-/ramda-0.26.33.tgz", @@ -5986,9 +5992,9 @@ "dev": true }, "nan": { - "version": "2.14.1", - "resolved": "https://registry.npmjs.org/nan/-/nan-2.14.1.tgz", - "integrity": "sha512-isWHgVjnFjh2x2yuJ/tj3JbwoHu3UC2dX5G/88Cm24yB6YopVgxvBObDY7n5xW6ExmFhJpSEQqFPvq9zaXc8Jw==" + "version": "2.14.2", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.14.2.tgz", + "integrity": "sha512-M2ufzIiINKCuDfBSAUr1vWQ+vuVcA9kqx8JJUsbQi6yf1uGRyb7HfpdfUr5qLXf3B/t8dPvcjhKMmlfnP47EzQ==" }, "nanomatch": { "version": "1.2.13", @@ -6068,9 +6074,9 @@ } }, "node-rdkafka": { - "version": "2.9.1", - "resolved": "https://registry.npmjs.org/node-rdkafka/-/node-rdkafka-2.9.1.tgz", - "integrity": "sha512-C5EVDZlDG+5D8KXiz2zKwEiLWIGW5Z1mkVFRzp13T4mrbXz+ESyjrDSLIj7aoUIi5+T10H9p1wwLZJBh9ivjLg==", + "version": "2.10.0", + "resolved": "https://registry.npmjs.org/node-rdkafka/-/node-rdkafka-2.10.0.tgz", + "integrity": "sha512-SCgf/oebWpdv3elpukj2niNOtGQXHK55AQb4TebDJcPJSfMwyN5+w++B2toQF++UlpDQbPpVDEMfWzvtN5kHxQ==", "requires": { "bindings": "^1.3.1", "nan": "^2.14.0" @@ -6340,6 +6346,11 @@ "react-is": "^16.8.4" } }, + "promise-timeout": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/promise-timeout/-/promise-timeout-1.3.0.tgz", + "integrity": "sha512-5yANTE0tmi5++POym6OgtFmwfDvOXABD9oj/jLQr5GPEyuNEb7jH4wbbANJceJid49jwhi1RddxnhnEAb/doqg==" + }, "prompts": { "version": "2.3.2", "resolved": "https://registry.npmjs.org/prompts/-/prompts-2.3.2.tgz", diff --git a/package.json b/package.json index a9e1e85..654092f 100644 --- a/package.json +++ b/package.json @@ -20,13 +20,15 @@ "dependencies": { "@melonade/melonade-declaration": "^0.19.2", "axios": "^0.20.0", - "node-rdkafka": "^2.9.1", + "node-rdkafka": "^2.10.0", + "promise-timeout": "^1.3.0", "ramda": "^0.26.1", "tslib": "^2.0.1" }, "devDependencies": { "@types/jest": "^24.0.23", "@types/node": "^10.17.5", + "@types/promise-timeout": "^1.3.0", "@types/ramda": "^0.26.33", "jest": "^26.4.2", "prettier": "^2.0.5", diff --git a/src/example/syncWorker.ts b/src/example/syncWorker.ts index 708e523..a06fcbf 100644 --- a/src/example/syncWorker.ts +++ b/src/example/syncWorker.ts @@ -1,13 +1,13 @@ import { State, Task } from '@melonade/melonade-declaration'; import { SyncWorker, TaskStates } from '..'; -const kafkaServers = process.env['MELONADE_KAFKA_SERVERS']; -const namespace = process.env['MELONADE_NAMESPACE']; -const processManagerUrl = - process.env['MELONADE_PROCESS_MANAGER_URL'] || 'http://localhost:8081'; +const kafkaServers = 'localhost:29092'; +const namespace = 'docker-compose'; +const processManagerUrl = 'http://localhost:8081'; const sleep = (ms: number) => new Promise((res) => setTimeout(res, ms)); +// tslint:disable-next-line: no-for-in for (const forkID in new Array(1).fill(null)) { for (const workerId of [1, 2, 3]) { const worker = new SyncWorker( diff --git a/src/example/worker.ts b/src/example/worker.ts index 3e3a3e4..2967961 100644 --- a/src/example/worker.ts +++ b/src/example/worker.ts @@ -1,28 +1,25 @@ import { State, Task } from '@melonade/melonade-declaration'; import { Worker } from '..'; -const kafkaServers = process.env['MELONADE_KAFKA_SERVERS']; -const namespace = process.env['MELONADE_NAMESPACE']; +const kafkaServers = 'localhost:29092'; +const namespace = 'docker-compose'; +const sleep = (ms: number) => new Promise((res) => setTimeout(res, ms)); + +// tslint:disable-next-line: no-for-in for (const forkID in new Array(1).fill(null)) { - for (const workerId of [1, 2, 3]) { + for (const workerId of [4]) { const worker = new Worker( // task name `t${workerId}`, // process task - (task, _logger, _isTimeOut, updateTask) => { - setTimeout(() => { - updateTask(task, { - status: State.TaskStates.Completed, - }); - - console.log(`Async Completed ${task.taskName}`); - }, 5000); - - console.log(`Processing ${task.taskName}`); + async (task, _logger, _isTimeOut) => { + console.log(`Processing ${task.taskName}: ${task.taskId}`); + await sleep(5 * 1000); + console.log(`Processed ${task.taskName}: ${task.taskId}`); return { - status: State.TaskStates.Inprogress, + status: State.TaskStates.Completed, }; }, diff --git a/src/syncWorker.ts b/src/syncWorker.ts index 9ed7c43..e29d928 100644 --- a/src/syncWorker.ts +++ b/src/syncWorker.ts @@ -7,6 +7,8 @@ import { LibrdKafkaError, Message, } from 'node-rdkafka'; +import { timeout, TimeoutError } from 'promise-timeout'; +import * as R from 'ramda'; import { jsonTryParse } from './utils/common'; import { isTaskTimeout, @@ -25,6 +27,7 @@ export interface ISyncWorkerConfig { autoStart?: boolean; latencyCompensationMs?: number; trackingRunningTasks?: boolean; + batchTimeoutMs?: number; } export interface ISyncUpdateTask { @@ -39,6 +42,7 @@ const DEFAULT_WORKER_CONFIG = { autoStart: true, latencyCompensationMs: 50, trackingRunningTasks: false, + batchTimeoutMs: 10 * 60 * 1000, // 10 mins } as ISyncWorkerConfig; // Maybe use kafka streamAPI @@ -57,7 +61,7 @@ export class SyncWorker extends EventEmitter { isTimeout: boolean, ) => void | Promise; private runningTasks: { - [taskId: string]: Task.ITask; + [taskId: string]: Task.ITask | string; } = {}; private tasksName: string | string[]; @@ -98,6 +102,10 @@ export class SyncWorker extends EventEmitter { 'bootstrap.servers': workerConfig.kafkaServers, 'group.id': `melonade-${this.workerConfig.namespace}-client-${tn}`, 'enable.auto.commit': false, + 'max.poll.interval.ms': Math.max( + 300000, + this.workerConfig.batchTimeoutMs * 5, + ), ...kafkaConfig, }, { 'auto.offset.reset': 'latest' }, @@ -126,13 +134,14 @@ export class SyncWorker extends EventEmitter { this.consumer.connect(); process.once('SIGTERM', () => { + console.log(`${this.tasksName}: unsubscribed`); this.consumer.unsubscribe(); }); } get health(): { consumer: 'connected' | 'disconnected'; - tasks: { [taskId: string]: Task.ITask }; + tasks: { [taskId: string]: Task.ITask | string }; } { return { consumer: this.consumer.isConnected() ? 'connected' : 'disconnected', @@ -143,12 +152,13 @@ export class SyncWorker extends EventEmitter { consume = ( messageNumber: number = this.workerConfig.maximumPollingTasks, ): Promise => { - return new Promise((resolve: Function, reject: Function) => { + return new Promise((resolve: Function) => { this.consumer.consume( messageNumber, (error: LibrdKafkaError, messages: Message[]) => { if (error) { - setTimeout(() => reject(error), 1000); + console.log(`${this.tasksName}: consume error`, error); + setTimeout(() => resolve([]), 1000); } else { resolve( messages.map((message: Kafka.kafkaConsumerMessage) => @@ -181,7 +191,12 @@ export class SyncWorker extends EventEmitter { }; commit = () => { - return this.consumer.commit(); + try { + // @ts-ignore + this.consumer.commitSync(); + } catch (error) { + console.log(`${this.tasksName}: commit error`, error); + } }; private dispatchTask = async (task: Task.ITask, isTimeout: boolean) => { @@ -207,6 +222,8 @@ export class SyncWorker extends EventEmitter { if (this.workerConfig.trackingRunningTasks) { this.runningTasks[task.taskId] = task; + } else { + this.runningTasks[task.taskId] = task.taskId; } try { @@ -214,24 +231,36 @@ export class SyncWorker extends EventEmitter { } catch (error) { console.warn(this.tasksName, error); } finally { - if (this.workerConfig.trackingRunningTasks) { - delete this.runningTasks[task.taskId]; - } + delete this.runningTasks[task.taskId]; } }; private poll = async () => { // https://github.com/nodejs/node/issues/6673 while (this.isSubscribed) { - try { - const tasks = await this.consume(); - if (tasks.length > 0) { - await Promise.all(tasks.map(this.processTask)); + const tasks = await this.consume(); + if (tasks.length > 0) { + try { + if (this.workerConfig.batchTimeoutMs > 0) { + await timeout( + Promise.all(tasks.map(this.processTask)), + this.workerConfig.batchTimeoutMs, + ); + } else { + await Promise.all(tasks.map(this.processTask)); + } + } catch (error) { + if (error instanceof TimeoutError) { + console.log( + `${this.tasksName}: batch timeout`, + R.keys(this.runningTasks), + ); + } else { + console.log(this.tasksName, 'process error', error); + } + } finally { this.commit(); } - } catch (err) { - // In case of consume error - console.log(this.tasksName, err); } } diff --git a/src/worker.ts b/src/worker.ts index 013b12c..2505732 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -7,6 +7,7 @@ import { Message, Producer, } from 'node-rdkafka'; +import { timeout, TimeoutError } from 'promise-timeout'; import * as R from 'ramda'; import { jsonTryParse } from './utils/common'; @@ -19,6 +20,7 @@ export interface IWorkerConfig { autoStart?: boolean; latencyCompensationMs?: number; trackingRunningTasks?: boolean; + batchTimeoutMs?: number; } export interface ITaskResponse { @@ -48,6 +50,7 @@ const DEFAULT_WORKER_CONFIG = { autoStart: true, latencyCompensationMs: 50, trackingRunningTasks: false, + batchTimeoutMs: 10 * 60 * 1000, // 10 mins } as IWorkerConfig; export const alwaysCompleteFunction = (): ITaskResponse => ({ @@ -107,7 +110,7 @@ export class Worker extends EventEmitter { updateTask: IUpdateTask, ) => ITaskResponse | Promise; private runningTasks: { - [taskId: string]: Task.ITask; + [taskId: string]: Task.ITask | string; } = {}; private tasksName: string | string[]; @@ -138,14 +141,25 @@ export class Worker extends EventEmitter { ...workerConfig, }; + var tn = ''; + if (Array.isArray(tasksName)) { + tn = tasksName.join('-'); + } else { + tn = tasksName; + } + this.consumer = new KafkaConsumer( { 'bootstrap.servers': workerConfig.kafkaServers, - 'group.id': `melonade-${this.workerConfig.namespace}.client`, + 'group.id': `melonade-${this.workerConfig.namespace}-client-${tn}`, 'enable.auto.commit': false, + 'max.poll.interval.ms': Math.max( + 300000, + this.workerConfig.batchTimeoutMs * 5, + ), ...kafkaConfig, }, - { 'auto.offset.reset': 'earliest' }, + { 'auto.offset.reset': 'latest' }, ); this.producer = new Producer( { @@ -196,17 +210,14 @@ export class Worker extends EventEmitter { process.once('SIGTERM', () => { this.consumer.unsubscribe(); - - // setTimeout(() => { - // process.exit(0); - // }, 1000); + console.log(`${this.tasksName}: unsubscribed`); }); } get health(): { consumer: 'connected' | 'disconnected'; producer: 'connected' | 'disconnected'; - tasks: { [taskId: string]: Task.ITask }; + tasks: { [taskId: string]: Task.ITask | string }; } { return { consumer: this.consumer.isConnected() ? 'connected' : 'disconnected', @@ -222,12 +233,13 @@ export class Worker extends EventEmitter { consume = ( messageNumber: number = this.workerConfig.maximumPollingTasks, ): Promise => { - return new Promise((resolve: Function, reject: Function) => { + return new Promise((resolve: Function) => { this.consumer.consume( messageNumber, (error: LibrdKafkaError, messages: Message[]) => { if (error) { - setTimeout(() => reject(error), 1000); + console.log(`${this.tasksName}: consume error`, error); + setTimeout(() => resolve([]), 1000); } else { resolve( messages.map((message: Kafka.kafkaConsumerMessage) => @@ -261,7 +273,12 @@ export class Worker extends EventEmitter { }; commit = () => { - return this.consumer.commit(); + try { + // @ts-ignore + this.consumer.commitSync(); + } catch (error) { + console.log(`${this.tasksName}: commit error`, error); + } }; private dispatchTask = async (task: Task.ITask, isTimeout: boolean) => { @@ -307,6 +324,8 @@ export class Worker extends EventEmitter { if (this.workerConfig.trackingRunningTasks) { this.runningTasks[task.taskId] = task; + } else { + this.runningTasks[task.taskId] = task.taskId; } try { @@ -324,24 +343,36 @@ export class Worker extends EventEmitter { console.warn(this.tasksName, error); } } finally { - if (this.workerConfig.trackingRunningTasks) { - delete this.runningTasks[task.taskId]; - } + delete this.runningTasks[task.taskId]; } }; private poll = async () => { // https://github.com/nodejs/node/issues/6673 while (this.isSubscribed) { - try { - const tasks = await this.consume(); - if (tasks.length > 0) { - await Promise.all(tasks.map(this.processTask)); + const tasks = await this.consume(); + if (tasks.length > 0) { + try { + if (this.workerConfig.batchTimeoutMs > 0) { + await timeout( + Promise.all(tasks.map(this.processTask)), + this.workerConfig.batchTimeoutMs, + ); + } else { + await Promise.all(tasks.map(this.processTask)); + } + } catch (error) { + if (error instanceof TimeoutError) { + console.log( + `${this.tasksName}: batch timeout`, + R.keys(this.runningTasks), + ); + } else { + console.log(this.tasksName, 'process error', error); + } + } finally { this.commit(); } - } catch (err) { - // In case of consume error - console.log(this.tasksName, err); } }