Skip to content

Commit da6dca5

Browse files
BB-168: separate transition processor
1 parent 7eeed6a commit da6dca5

File tree

9 files changed

+202
-30
lines changed

9 files changed

+202
-30
lines changed

extensions/lifecycle/LifecycleConfigValidator.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ const joiSchema = joi.object({
4646
concurrency: joi.number().greater(0).default(10),
4747
probeServer: probeServerJoi.default(),
4848
},
49+
transitionProcessor: {
50+
auth: inheritedAuthJoi,
51+
groupId: joi.string().required(),
52+
retry: retryParamsJoi,
53+
concurrency: joi.number().greater(0).default(10),
54+
probeServer: probeServerJoi.default(),
55+
},
4956
});
5057

5158
function configValidator(backbeatConfig, extConfig) {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
'LifecycleObjectExpirationProcessor'; // eslint-disable-line
2+
3+
const Logger = require('werelogs').Logger;
4+
5+
const LifecycleObjectProcessor = require('./LifecycleObjectProcessor');
6+
const LifecycleDeleteObjectTask =
7+
require('../tasks/LifecycleDeleteObjectTask');
8+
9+
class LifecycleObjectExpirationProcessor extends LifecycleObjectProcessor {
10+
11+
/**
12+
* Constructor of LifecycleObjectProcessor
13+
*
14+
* @constructor
15+
* @param {Object} zkConfig - zookeeper configuration object
16+
* @param {String} zkConfig.connectionString - zookeeper connection string
17+
* as "host:port[/chroot]"
18+
* @param {Object} kafkaConfig - kafka configuration object
19+
* @param {string} kafkaConfig.hosts - list of kafka brokers
20+
* as "host:port[,host:port...]"
21+
* @param {Object} [kafkaConfig.backlogMetrics] - param object to
22+
* publish kafka topic metrics to zookeeper (see {@link
23+
* BackbeatConsumer} constructor)
24+
* @param {Object} lcConfig - lifecycle configuration object
25+
* @param {String} lcConfig.auth - authentication info
26+
* @param {String} lcConfig.objectTasksTopic - lifecycle object topic name
27+
* @param {Object} lcConfig.objectProcessor - kafka consumer object
28+
* @param {String} lcConfig.objectProcessor.groupId - kafka
29+
* consumer group id
30+
* @param {Number} [lcConfig.objectProcessor.concurrency] - number
31+
* of max allowed concurrent operations
32+
* @param {Object} s3Config - S3 configuration
33+
* @param {Object} s3Config.host - s3 endpoint host
34+
* @param {Number} s3Config.port - s3 endpoint port
35+
* @param {String} [transport="http"] - transport method ("http"
36+
* or "https")
37+
*/
38+
constructor(zkConfig, kafkaConfig, lcConfig, s3Config, transport = 'http') {
39+
super(zkConfig, kafkaConfig, lcConfig, s3Config, transport);
40+
this._log = new Logger('Backbeat:Lifecycle:ObjectExpirationProcessor');
41+
}
42+
43+
getProcessConfig(lcConfig) {
44+
return lcConfig.objectProcessor;
45+
}
46+
47+
getAuthConfig(lcConfig) {
48+
if (lcConfig.objectProcessor.auth) {
49+
return lcConfig.objectProcessor.auth;
50+
}
51+
52+
return lcConfig.auth;
53+
}
54+
55+
getTask(actionEntry) {
56+
const actionType = actionEntry.getActionType();
57+
58+
if (actionType !== 'deleteObject' && actionType !== 'deleteMPU') {
59+
this._log.warn(`skipped unsupported action ${actionType}`,
60+
actionEntry.getLogInfo());
61+
return null;
62+
}
63+
64+
return new LifecycleDeleteObjectTask(this);
65+
}
66+
}
67+
68+
module.exports = LifecycleObjectExpirationProcessor;

extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@ const https = require('https');
66
const { EventEmitter } = require('events');
77
const Logger = require('werelogs').Logger;
88

9-
const LifecycleDeleteObjectTask =
10-
require('../tasks/LifecycleDeleteObjectTask');
11-
const LifecycleUpdateTransitionTask =
12-
require('../tasks/LifecycleUpdateTransitionTask');
139
const BackbeatConsumer = require('../../../lib/BackbeatConsumer');
1410
const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');
1511
const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
@@ -48,10 +44,7 @@ class LifecycleObjectProcessor extends EventEmitter {
4844
* @param {Object} lcConfig - lifecycle configuration object
4945
* @param {String} lcConfig.auth - authentication info
5046
* @param {String} lcConfig.objectTasksTopic - lifecycle object topic name
51-
* @param {Object} lcConfig.objectProcessor - kafka consumer object
52-
* @param {String} lcConfig.objectProcessor.groupId - kafka
5347
* consumer group id
54-
* @param {Number} [lcConfig.objectProcessor.concurrency] - number
5548
* of max allowed concurrent operations
5649
* @param {Object} s3Config - S3 configuration
5750
* @param {Object} s3Config.host - s3 endpoint host
@@ -65,7 +58,8 @@ class LifecycleObjectProcessor extends EventEmitter {
6558
this._zkConfig = zkConfig;
6659
this._kafkaConfig = kafkaConfig;
6760
this._lcConfig = lcConfig;
68-
this._authConfig = lcConfig.objectProcessor.auth || lcConfig.auth;
61+
this._processConfig = this.getProcessConfig(this._lcConfig);
62+
this._authConfig = this.getAuthConfig(this._lcConfig);
6963
this._s3Config = s3Config;
7064
this._transport = transport;
7165
this._consumer = null;
@@ -99,8 +93,8 @@ class LifecycleObjectProcessor extends EventEmitter {
9993
backlogMetrics: this._kafkaConfig.backlogMetrics,
10094
},
10195
topic: this._lcConfig.objectTasksTopic,
102-
groupId: this._lcConfig.objectProcessor.groupId,
103-
concurrency: this._lcConfig.objectProcessor.concurrency,
96+
groupId: this._processConfig.groupId,
97+
concurrency: this._processConfig.concurrency,
10498
queueProcessor: this.processKafkaEntry.bind(this),
10599
});
106100
this._consumer.on('error', err => {
@@ -261,6 +255,32 @@ class LifecycleObjectProcessor extends EventEmitter {
261255
this._consumer.close(cb);
262256
}
263257

258+
/**
259+
* Retrieve object processor config
260+
* @return {object} - process config
261+
*/
262+
getProcessConfig() {
263+
throw new Error('LifecycleObjectProcessor.getProcessConfig not implemented');
264+
}
265+
266+
/**
267+
* Retrieve process auth config
268+
* @return {object} - auth config
269+
*/
270+
getAuthConfig() {
271+
throw new Error('LifecycleObjectProcessor.getAuthConfig not implemented');
272+
}
273+
274+
/**
275+
* Retrieve object processor task action
276+
* @param {ActionQueueEntry} actionEntry - lifecycle action entry
277+
* @return {BackbeatTask|null} - backbeat task object
278+
*/
279+
// eslint-disable-next-line
280+
getTask(actionEntry) {
281+
return null;
282+
}
283+
264284
/**
265285
* Proceed to the lifecycle action of an object given a kafka
266286
* object lifecycle queue entry
@@ -279,20 +299,12 @@ class LifecycleObjectProcessor extends EventEmitter {
279299
}
280300
this._log.debug('processing lifecycle object entry',
281301
actionEntry.getLogInfo());
282-
const actionType = actionEntry.getActionType();
283-
let task;
284-
if (actionType === 'deleteObject' ||
285-
actionType === 'deleteMPU') {
286-
task = new LifecycleDeleteObjectTask(this);
287-
} else if (actionType === 'copyLocation' &&
288-
actionEntry.getContextAttribute('ruleType')
289-
=== 'transition') {
290-
task = new LifecycleUpdateTransitionTask(this);
291-
} else {
292-
this._log.warn(`skipped unsupported action ${actionType}`,
293-
actionEntry.getLogInfo());
302+
const task = this.getTask(actionEntry);
303+
304+
if (task === null) {
294305
return process.nextTick(done);
295306
}
307+
296308
return this.retryWrapper.retry({
297309
actionDesc: 'process lifecycle object entry',
298310
logFields: actionEntry.getLogInfo(),
@@ -306,6 +318,7 @@ class LifecycleObjectProcessor extends EventEmitter {
306318
return {
307319
s3Config: this._s3Config,
308320
lcConfig: this._lcConfig,
321+
processConfig: this._processConfig,
309322
authConfig: this._authConfig,
310323
getS3Client: this._getS3Client.bind(this),
311324
getBackbeatClient: this._getBackbeatClient.bind(this),
@@ -317,7 +330,6 @@ class LifecycleObjectProcessor extends EventEmitter {
317330
isReady() {
318331
return this._consumer && this._consumer.isReady();
319332
}
320-
321333
}
322334

323335
module.exports = LifecycleObjectProcessor;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
'use strict'; // eslint-disable-line
2+
3+
const Logger = require('werelogs').Logger;
4+
5+
const LifecycleObjectProcessor = require('./LifecycleObjectProcessor');
6+
const LifecycleUpdateTransitionTask =
7+
require('../tasks/LifecycleUpdateTransitionTask');
8+
9+
class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor {
10+
11+
/**
12+
* Constructor of LifecycleObjectProcessor
13+
*
14+
* @constructor
15+
* @param {Object} zkConfig - zookeeper configuration object
16+
* @param {String} zkConfig.connectionString - zookeeper connection string
17+
* as "host:port[/chroot]"
18+
* @param {Object} kafkaConfig - kafka configuration object
19+
* @param {string} kafkaConfig.hosts - list of kafka brokers
20+
* as "host:port[,host:port...]"
21+
* @param {Object} [kafkaConfig.backlogMetrics] - param object to
22+
* publish kafka topic metrics to zookeeper (see {@link
23+
* BackbeatConsumer} constructor)
24+
* @param {Object} lcConfig - lifecycle configuration object
25+
* @param {String} lcConfig.auth - authentication info
26+
* @param {String} lcConfig.objectTasksTopic - lifecycle object topic name
27+
* @param {Object} lcConfig.transitionProcessor - kafka consumer object
28+
* @param {String} lcConfig.transitionProcessor.groupId - kafka
29+
* consumer group id
30+
* @param {Number} [lcConfig.transitionProcessor.concurrency] - number
31+
* of max allowed concurrent operations
32+
* @param {Object} s3Config - S3 configuration
33+
* @param {Object} s3Config.host - s3 endpoint host
34+
* @param {Number} s3Config.port - s3 endpoint port
35+
* @param {String} [transport="http"] - transport method ("http"
36+
* or "https")
37+
*/
38+
constructor(zkConfig, kafkaConfig, lcConfig, s3Config, transport = 'http') {
39+
super(zkConfig, kafkaConfig, lcConfig, s3Config, transport);
40+
this._log = new Logger('Backbeat:Lifecycle:ObjectTransitionProcessor');
41+
}
42+
43+
getProcessConfig(lcConfig) {
44+
return lcConfig.transitionProcessor;
45+
}
46+
47+
getAuthConfig(lcConfig) {
48+
if (lcConfig.transitionProcessor.auth) {
49+
return lcConfig.transitionProcessor.auth;
50+
}
51+
52+
return lcConfig.auth;
53+
}
54+
55+
getTask(actionEntry) {
56+
const actionType = actionEntry.getActionType();
57+
58+
if (actionType !== 'copyLocation' ||
59+
actionEntry.getContextAttribute('ruleType') !== 'transition') {
60+
this._log.warn(`skipped unsupported action ${actionType}`,
61+
actionEntry.getLogInfo());
62+
return null;
63+
}
64+
65+
return new LifecycleUpdateTransitionTask(this);
66+
}
67+
}
68+
69+
module.exports = LifecycleObjectTransitionProcessor;

extensions/lifecycle/objectProcessor/task.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ const { sendSuccess, sendError } = require('arsenal').network.probe.Utils;
1212
const { ZenkoMetrics } = require('arsenal').metrics;
1313

1414
const { initManagement } = require('../../../lib/management/index');
15-
const LifecycleObjectProcessor = require('./LifecycleObjectProcessor');
15+
const LifecycleObjectExpirationProcessor = require('./LifecycleObjectExpirationProcessor');
16+
const LifecycleObjectTransitionProcessor = require('./LifecycleObjectTransitionProcessor');
1617
const { startProbeServer } = require('../../../lib/util/probe');
1718
const config = require('../../../lib/Config');
1819

@@ -24,8 +25,19 @@ const transport = config.transport;
2425

2526
const logger = new werelogs.Logger('Backbeat:Lifecycle:Consumer');
2627

27-
const objectProcessor = new LifecycleObjectProcessor(
28-
zkConfig, kafkaConfig, lcConfig, s3Config, transport);
28+
let objectProcessor;
29+
30+
switch (process.env.LIFECYCLE_OBJECT_PROCESSOR_TYPE) {
31+
case 'transition':
32+
objectProcessor = new LifecycleObjectTransitionProcessor(
33+
zkConfig, kafkaConfig, lcConfig, s3Config, transport);
34+
break;
35+
case 'expiration': // fallthrough
36+
default:
37+
objectProcessor = new LifecycleObjectExpirationProcessor(
38+
zkConfig, kafkaConfig, lcConfig, s3Config, transport);
39+
break;
40+
}
2941

3042
werelogs.configure({ level: config.log.logLevel,
3143
dump: config.log.dumpLevel });

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"lifecycle_conductor": "node extensions/lifecycle/conductor/service.js",
1212
"lifecycle_bucket_processor": "node extensions/lifecycle/bucketProcessor/task.js",
1313
"lifecycle_object_processor": "node extensions/lifecycle/objectProcessor/task.js",
14+
"lifecycle_object_transition_processor": "LIFECYCLE_OBJECT_PROCESSOR_TYPE=transition node extensions/lifecycle/objectProcessor/task.js",
1415
"mongo_queue_processor": "node extensions/mongoProcessor/mongoProcessorTask.js",
1516
"garbage_collector": "node extensions/gc/service.js",
1617
"test": "mocha --recursive tests/unit",

tests/functional/lifecycle/LifecycleConductor.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ describe('lifecycle conductor', function lifecycleConductor() {
315315
};
316316
}
317317

318-
describe(description, () => {
318+
return describe(description, () => {
319319
beforeEach(done => {
320320
bucketdListing = [];
321321

tests/functional/lifecycle/LifecycleObjectProcessor.spec.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
const werelogs = require('werelogs');
44

5-
const LifecycleObjectProcessor = require(
6-
'../../../extensions/lifecycle/objectProcessor/LifecycleObjectProcessor');
5+
const LifecycleObjectExpirationProcessor = require(
6+
'../../../extensions/lifecycle/objectProcessor/LifecycleObjectExpirationProcessor');
77

88
const { S3ClientMock } = require('../../utils/S3ClientMock');
99

@@ -23,7 +23,7 @@ describe('Lifecycle Object Processor', function lifecycleObjectProcessor() {
2323

2424
function generateRetryTest(failures, message) {
2525
return function testRetries(done) {
26-
const lop = new LifecycleObjectProcessor(
26+
const lop = new LifecycleObjectExpirationProcessor(
2727
zkConfig, kafkaConfig, lcConfig, s3Config);
2828

2929
const s3Client = new S3ClientMock(failures);

tests/functional/lifecycle/configObjects.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ const lcConfig = {
2424
objectProcessor: {
2525
groupId: `object-processor-test-${Math.random()}`,
2626
},
27+
transitionProcessor: {
28+
groupId: `transition-processor-test-${Math.random()}`,
29+
},
2730
bucketTasksTopic,
2831
objectTasksTopic,
2932
rules: {

0 commit comments

Comments
 (0)