Skip to content

Commit

Permalink
Fixed merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vamsee committed Nov 14, 2019
2 parents 6429d4d + 9ba7e09 commit 229c8c6
Show file tree
Hide file tree
Showing 25 changed files with 2,409 additions and 114 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ${TRUSTED_REGISTRY}/library/alpine-node:8.11.4
FROM ${REGISTRY}/alpine-node:8.11.4

RUN mkdir -p /home/src

Expand Down
3 changes: 0 additions & 3 deletions bin/component-config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
{
"loopback-component-explorer": {
"mountPath": "/explorer"
}
}
27 changes: 20 additions & 7 deletions common/mixins/maker-checker-mixin-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,13 @@ function addOERemoteMethods(Model) {
source: 'path'
},
description: 'Model version'
},
{
}, {
arg: 'req',
type: 'object',
http: {
source: 'req'
}
}, {
arg: 'options',
type: 'object',
http: 'optionsFromRequest'
Expand Down Expand Up @@ -317,7 +322,7 @@ function addOERemoteMethods(Model) {
}
});

Model.deleteX = function deleteX(id, version, options, next) {
Model.deleteX = function deleteX(id, version, req, options, next) {
if (!id) {
let err = new Error('please provide id');
// log.error(options, err);
Expand All @@ -333,6 +338,7 @@ function addOERemoteMethods(Model) {
var app = Model.app;
var modelName = Model.definition.name;
var ChangeWorkflowRequest = app.models.ChangeWorkflowRequest;
var correlationId = req && req.headers && req.headers['correlation-id'];

Model.findById(id, options, function fetchInstance(err, sinst) {
/* istanbul ignore if*/
Expand Down Expand Up @@ -395,7 +401,8 @@ function addOERemoteMethods(Model) {
verificationStatus: 'pending',
_modifiers: [
options.ctx.username
]
],
correlationId: correlationId
};

var WorkflowMapping = loopback.getModel('WorkflowMapping', options);
Expand Down Expand Up @@ -438,6 +445,7 @@ function addOERemoteMethods(Model) {
workflowBody.processVariables._modelId = id;
// this is to identify while executing Finalize Transaction to follow which implementation
workflowBody.processVariables._maker_checker_impl = 'v2';
workflowBody.correlationId = correlationId;
WorkflowInstance.create(workflowBody, options, function triggerWorkflow(err, winst) {
if (err) {
return handleError(err, options, next);
Expand Down Expand Up @@ -476,7 +484,7 @@ function addOERemoteMethods(Model) {
var ChangeWorkflowRequest = app.models.ChangeWorkflowRequest;
var inputPV = data.pv;
delete data.pv;

var correlationId = inputPV && inputPV.correlationId;
Model.findById(id, options, function fetchInstance(err, cinst) {
/* istanbul ignore if*/
if (err) {
Expand Down Expand Up @@ -555,7 +563,8 @@ function addOERemoteMethods(Model) {
verificationStatus: data.__verificationStatus__,
_modifiers: [
options.ctx.username
]
],
correlationId: correlationId
};

var WorkflowMapping = loopback.getModel('WorkflowMapping', options);
Expand Down Expand Up @@ -599,6 +608,7 @@ function addOERemoteMethods(Model) {
workflowBody.processVariables._modelId = id;
// this is to identify while executing Finalize Transaction to follow which implementation
workflowBody.processVariables._maker_checker_impl = 'v2';
workflowBody.correlationId = correlationId;
WorkflowInstance.create(workflowBody, options, function triggerWorkflow(err, winst) {
if (err) {
return handleError(err, options, next);
Expand Down Expand Up @@ -873,6 +883,7 @@ function addOERemoteMethods(Model) {

let inputPV = data.pv;
delete data.pv;
var correlationId = inputPV && inputPV.correlationId;

var idName = Model.definition.idName();
var id = data[idName] || 'this_id_wont_exist';
Expand Down Expand Up @@ -935,7 +946,8 @@ function addOERemoteMethods(Model) {
verificationStatus: data.__verificationStatus__,
_modifiers: [
options.ctx.username
]
],
correlationId: correlationId
};
log.debug(options, 'Instance has been validated during maker checker creation');

Expand Down Expand Up @@ -980,6 +992,7 @@ function addOERemoteMethods(Model) {
workflowBody.processVariables._modelId = id;
// this is to identify while executing Finalize Transaction to follow which implementation
workflowBody.processVariables._maker_checker_impl = 'v2';
workflowBody.correlationId = correlationId;
WorkflowInstance.create(workflowBody, options, function triggerWorkflow(err, winst) {
if (err) {
return handleError(err, options, next);
Expand Down
4 changes: 4 additions & 0 deletions common/models/change-workflow-request.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
"_modifiers": {
"type": ["string"] ,
"required": false
},
"correlationId": {
"type": "string",
"index": true
}
},
"validations": [],
Expand Down
1 change: 1 addition & 0 deletions common/models/process-instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ module.exports = function ProcessInstance(ProcessInstance) {

if (obj.isParallelGateway) {
meta = {
from: currentFlowObjectName,
type: 'ParallelGateway',
gwId: obj.bpmnId
};
Expand Down
85 changes: 43 additions & 42 deletions common/models/process-instance.json
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
{
"name": "ProcessInstance",
"base": "WorkflowBaseEntity",
"description" : "Stores Process level instance data of workflows in oe-workflow engine",
"description": "Stores Process level instance data of workflows in oe-workflow engine",
"idInjection": true,
"cacheable" : false,
"mixins":{
"cacheable": false,
"mixins": {
"VersionMixin": true,
"HistoryMixin": true
},
"options": {
"validateUpsert": true
},
"properties": {
"processDefinitionName" : {
"type" : "string",
"required" : true
"processDefinitionName": {
"type": "string",
"required": true
},
"processDefinitionBpmnId" : {
"type" : "string",
"required" : false
"processDefinitionBpmnId": {
"type": "string",
"required": false
},
"_processTokens" : {
"type" : "object",
"required" : false
"_processTokens": {
"type": "object",
"required": false
},
"parentToken" : {
"type" : "object",
"required" : false
"parentToken": {
"type": "object",
"required": false
},
"processVariables": {
"type": "object",
"required": false
},
"_processVariables" : {
"type" : "object",
"required" : false
"_processVariables": {
"type": "object",
"required": false
},
"_parentProcessVariables": {
"type": "object",
Expand All @@ -44,36 +44,37 @@
"type": "object",
"required": false
},
"_processTimerEvents" :{
"type" : "object",
"required" : false
"_processTimerEvents": {
"type": "object",
"required": false
},
"_synchronizeFlow" :{
"type" : "object",
"required" : false
"_synchronizeFlow": {
"type": "object",
"required": false
},
"_poolInfo" : {
"type" : "string",
"required" : false
"_poolInfo": {
"type": "string",
"required": false
},
"_workflowCtx" : {
"type" : "object",
"hidden" : true,
"required" : false
"_workflowCtx": {
"type": "object",
"hidden": true,
"required": false
},
"passiveWait" : {
"type" : "boolean",
"default" : false,
"required" : false
"passiveWait": {
"type": "boolean",
"default": false,
"required": false
},
"_status" : {
"type" : "string",
"required" : false
"_status": {
"type": "string",
"required": false
},
"correlationId":{
"type":"string"
"correlationId": {
"type": "string",
"index": true
}
},
},
"validations": [],
"relations": {
"processDefinition": {
Expand Down Expand Up @@ -137,4 +138,4 @@
}
],
"methods": {}
}
}
4 changes: 4 additions & 0 deletions common/models/task.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
"comments" : {
"type" : "string",
"required" : false
},
"correlationId": {
"type": "string",
"index": true
}
},
"validations": [],
Expand Down
17 changes: 14 additions & 3 deletions common/models/workflow-instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ module.exports = function WorkflowInstance(WorkflowInstance) {
'processDefinitionName': name,
'processVariables': processVariables,
'message': self.message,
'workflowInstanceId': self.id
'workflowInstanceId': self.id,
'correlationId': self.correlationId
};

var processDefinitionName = postData.processDefinitionName;
Expand Down Expand Up @@ -192,7 +193,8 @@ module.exports = function WorkflowInstance(WorkflowInstance) {
'processDefinitionName': name,
'processVariables': processVariables,
'message': self.message,
'workflowInstanceId': self.id
'workflowInstanceId': self.id,
'correlationId': self.correlationId
};

var processDefinitionName = postData.processDefinitionName;
Expand Down Expand Up @@ -317,13 +319,22 @@ module.exports = function WorkflowInstance(WorkflowInstance) {
return handleError(err, options, callback);
}

/** Terminate only top-level processes and send terminate signal to them
* Each process' terminate takes care of terminating corresponding
* sub-processes and tasks.
* If we select all associated processes and terminate all of them them, two things happen
* 1. We may violate termination order and try killing a sub-sub-process
* 2. Also, when a processes tries killing their subprocesses, that may already be killed.
* raising "Error: trying to make invalid state change".
*/
workflowInstance.processes({}, options, function fetchProcesses(err, Processes) {
/* istanbul ignore if*/
if (err) {
return handleError(err, options, callback);
}
for (var i in Processes) {
if (Object.prototype.hasOwnProperty.call(Processes, i)) {
/** Select only records where parentProcessInstanceId is empty */
if (Object.prototype.hasOwnProperty.call(Processes, i) && !Processes[i].parentProcessInstanceId) {
ProcessInstance.emit('PROCESS_TERMINATE', options, ProcessInstance, Processes[i]);
}
}
Expand Down
3 changes: 2 additions & 1 deletion common/models/workflow-instance.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"type": "string"
},
"correlationId":{
"type":"string"
"type":"string",
"index":true
}
},
"validations": [],
Expand Down
12 changes: 6 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ services:
networks:
- ${NETWORK_NAME}
- router_network
logging:
driver: gelf
options:
gelf-address: "udp://0.0.0.0:12201"
# logging:
# driver: gelf
# options:
# gelf-address: "udp://0.0.0.0:12201"

mongo:
image: ${TRUSTED_REGISTRY}/library/alpine-mongo:latest
image: ${REGISTRY}/alpine-mongo:latest
networks:
- ${NETWORK_NAME}

Expand Down
29 changes: 27 additions & 2 deletions lib/workflow-eventHandlers/tokeneventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ exports._tokenArrivedEventHandler = function _tokenArrivedEventHandler(options,
var taskObj = {
name: currentFlowObject.name,
processTokenId: token.id,
message: message
message: message,
correlationId: currentProcess.correlationId
};

var inputParameters;
Expand Down Expand Up @@ -246,7 +247,7 @@ exports._tokenArrivedEventHandler = function _tokenArrivedEventHandler(options,
let workflowAddons = ProcessInstance.app.workflowAddons || {};
if (currentFlowObject.creationHook) {
if (workflowAddons[currentFlowObject.creationHook]) {
preCreateFunction = workflowAddons[currentFlowObject.creationHook];
preCreateFunction = workflowAddons[currentFlowObject.creationHook];
} else {
log.error('Pre Complete function ' + currentFlowObject.creationHook + ' not defined');
}
Expand Down Expand Up @@ -563,6 +564,30 @@ exports._tokenArrivedEventHandler = function _tokenArrivedEventHandler(options,
return f1 && f2;
});
if (emittable) {
/**
* All incoming branches have completed and we can resume
* If only one branch is incoming,
* we pass the message as it is
* If more than one branch is converging then
* we create a map of messages
* where keys are name of message emitting node in each branch
*/
if (currentProcess._synchronizeFlow[gwId] && Object.keys(currentProcess._synchronizeFlow[gwId]).length > 1) {
/* More than one branch converging, create a merged message for passing to next node */
message = Object.values(currentProcess._synchronizeFlow[gwId]).map(function extractMessages(gwayTokenId) {
let gwayToken = currentProcess._processTokens[gwayTokenId];
return {
name: gwayToken.meta.from,
value: gwayToken.message
};
}).reduce(function mergeMessages(map, obj) {
if (typeof (obj.value) !== 'undefined') {
map[obj.name] = obj.value;
}
return map;
}, {});
}

currentProcess._endFlowObject(options, token, processDefinitionInstance, delta, message);
}
} else {
Expand Down
Loading

0 comments on commit 229c8c6

Please sign in to comment.