Skip to content

Commit 979f599

Browse files
committed
added queue support for insert update and delete operation
1 parent 88c2df8 commit 979f599

File tree

7 files changed

+143
-60
lines changed

7 files changed

+143
-60
lines changed

README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
```
66
docker-compose up elasticsearch kibana
7-
docker-compose up mysql
8-
docker-compose up app
7+
docker-compose up mysql redis
8+
docker-compose up app listener
99
```
1010

1111
## Database Structure and Initialization
@@ -21,6 +21,12 @@ Port : 33060
2121

2222
And import `data/sample_data.sql` file for our sample data.
2323

24+
Redis configuration :
25+
26+
```
27+
Host: redis
28+
```
29+
2430

2531
## Elastic Integration
2632

docker-compose.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,17 @@ services:
5959
- elasticsearch
6060
- mysql
6161
- redis
62+
63+
listener:
64+
image: "node:8"
65+
user: "node"
66+
working_dir: /home/node/app
67+
environment:
68+
- NODE_ENV=production
69+
volumes:
70+
- ./:/home/node/app
71+
command: "npm run listen"
72+
links:
73+
- elasticsearch
74+
- mysql
75+
- redis

libraries/redis-client.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ var redis = require('redis');
22
client = redis.createClient({'host': 'redis'});
33

44
var RedisClient = {
5+
getClient: function() {
6+
return client;
7+
},
8+
59
push: function(key, value, callback) {
610
client.lpush(key, value, function(err, result) {
711
callback(err, result);

package-lock.json

Lines changed: 24 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
"version": "0.0.0",
44
"private": true,
55
"scripts": {
6-
"start": "nodemon -e js,twig,css ./bin/www"
6+
"start": "nodemon -e js,twig,css ./bin/www",
7+
"listen": "nodemon -e queue-listener.js ./queue-listener.js"
78
},
89
"dependencies": {
910
"async": "^2.5.0",
@@ -16,11 +17,12 @@
1617
"express": "~4.15.5",
1718
"less-middleware": "~2.2.1",
1819
"morgan": "~1.9.0",
19-
"mysql": "^2.14.1",
20+
"mysql": "^2.15.0",
2021
"node-datetime": "^2.0.3",
2122
"nodemon": "^1.12.1",
2223
"redis": "^2.8.0",
2324
"serve-favicon": "~2.4.5",
25+
"simple-redis-queue": "git+ssh://[email protected]/hkulekci/node-simple-redis-queue.git",
2426
"twig": "~0.10.3"
2527
}
2628
}

queue-listener.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#!/usr/bin/env node
2+
3+
require('dotenv').config();
4+
var productService = require('./services/productService');
5+
var categoryService = require('./services/categoryService');
6+
var productSearchService = require('./services/productSearchService');
7+
var redisClient = require('./libraries/redis-client');
8+
var RedisQueue = require("simple-redis-queue");
9+
var waterfall = require('async/waterfall');
10+
11+
var pop_queue = new RedisQueue(redisClient.getClient());
12+
13+
var upsertOperation = function(productId, functionCallback) {
14+
waterfall([
15+
// Insert Operation
16+
function(waterfallCallback) { // Getting Product Category From DB (Preparing for Elasticsearch)
17+
var result = {};
18+
categoryService.getProductCategories(productId, function(err, productCategories) {
19+
if (err) {}
20+
result.productCategories = productCategories;
21+
waterfallCallback(false, result);
22+
});
23+
},
24+
function(result, waterfallCallback) { // Getting Product Data From MySQL DB (Preparing for Elasticsearch)
25+
productService.getRecord(productId, function(err, product) {
26+
if (err) {}
27+
result.product = product;
28+
waterfallCallback(false, result);
29+
});
30+
},
31+
function(result, waterfallCallback) { // Saving Product to Elasticsearch
32+
var product = result.product;
33+
product.categories = result.productCategories;
34+
35+
productSearchService.insert(product, function() {
36+
waterfallCallback(false, product);
37+
});
38+
},
39+
function(err, result) {
40+
functionCallback(err, result);
41+
}
42+
]);
43+
};
44+
45+
var deleteOperation = function(productId, functionCallback) {
46+
// Delete Operation
47+
productSearchService.delete(productId, function(err, result) {
48+
functionCallback(false);
49+
});
50+
};
51+
52+
53+
/***** Redis Message Queue Listener ******/
54+
pop_queue.on("message", function (queueName, payload) {
55+
var messageData = JSON.parse(payload);
56+
if (messageData.action == 'update' || messageData.action == 'insert') {
57+
upsertOperation(messageData.productId, function() {
58+
console.log('[' + queueName + '] - Processed! - ' + payload);
59+
pop_queue.next("product_updates");
60+
});
61+
} else if (messageData.action == 'delete') {
62+
deleteOperation(messageData.productId, function() {
63+
console.log('[' + queueName + '] - Processed! - ' + payload);
64+
pop_queue.next("product_updates");
65+
});
66+
} else {
67+
console.log('[' + queueName + '] - Not Processed! - ' + payload);
68+
pop_queue.next("product_updates");
69+
}
70+
});
71+
72+
// Listen for errors
73+
pop_queue.on("error", function (error) {
74+
console.log("pop_queue Error : " + error);
75+
});
76+
77+
pop_queue.next("product_updates");

routes/products.js

Lines changed: 12 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ var waterfall = require('async/waterfall');
66
var productSearchService = require('../services/productSearchService');
77
var redisClient = require('../libraries/redis-client');
88
var md5 = require('blueimp-md5');
9+
var RedisQueue = require("simple-redis-queue");
10+
11+
var push_queue = new RedisQueue(redisClient.getClient());
912

1013
/* GET products listing. */
1114
router.get('/', function(req, res, next) {
@@ -80,32 +83,15 @@ router.post('/new', function(req, res, next) {
8083
})
8184
});
8285
},
83-
function(result, waterfallCallback) { // Getting Product Category From DB (Preparing for Elasticsearch)
84-
categoryService.getProductCategories(result.insertId, function(err, productCategories) {
85-
if (err) {}
86-
result.productCategories = productCategories;
87-
waterfallCallback(false, result);
88-
});
89-
},
90-
function(result, waterfallCallback) { // Getting Product Data From MySQL DB (Preparing for Elasticsearch)
91-
productService.getRecord(result.insertId, function(err, product) {
92-
if (err) {}
93-
result.product = product;
86+
function(result, waterfallCallback) {
87+
push_queue.push('product_updates', {'action':'insert', 'productId': result.insertId}, function(err, res) {
9488
waterfallCallback(false, result);
9589
});
96-
},
97-
function(result, waterfallCallback) { // Saving Product to Elasticsearch
98-
var product = result.product;
99-
product.categories = result.productCategories;
100-
101-
productSearchService.insert(product, function() {
102-
waterfallCallback(false, product);
103-
});
10490
}
10591
],
106-
function(err, product) {
92+
function(err, result) {
10793
if (err) {}
108-
res.redirect('/product/id/'+product.id);
94+
res.redirect('/product/id/'+result.insertId);
10995
}
11096
);
11197
});
@@ -122,8 +108,7 @@ router.get('/:id/delete', function(req, res, next) {
122108
});
123109
},
124110
function(waterfallCallback) {
125-
productSearchService.delete(params.id, function(err, result) {
126-
//TODO: check error status
111+
push_queue.push('product_updates', {'action':'delete', 'productId': params.id}, function(err, res) {
127112
waterfallCallback(false);
128113
});
129114
}
@@ -210,33 +195,15 @@ router.post('/:id/edit', function(req, res, next) {
210195
})
211196
});
212197
},
213-
function(waterfallCallback) { // Getting Product Categories Data from MySQL (Preparing for Elasticsearch)
214-
var result = {};
215-
categoryService.getProductCategories(params.id, function(err, productCategories) {
216-
if (err) {}
217-
result.productCategories = productCategories;
218-
waterfallCallback(false, result);
219-
});
220-
},
221-
function(result, waterfallCallback) { // Getting Product Data from MySQL (Preparing for Elasticsearch)
222-
productService.getRecord(params.id, function(err, product) {
223-
if (err) {}
224-
result.product = product;
225-
waterfallCallback(false, result);
226-
});
227-
},
228-
function(result, waterfallCallback) { // Saving Data to Elasticsearch
229-
var product = result.product;
230-
product.categories = result.productCategories;
231-
232-
productSearchService.insert(product, function() {
233-
waterfallCallback(false, product);
198+
function(waterfallCallback) {
199+
push_queue.push('product_updates', {'action':'update', 'productId': queryParams.id}, function(err, res) {
200+
waterfallCallback(false);
234201
});
235202
}
236203
],
237204
function(err, product) {
238205
if (err) {}
239-
res.redirect('/product/id/' + product.id);
206+
res.redirect('/product/id/' + queryParams.id);
240207
return;
241208
}
242209
);

0 commit comments

Comments
 (0)