From 449ead324656dbb6bcf128acef7938ffabf68b2e Mon Sep 17 00:00:00 2001 From: Andrew Harris Date: Wed, 27 Feb 2019 12:24:41 +1100 Subject: [PATCH] add filter for specific log key --- src/amqp-writable-stream.js | 6 ++++++ src/config.js | 1 + 2 files changed, 7 insertions(+) diff --git a/src/amqp-writable-stream.js b/src/amqp-writable-stream.js index 6d5f359..1d54d38 100644 --- a/src/amqp-writable-stream.js +++ b/src/amqp-writable-stream.js @@ -1,3 +1,4 @@ +const config = require('./config'); const { Writable } = require('stream'); const stringify = require('json-stringify-safe'); @@ -12,6 +13,11 @@ module.exports = class AmqpWritableStream extends Writable { channel.on('drain', () => this.emit('drain')); } write (chunk, encoding, callback) { + if (typeof chunk !== 'object') return true; // chunk should be an object, return true to keep going. + if (config.requiredLogKey) { // check to see if the required log key is in this chunk + const logs = Array.isArray(chunk.value) ? chunk.value : chunk.value.logs; + if (!logs.some(log => Object.keys(log).includes(config.requiredLogKey))) return true; + } return this.channel.sendToQueue(this.queueName, Buffer.from(stringify(chunk)), callback); } }; diff --git a/src/config.js b/src/config.js index 09c1c3a..8838a2b 100644 --- a/src/config.js +++ b/src/config.js @@ -12,6 +12,7 @@ module.exports = { db: parseConnection(process.env.DATABASE_URL), tableName: process.env.DATABASE_TABLE_NAME, logOffset: parseInt(process.env.LOG_OFFSET) || 0, + requiredLogKey: process.env.REQUIRED_LOG_KEY, amqp: { url: process.env.AMQP_URL, queueName: process.env.AMQP_SEARCH_QUEUE