From 81bfa3b2cdf417a84c4a1761e5a18f5aaf3f4154 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Mon, 18 Jun 2012 18:07:19 -0700 Subject: [PATCH 01/17] Add test showing how we want authentication to look --- test/authenticated-collector-test.js | 35 ++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 test/authenticated-collector-test.js diff --git a/test/authenticated-collector-test.js b/test/authenticated-collector-test.js new file mode 100644 index 00000000..4fba5f38 --- /dev/null +++ b/test/authenticated-collector-test.js @@ -0,0 +1,35 @@ +var vows = require("vows"), + assert = require("assert"), + cube = require("../"), + test = require("./test"); + +var suite = vows.describe("collector"); + +var port = ++test.port, server = cube.server({ + "mongo-host": "localhost", + "mongo-port": 27017, + "mongo-database": "cube_test", + "http-port": port +}); + +server.register = cube.collector.register; + +server.start(); + +suite.addBatch(test.batch({ + "POST /event/put with valid credentials": { + topic: test.request({method: "POST", port: port, path: "/1.0/even/put"}, JSON.stringify([{ + type: "test", + time: new Date, + user: "goodUser", + password: "goodPassword", + data: { + foo: "bar" + } + }])), + "responds with status 200": function(response) { + assert.equal(response.statusCode, 200); + assert.deepEqual(JSON.parse(response.body), {}); + } + } +})); From b0c6518c6a8477b845647a16d52d7fea50b94179 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Tue, 19 Jun 2012 19:26:38 -0700 Subject: [PATCH 02/17] Properly register with server. --- bin/collector.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bin/collector.js b/bin/collector.js index a2b3e80d..f0c3ac16 100644 --- a/bin/collector.js +++ b/bin/collector.js @@ -2,8 +2,6 @@ var options = require("./collector-config"), cube = require("../"), server = cube.server(options); -server.register = function(db, endpoints) { - cube.collector.register(db, endpoints); -}; +server.register = cube.collector.register; server.start(); From b44777e65f4bf1c54a045143807eb6d5d41a1551 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Tue, 19 Jun 2012 19:28:20 -0700 Subject: [PATCH 03/17] Properly register evaluator with server. --- bin/evaluator.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/bin/evaluator.js b/bin/evaluator.js index b5da8894..154d62a0 100644 --- a/bin/evaluator.js +++ b/bin/evaluator.js @@ -2,8 +2,5 @@ var options = require("./evaluator-config"), cube = require("../"), server = cube.server(options); -server.register = function(db, endpoints) { - cube.evaluator.register(db, endpoints); -}; - +server.register = cube.evaluator.register; server.start(); From 4b106554c9b9870f5584b55ef55743d9e873a676 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Tue, 19 Jun 2012 19:33:35 -0700 Subject: [PATCH 04/17] Add authentication function option --- lib/cube/collector.js | 22 ++++++++++++++++++---- lib/cube/server.js | 5 +++-- test/authenticated-collector-test.js | 27 +++++++++++++++++++++++++-- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/lib/cube/collector.js b/lib/cube/collector.js index f6a1c2fb..4896312a 100644 --- a/lib/cube/collector.js +++ b/lib/cube/collector.js @@ -6,8 +6,8 @@ var headers = { "Access-Control-Allow-Origin": "*" }; -exports.register = function(db, endpoints) { - var putter = require("./event").putter(db), +exports.register = function(db, endpoints, authFun) { + var putter = authentication(require("./event").putter(db), authFun), poster = post(putter); // @@ -36,8 +36,13 @@ function post(putter) { try { JSON.parse(content).forEach(putter); } catch (e) { - response.writeHead(400, headers); - response.end(JSON.stringify({error: e.toString()})); + if (e.toString() == "AuthenticationError: Invalid Credentials") { + response.writeHead(401, headers); + response.end(JSON.stringify({error: e.toString()})); + } else { + response.writeHead(400, headers); + response.end(JSON.stringify({error: e.toString()})); + } return; } response.writeHead(200, headers); @@ -45,3 +50,12 @@ function post(putter) { }); }; } + +function authentication(putter, authFun) { + return function(data) { + if (authFun && !authFun(data)) { + throw "AuthenticationError: Invalid Credentials"; + } + putter(data); + } +} diff --git a/lib/cube/server.js b/lib/cube/server.js index 8a2ceda2..fb3d76e2 100644 --- a/lib/cube/server.js +++ b/lib/cube/server.js @@ -42,7 +42,8 @@ module.exports = function(options) { endpoints = {ws: [], http: []}, mongo = new mongodb.Server(options["mongo-host"], options["mongo-port"], server_options), db = new mongodb.Db(options["mongo-database"], mongo, db_options), - id = 0; + id = 0, + authentication = options.authentication; secondary.server = primary; @@ -154,7 +155,7 @@ module.exports = function(options) { // Start the server! function ready() { - server.register(db, endpoints); + server.register(db, endpoints, authentication); meta = require("./event").putter(db); util.log("starting http server on port " + options["http-port"]); primary.listen(options["http-port"]); diff --git a/test/authenticated-collector-test.js b/test/authenticated-collector-test.js index 4fba5f38..be8a564f 100644 --- a/test/authenticated-collector-test.js +++ b/test/authenticated-collector-test.js @@ -9,7 +9,10 @@ var port = ++test.port, server = cube.server({ "mongo-host": "localhost", "mongo-port": 27017, "mongo-database": "cube_test", - "http-port": port + "http-port": port, + "authentication": function (data) { + return data.user === "goodUser" && data.password === "goodPassword"; + } }); server.register = cube.collector.register; @@ -18,7 +21,7 @@ server.start(); suite.addBatch(test.batch({ "POST /event/put with valid credentials": { - topic: test.request({method: "POST", port: port, path: "/1.0/even/put"}, JSON.stringify([{ + topic: test.request({method: "POST", port: port, path: "/1.0/event/put"}, JSON.stringify([{ type: "test", time: new Date, user: "goodUser", @@ -33,3 +36,23 @@ suite.addBatch(test.batch({ } } })); + +suite.addBatch(test.batch({ + "POST /event/put with bad password": { + topic: test.request({method: "POST", port: port, path: "/1.0/event/put"}, JSON.stringify([{ + type: "test", + time: new Date, + user: "goodUser", + password: "badPassword", + data: { + foo: "bar" + } + }])), + "responds with status 401": function(response) { + assert.equal(response.statusCode, 401); + assert.deepEqual(JSON.parse(response.body), {error: "AuthenticationError: Invalid Credentials"}); + } + } +})); + +suite.export(module); From e34f045b4528114737f109474a48f09ca2f20b78 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 20 Jun 2012 16:53:37 -0700 Subject: [PATCH 05/17] Correct test suite name --- test/authenticated-collector-test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/authenticated-collector-test.js b/test/authenticated-collector-test.js index be8a564f..81eac1bb 100644 --- a/test/authenticated-collector-test.js +++ b/test/authenticated-collector-test.js @@ -3,7 +3,7 @@ var vows = require("vows"), cube = require("../"), test = require("./test"); -var suite = vows.describe("collector"); +var suite = vows.describe("authenticated-collector"); var port = ++test.port, server = cube.server({ "mongo-host": "localhost", From 034331531e0042ef0dfcb8ee5e29ab538a90ce68 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 20 Jun 2012 17:23:18 -0700 Subject: [PATCH 06/17] Add authentication function to evaluator --- lib/cube/evaluator.js | 78 +++++++++++++++++++++------- test/authenticated-evaluator-test.js | 48 +++++++++++++++++ 2 files changed, 108 insertions(+), 18 deletions(-) create mode 100644 test/authenticated-evaluator-test.js diff --git a/lib/cube/evaluator.js b/lib/cube/evaluator.js index 20e911d5..480fc117 100644 --- a/lib/cube/evaluator.js +++ b/lib/cube/evaluator.js @@ -12,10 +12,10 @@ var headers = { "Access-Control-Allow-Origin": "*" }; -exports.register = function(db, endpoints) { - var event = require("./event").getter(db), - metric = require("./metric").getter(db), - types = require("./types").getter(db); +exports.register = function(db, endpoints, authFun) { + var event = authentication(require("./event").getter(db)), + metric = authentication(require("./metric").getter(db)), + types = authentication(require("./types").getter(db)); // endpoints.ws.push( @@ -45,11 +45,22 @@ exports.register = function(db, endpoints) { if (!("start" in request)) request.start = 0; if (!(+request.limit <= limitMax)) request.limit = limitMax; - if (event(request, callback) < 0) { - response.writeHead(400, headers); - response.end(JSON.stringify(data[0])); - } else { - response.writeHead(200, headers); + try { + if (event(request, callback) < 0) { + response.writeHead(400, headers); + response.end(JSON.stringify(data[0])); + } else { + response.writeHead(200, headers); + } + } catch (e) { + if (e.toString() == "AuthenticationError: Invalid Credentials") { + response.writeHead(401, headers); + response.end(JSON.stringify({error: e.toString()})); + } else { + response.writeHead(400, headers); + response.end(JSON.stringify({error: e.toString()})); + } + return; } function callback(d) { @@ -80,11 +91,22 @@ exports.register = function(db, endpoints) { stop = new Date(request.stop); if ((stop - start) / step > limit) request.start = new Date(stop - step * limit); - if (metric(request, callback) < 0) { - response.writeHead(400, headers); - response.end(JSON.stringify(data[0])); - } else { - response.writeHead(200, headers); + try { + if (metric(request, callback) < 0) { + response.writeHead(400, headers); + response.end(JSON.stringify(data[0])); + } else { + response.writeHead(200, headers); + } + } catch (e) { + if (e.toString() == "AuthenticationError: Invalid Credentials") { + response.writeHead(401, headers); + response.end(JSON.stringify({error: e.toString()})); + } else { + response.writeHead(400, headers); + response.end(JSON.stringify({error: e.toString()})); + } + return; } function callback(d) { @@ -94,10 +116,30 @@ exports.register = function(db, endpoints) { } function typesGet(request, response) { - types(url.parse(request.url, true).query, function(data) { - response.writeHead(200, headers); - response.end(JSON.stringify(data)); - }); + try { + types(url.parse(request.url, true).query, function(data) { + response.writeHead(200, headers); + response.end(JSON.stringify(data)); + }); + } catch (e) { + if (e.toString() == "AuthenticationError: Invalid Credentials") { + response.writeHead(401, headers); + response.end(JSON.stringify({error: e.toString()})); + } else { + response.writeHead(400, headers); + response.end(JSON.stringify({error: e.toString()})); + } + return; + } + } + + function authentication(getter) { + return function(request, callback) { + if (authFun && !authFun(request)) { + throw "AuthenticationError: Invalid Credentials"; + } + getter(request, callback); + } } }; diff --git a/test/authenticated-evaluator-test.js b/test/authenticated-evaluator-test.js new file mode 100644 index 00000000..a9a0a754 --- /dev/null +++ b/test/authenticated-evaluator-test.js @@ -0,0 +1,48 @@ +var vows = require("vows"), + assert = require("assert"), + cube = require("../"), + test = require("./test"); + +var suite = vows.describe("authenticated-evaluator"); + +var port = ++test.port, server = cube.server({ + "mongo-host": "localhost", + "mongo-port": 27017, + "mongo-database": "cube_test", + "http-port": port, + "authentication": function (data) { + return data.user === "goodUser" && data.password === "goodPassword"; + } +}); + +server.register = cube.evaluator.register; + +server.start(); + +suite.addBatch(test.batch({ + "GET /event/get with valid credentials": { + topic: test.request({method: "GET", + port: port, + path: "/1.0/event/get?expression=test(index)&user=goodUser&password=goodPassword" + }), + "responds with status 200": function(response) { + assert.equal(response.statusCode, 200); + assert.deepEqual(JSON.parse(response.body), []); + } + } +})); + +suite.addBatch(test.batch({ + "GET /event/get with invalid credentials": { + topic: test.request({method: "GET", + port: port, + path: "/1.0/event/get?expression=test(index)&user=goodUser&password=badPassword" + }), + "responds with status 401": function(response) { + assert.equal(response.statusCode, 401); + assert.deepEqual(JSON.parse(response.body), {error: "AuthenticationError: Invalid Credentials"}); + } + } +})); + +suite.export(module); From 1838cfcae7f7ad2533ca10da9b555d32d2c32b91 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 20 Jun 2012 17:27:25 -0700 Subject: [PATCH 07/17] Use more consistent naming --- lib/cube/server.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/cube/server.js b/lib/cube/server.js index fb3d76e2..37a5fdd9 100644 --- a/lib/cube/server.js +++ b/lib/cube/server.js @@ -43,7 +43,7 @@ module.exports = function(options) { mongo = new mongodb.Server(options["mongo-host"], options["mongo-port"], server_options), db = new mongodb.Db(options["mongo-database"], mongo, db_options), id = 0, - authentication = options.authentication; + authFun = options.authentication; secondary.server = primary; @@ -155,7 +155,7 @@ module.exports = function(options) { // Start the server! function ready() { - server.register(db, endpoints, authentication); + server.register(db, endpoints, authFun); meta = require("./event").putter(db); util.log("starting http server on port " + options["http-port"]); primary.listen(options["http-port"]); From fb239eba77b6395e4ec1534b893e7d0c9e0c84f5 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 20 Jun 2012 18:25:47 -0700 Subject: [PATCH 08/17] Add optional namespacing of types --- lib/cube/collector.js | 4 ++-- lib/cube/evaluator.js | 8 ++++---- lib/cube/event.js | 10 ++++++++-- lib/cube/server.js | 5 +++-- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/lib/cube/collector.js b/lib/cube/collector.js index 4896312a..41df19b7 100644 --- a/lib/cube/collector.js +++ b/lib/cube/collector.js @@ -6,8 +6,8 @@ var headers = { "Access-Control-Allow-Origin": "*" }; -exports.register = function(db, endpoints, authFun) { - var putter = authentication(require("./event").putter(db), authFun), +exports.register = function(db, endpoints, authFun, namespaceFun) { + var putter = authentication(require("./event").putter(db, namespaceFun), authFun), poster = post(putter); // diff --git a/lib/cube/evaluator.js b/lib/cube/evaluator.js index 480fc117..caed2899 100644 --- a/lib/cube/evaluator.js +++ b/lib/cube/evaluator.js @@ -12,10 +12,10 @@ var headers = { "Access-Control-Allow-Origin": "*" }; -exports.register = function(db, endpoints, authFun) { - var event = authentication(require("./event").getter(db)), - metric = authentication(require("./metric").getter(db)), - types = authentication(require("./types").getter(db)); +exports.register = function(db, endpoints, authFun, namespaceFun) { + var event = authentication(require("./event").getter(db, namespaceFun)), + metric = authentication(require("./metric").getter(db, namespaceFun)), + types = authentication(require("./types").getter(db, namespaceFun)); // endpoints.ws.push( diff --git a/lib/cube/event.js b/lib/cube/event.js index 4825904e..7d83406b 100644 --- a/lib/cube/event.js +++ b/lib/cube/event.js @@ -22,7 +22,7 @@ var streamDelayDefault = 5000, // How frequently to invalidate metrics after receiving events. var invalidateInterval = 5000; -exports.putter = function(db) { +exports.putter = function(db, namespaceFun) { var collection = types(db), knownByType = {}, eventsToSaveByType = {}, @@ -32,6 +32,9 @@ exports.putter = function(db) { var time = "time" in request ? new Date(request.time) : new Date(), type = request.type; + // Namespace the type if necessary. + if (namespaceFun) type = namespaceFun(type, request); + // Validate the date and type. if (!type_re.test(type)) return callback({error: "invalid type"}), -1; if (isNaN(time)) return callback({error: "invalid time"}), -1; @@ -140,7 +143,7 @@ exports.putter = function(db) { return putter; }; -exports.getter = function(db) { +exports.getter = function(db, namespaceFun) { var collection = types(db), streamsBySource = {}; @@ -162,6 +165,9 @@ exports.getter = function(db) { return callback({error: "invalid expression"}), -1; } + // Namespace the type if necessary. + if (namespaceFun) expression.type = namespaceFun(expression.type, request); + // Set an optional limit on the number of events to return. var options = {sort: {t: -1}, batchSize: 1000}; if ("limit" in request) options.limit = +request.limit; diff --git a/lib/cube/server.js b/lib/cube/server.js index 37a5fdd9..27df6059 100644 --- a/lib/cube/server.js +++ b/lib/cube/server.js @@ -43,7 +43,8 @@ module.exports = function(options) { mongo = new mongodb.Server(options["mongo-host"], options["mongo-port"], server_options), db = new mongodb.Db(options["mongo-database"], mongo, db_options), id = 0, - authFun = options.authentication; + authFun = options.authentication, + namespaceFun = options.namespace; secondary.server = primary; @@ -155,7 +156,7 @@ module.exports = function(options) { // Start the server! function ready() { - server.register(db, endpoints, authFun); + server.register(db, endpoints, authFun, namespaceFun); meta = require("./event").putter(db); util.log("starting http server on port " + options["http-port"]); primary.listen(options["http-port"]); From 81f88ee39cdc923bacdf4ab46bc0e30d31f7c37f Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 20 Jun 2012 18:35:52 -0700 Subject: [PATCH 09/17] Record event when authentication has failed --- lib/cube/collector.js | 26 ++++++++++++++++---------- lib/cube/evaluator.js | 11 ++++++++++- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/lib/cube/collector.js b/lib/cube/collector.js index 41df19b7..94216bef 100644 --- a/lib/cube/collector.js +++ b/lib/cube/collector.js @@ -7,7 +7,8 @@ var headers = { }; exports.register = function(db, endpoints, authFun, namespaceFun) { - var putter = authentication(require("./event").putter(db, namespaceFun), authFun), + var meta = require("./event").putter(db), + putter = authentication(require("./event").putter(db, namespaceFun), authFun), poster = post(putter); // @@ -24,6 +25,20 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { // endpoints.udp = putter; + + function authentication(putter, authFun) { + return function(data) { + if (authFun && !authFun(data)) { + meta({ + type: "failed_authentication", + time: Date.now(), + data: data + }); + throw "AuthenticationError: Invalid Credentials"; + } + putter(data); + } + } }; function post(putter) { @@ -50,12 +65,3 @@ function post(putter) { }); }; } - -function authentication(putter, authFun) { - return function(data) { - if (authFun && !authFun(data)) { - throw "AuthenticationError: Invalid Credentials"; - } - putter(data); - } -} diff --git a/lib/cube/evaluator.js b/lib/cube/evaluator.js index caed2899..ea0dff11 100644 --- a/lib/cube/evaluator.js +++ b/lib/cube/evaluator.js @@ -13,7 +13,8 @@ var headers = { }; exports.register = function(db, endpoints, authFun, namespaceFun) { - var event = authentication(require("./event").getter(db, namespaceFun)), + var meta = require("./event").putter(db), + event = authentication(require("./event").getter(db, namespaceFun)), metric = authentication(require("./metric").getter(db, namespaceFun)), types = authentication(require("./types").getter(db, namespaceFun)); @@ -136,6 +137,14 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { function authentication(getter) { return function(request, callback) { if (authFun && !authFun(request)) { + meta({ + type: "failed_authentication", + time: Date.now(), + data: { + ip: connection.remoteAddress, + path: request.url + } + }); throw "AuthenticationError: Invalid Credentials"; } getter(request, callback); From 08bc458e7bd553c51bb6d40b7e8f57c9747c1eca Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Mon, 25 Jun 2012 16:23:24 -0700 Subject: [PATCH 10/17] Fix reference error --- lib/cube/evaluator.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/cube/evaluator.js b/lib/cube/evaluator.js index ea0dff11..0ae6d71d 100644 --- a/lib/cube/evaluator.js +++ b/lib/cube/evaluator.js @@ -141,7 +141,7 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { type: "failed_authentication", time: Date.now(), data: { - ip: connection.remoteAddress, + ip: request.remoteAddress, path: request.url } }); From ef991af1033b0059e5a65666b97c0a996f513884 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Mon, 25 Jun 2012 16:25:16 -0700 Subject: [PATCH 11/17] Only pay the authFun performance penalty once --- lib/cube/collector.js | 24 ++++++++++++++---------- lib/cube/evaluator.js | 30 +++++++++++++++++------------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/lib/cube/collector.js b/lib/cube/collector.js index 94216bef..fe3bc29c 100644 --- a/lib/cube/collector.js +++ b/lib/cube/collector.js @@ -27,16 +27,20 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { endpoints.udp = putter; function authentication(putter, authFun) { - return function(data) { - if (authFun && !authFun(data)) { - meta({ - type: "failed_authentication", - time: Date.now(), - data: data - }); - throw "AuthenticationError: Invalid Credentials"; - } - putter(data); + if (authFun) { + return function(data) { + if (!authFun(data)) { + meta({ + type: "failed_authentication", + time: Date.now(), + data: data + }); + throw "AuthenticationError: Invalid Credentials"; + } + putter(data); + }; + } else { + return putter; } } }; diff --git a/lib/cube/evaluator.js b/lib/cube/evaluator.js index 0ae6d71d..b77e8acd 100644 --- a/lib/cube/evaluator.js +++ b/lib/cube/evaluator.js @@ -135,19 +135,23 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { } function authentication(getter) { - return function(request, callback) { - if (authFun && !authFun(request)) { - meta({ - type: "failed_authentication", - time: Date.now(), - data: { - ip: request.remoteAddress, - path: request.url - } - }); - throw "AuthenticationError: Invalid Credentials"; - } - getter(request, callback); + if (authFun) { + return function(request, callback) { + if (!authFun(request)) { + meta({ + type: "failed_authentication", + time: Date.now(), + data: { + ip: request.remoteAddress, + path: request.url + } + }); + throw "AuthenticationError: Invalid Credentials"; + } + getter(request, callback); + }; + } else { + return getter; } } }; From ff08eefa85237facc3c2c5e2c88dd9679bbb3918 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Tue, 26 Jun 2012 11:14:58 -0700 Subject: [PATCH 12/17] Pass success and failure fns to authentication. User may need to make a db request, which will require a callback. --- lib/cube/collector.js | 5 ++--- lib/cube/evaluator.js | 5 ++--- test/authenticated-collector-test.js | 8 ++++++-- test/authenticated-evaluator-test.js | 8 ++++++-- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/cube/collector.js b/lib/cube/collector.js index fe3bc29c..28856183 100644 --- a/lib/cube/collector.js +++ b/lib/cube/collector.js @@ -29,15 +29,14 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { function authentication(putter, authFun) { if (authFun) { return function(data) { - if (!authFun(data)) { + authFun(data, function () { putter(data); }, function () { meta({ type: "failed_authentication", time: Date.now(), data: data }); throw "AuthenticationError: Invalid Credentials"; - } - putter(data); + }); }; } else { return putter; diff --git a/lib/cube/evaluator.js b/lib/cube/evaluator.js index b77e8acd..d010066f 100644 --- a/lib/cube/evaluator.js +++ b/lib/cube/evaluator.js @@ -137,7 +137,7 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { function authentication(getter) { if (authFun) { return function(request, callback) { - if (!authFun(request)) { + authFun(request, function () { getter(request, callback) }, function () { meta({ type: "failed_authentication", time: Date.now(), @@ -147,8 +147,7 @@ exports.register = function(db, endpoints, authFun, namespaceFun) { } }); throw "AuthenticationError: Invalid Credentials"; - } - getter(request, callback); + }); }; } else { return getter; diff --git a/test/authenticated-collector-test.js b/test/authenticated-collector-test.js index 81eac1bb..322e2845 100644 --- a/test/authenticated-collector-test.js +++ b/test/authenticated-collector-test.js @@ -10,8 +10,12 @@ var port = ++test.port, server = cube.server({ "mongo-port": 27017, "mongo-database": "cube_test", "http-port": port, - "authentication": function (data) { - return data.user === "goodUser" && data.password === "goodPassword"; + "authentication": function (data, successFn, failureFn) { + if (data.user === "goodUser" && data.password === "goodPassword") { + successFn(); + } else { + failureFn(); + } } }); diff --git a/test/authenticated-evaluator-test.js b/test/authenticated-evaluator-test.js index a9a0a754..c42e4ada 100644 --- a/test/authenticated-evaluator-test.js +++ b/test/authenticated-evaluator-test.js @@ -10,8 +10,12 @@ var port = ++test.port, server = cube.server({ "mongo-port": 27017, "mongo-database": "cube_test", "http-port": port, - "authentication": function (data) { - return data.user === "goodUser" && data.password === "goodPassword"; + "authentication": function (data, successFn, failureFn) { + if (data.user === "goodUser" && data.password === "goodPassword") { + successFn(); + } else { + failureFn(); + } } }); From 4e937de357ba1abad8e1e01eb0d66d5aa5fd1307 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Tue, 26 Jun 2012 11:43:46 -0700 Subject: [PATCH 13/17] Pass callback to namespaceFun. Users may need to check the database to figure out how to namespace the type, therefore we really have to work with a callback. --- lib/cube/event.js | 90 +++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 42 deletions(-) diff --git a/lib/cube/event.js b/lib/cube/event.js index 7d83406b..4e606915 100644 --- a/lib/cube/event.js +++ b/lib/cube/event.js @@ -30,60 +30,66 @@ exports.putter = function(db, namespaceFun) { function putter(request, callback) { var time = "time" in request ? new Date(request.time) : new Date(), - type = request.type; + typeWithoutNamespace = request.type; // Namespace the type if necessary. - if (namespaceFun) type = namespaceFun(type, request); + if (namespaceFun) { + namespaceFun(typeWithoutNamespace, request, namespaceFunCallback); + } else { + namespaceFunCallback(typeWithoutNamespace); + } - // Validate the date and type. - if (!type_re.test(type)) return callback({error: "invalid type"}), -1; - if (isNaN(time)) return callback({error: "invalid time"}), -1; + function namespaceFunCallback(type) { + // Validate the date and type. + if (!type_re.test(type)) return callback({error: "invalid type"}), -1; + if (isNaN(time)) return callback({error: "invalid time"}), -1; - // If an id is specified, promote it to Mongo's primary key. - var event = {t: time, d: request.data}; - if ("id" in request) event._id = request.id; + // If an id is specified, promote it to Mongo's primary key. + var event = {t: time, d: request.data}; + if ("id" in request) event._id = request.id; - // If this is a known event type, save immediately. - if (type in knownByType) return save(type, event); + // If this is a known event type, save immediately. + if (type in knownByType) return save(type, event); - // If someone is already creating the event collection for this new type, - // then append this event to the queue for later save. - if (type in eventsToSaveByType) return eventsToSaveByType[type].push(event); + // If someone is already creating the event collection for this new type, + // then append this event to the queue for later save. + if (type in eventsToSaveByType) return eventsToSaveByType[type].push(event); - // Otherwise, it's up to us to see if the collection exists, verify the - // associated indexes, create the corresponding metrics collection, and save - // any events that have queued up in the interim! + // Otherwise, it's up to us to see if the collection exists, verify the + // associated indexes, create the corresponding metrics collection, and save + // any events that have queued up in the interim! - // First add the new event to the queue. - eventsToSaveByType[type] = [event]; + // First add the new event to the queue. + eventsToSaveByType[type] = [event]; - // If the events collection exists, then we assume the metrics & indexes do - // too. Otherwise, we must create the required collections and indexes. Note - // that if you want to customize the size of the capped metrics collection, - // or add custom indexes, you can still do all that by hand. - db.collectionNames(type + "_events", function(error, names) { - var events = collection(type).events; - if (names.length) return saveEvents(); + // If the events collection exists, then we assume the metrics & indexes do + // too. Otherwise, we must create the required collections and indexes. Note + // that if you want to customize the size of the capped metrics collection, + // or add custom indexes, you can still do all that by hand. + db.collectionNames(type + "_events", function(error, names) { + var events = collection(type).events; + if (names.length) return saveEvents(); - // Events are indexed by time. - events.ensureIndex({"t": 1}, handle); + // Events are indexed by time. + events.ensureIndex({"t": 1}, handle); - // Create a capped collection for metrics. Three indexes are required: one - // for finding metrics, one (_id) for updating, and one for invalidation. - db.createCollection(type + "_metrics", metric_options, function(error, metrics) { - handle(error); - metrics.ensureIndex({"i": 1, "_id.e": 1, "_id.l": 1, "_id.t": 1}, handle); - metrics.ensureIndex({"i": 1, "_id.l": 1, "_id.t": 1}, handle); - saveEvents(); - }); + // Create a capped collection for metrics. Three indexes are required: one + // for finding metrics, one (_id) for updating, and one for invalidation. + db.createCollection(type + "_metrics", metric_options, function(error, metrics) { + handle(error); + metrics.ensureIndex({"i": 1, "_id.e": 1, "_id.l": 1, "_id.t": 1}, handle); + metrics.ensureIndex({"i": 1, "_id.l": 1, "_id.t": 1}, handle); + saveEvents(); + }); - // Save any pending events to the new collection. - function saveEvents() { - knownByType[type] = true; - eventsToSaveByType[type].forEach(function(event) { save(type, event); }); - delete eventsToSaveByType[type]; - } - }); + // Save any pending events to the new collection. + function saveEvents() { + knownByType[type] = true; + eventsToSaveByType[type].forEach(function(event) { save(type, event); }); + delete eventsToSaveByType[type]; + } + }); + } } // Save the event of the specified type, and queue invalidation of any cached From 50e90ec3c4bbf2dc3cf9cf11116deafada3d53f2 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 27 Jun 2012 12:39:50 -0700 Subject: [PATCH 14/17] Wrap getter in namespaceCallback as well --- lib/cube/event.js | 160 ++++++++++++++++++++++++---------------------- 1 file changed, 84 insertions(+), 76 deletions(-) diff --git a/lib/cube/event.js b/lib/cube/event.js index 4e606915..335f6855 100644 --- a/lib/cube/event.js +++ b/lib/cube/event.js @@ -172,96 +172,104 @@ exports.getter = function(db, namespaceFun) { } // Namespace the type if necessary. - if (namespaceFun) expression.type = namespaceFun(expression.type, request); - - // Set an optional limit on the number of events to return. - var options = {sort: {t: -1}, batchSize: 1000}; - if ("limit" in request) options.limit = +request.limit; + if (namespaceFun) { + namespaceFun(expression.type, request, namespaceFunCallback); + } else { + namespaceFunCallback(expression.type); + } - // Copy any expression filters into the query object. - var filter = {t: {$gte: start, $lt: stop}}; - expression.filter(filter); + function namespaceFunCallback(type) { + expression.type = type; - // Request any needed fields. - var fields = {t: 1}; - expression.fields(fields); + // Set an optional limit on the number of events to return. + var options = {sort: {t: -1}, batchSize: 1000}; + if ("limit" in request) options.limit = +request.limit; - // Query for the desired events. - function query(callback) { - collection(expression.type).events.find(filter, fields, options, function(error, cursor) { - handle(error); - cursor.each(function(error, event) { + // Copy any expression filters into the query object. + var filter = {t: {$gte: start, $lt: stop}}; + expression.filter(filter); - // If the callback is closed (i.e., if the WebSocket connection was - // closed), then abort the query. Note that closing the cursor mid- - // loop causes an error, which we subsequently ignore! - if (callback.closed) return cursor.close(); + // Request any needed fields. + var fields = {t: 1}; + expression.fields(fields); + // Query for the desired events. + function query(callback) { + collection(expression.type).events.find(filter, fields, options, function(error, cursor) { handle(error); + cursor.each(function(error, event) { - // A null event indicates that there are no more results. - if (event) callback({id: event._id instanceof ObjectID ? undefined : event._id, time: event.t, data: event.d}); - else callback(null); - }); - }); - } + // If the callback is closed (i.e., if the WebSocket connection was + // closed), then abort the query. Note that closing the cursor mid- + // loop causes an error, which we subsequently ignore! + if (callback.closed) return cursor.close(); + + handle(error); - // For streaming queries, share streams for efficient polling. - if (stream) { - var streams = streamsBySource[expression.source]; - - // If there is an existing stream to attach to, backfill the initial set - // of results to catch the client up to the stream. Add the new callback - // to a queue, so that when the shared stream finishes its current poll, - // it begins notifying the new client. Note that we don't pass the null - // (end terminator) to the callback, because more results are to come! - if (streams) { - filter.t.$lt = streams.time; - streams.waiting.push(callback); - query(function(event) { if (event) callback(event); }); + // A null event indicates that there are no more results. + if (event) callback({id: event._id instanceof ObjectID ? undefined : event._id, time: event.t, data: event.d}); + else callback(null); + }); + }); } - // Otherwise, we're creating a new stream, so we're responsible for - // starting the polling loop. This means notifying active callbacks, - // detecting when active callbacks are closed, advancing the time window, - // and moving waiting clients to active clients. - else { - streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [callback]}; - (function poll() { - query(function(event) { - - // If there's an event, send it to all active, open clients. - if (event) { - streams.active.forEach(function(callback) { - if (!callback.closed) callback(event); - }); - } - - // Otherwise, we've reached the end of a poll, and it's time to - // merge the waiting callbacks into the active callbacks. Advance - // the time range, and set a timeout for the next poll. - else { - streams.active = streams.active.concat(streams.waiting).filter(open); - streams.waiting = []; - - // If no clients remain, then it's safe to delete the shared - // stream, and we'll no longer be responsible for polling. - if (!streams.active.length) { - delete streamsBySource[expression.source]; - return; + // For streaming queries, share streams for efficient polling. + if (stream) { + var streams = streamsBySource[expression.source]; + + // If there is an existing stream to attach to, backfill the initial set + // of results to catch the client up to the stream. Add the new callback + // to a queue, so that when the shared stream finishes its current poll, + // it begins notifying the new client. Note that we don't pass the null + // (end terminator) to the callback, because more results are to come! + if (streams) { + filter.t.$lt = streams.time; + streams.waiting.push(callback); + query(function(event) { if (event) callback(event); }); + } + + // Otherwise, we're creating a new stream, so we're responsible for + // starting the polling loop. This means notifying active callbacks, + // detecting when active callbacks are closed, advancing the time window, + // and moving waiting clients to active clients. + else { + streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [callback]}; + (function poll() { + query(function(event) { + + // If there's an event, send it to all active, open clients. + if (event) { + streams.active.forEach(function(callback) { + if (!callback.closed) callback(event); + }); } - filter.t.$gte = streams.time; - filter.t.$lt = streams.time = new Date(Date.now() - delay); - setTimeout(poll, streamInterval); - } - }); - })(); + // Otherwise, we've reached the end of a poll, and it's time to + // merge the waiting callbacks into the active callbacks. Advance + // the time range, and set a timeout for the next poll. + else { + streams.active = streams.active.concat(streams.waiting).filter(open); + streams.waiting = []; + + // If no clients remain, then it's safe to delete the shared + // stream, and we'll no longer be responsible for polling. + if (!streams.active.length) { + delete streamsBySource[expression.source]; + return; + } + + filter.t.$gte = streams.time; + filter.t.$lt = streams.time = new Date(Date.now() - delay); + setTimeout(poll, streamInterval); + } + }); + })(); + } } - } - // For non-streaming queries, just send the single batch! - else query(callback); + // For non-streaming queries, just send the single batch! + else query(callback); + } } getter.close = function(callback) { From 19e31354a7e2f090d67be78ea3a50e182d6d1356 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 27 Jun 2012 14:47:45 -0700 Subject: [PATCH 15/17] Store types per namespace --- lib/cube/event.js | 11 +++++++++++ lib/cube/server.js | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/lib/cube/event.js b/lib/cube/event.js index 335f6855..ca7b3b26 100644 --- a/lib/cube/event.js +++ b/lib/cube/event.js @@ -68,6 +68,7 @@ exports.putter = function(db, namespaceFun) { // or add custom indexes, you can still do all that by hand. db.collectionNames(type + "_events", function(error, names) { var events = collection(type).events; + if (names.length) return saveEvents(); // Events are indexed by time. @@ -82,12 +83,22 @@ exports.putter = function(db, namespaceFun) { saveEvents(); }); + addTypeToNamespace(); + // Save any pending events to the new collection. function saveEvents() { knownByType[type] = true; eventsToSaveByType[type].forEach(function(event) { save(type, event); }); delete eventsToSaveByType[type]; } + + function addTypeToNamespace() { + if (!namespaceFun) return; + + namespaceFun("", request, function (ns) { + db.collection("cube_namespaces").insert({namespace: ns, collection: type}); + }); + } }); } } diff --git a/lib/cube/server.js b/lib/cube/server.js index 27df6059..df811515 100644 --- a/lib/cube/server.js +++ b/lib/cube/server.js @@ -145,17 +145,17 @@ module.exports = function(options) { // Connect to mongodb. util.log("starting mongodb client"); db.open(function(error) { - if (error) throw error; - if (options["mongo-username"] == null) return ready(); + if (options["mongo-username"] == null) return createNamespacesCollection(); db.authenticate(options["mongo-username"], options["mongo-password"], function(error, success) { if (error) throw error; if (!success) throw new Error("authentication failed"); - ready(); + createNamespacesCollection(); }); }); // Start the server! function ready() { + server.register(db, endpoints, authFun, namespaceFun); meta = require("./event").putter(db); util.log("starting http server on port " + options["http-port"]); @@ -169,6 +169,17 @@ module.exports = function(options) { udp.bind(options["udp-port"]); } } + + function createNamespacesCollection() { + if (namespaceFun) { + db.createCollection("cube_namespaces", {}, function (error, namespaces) { + if (error) throw error; + ready(); + }); + } else { + ready(); + } + } }; return server; From 4b61d3e5070e8ec31c0ff884ca4a11ec8d7a3426 Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Wed, 27 Jun 2012 16:12:45 -0700 Subject: [PATCH 16/17] Store types as an array per namespace --- lib/cube/event.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/cube/event.js b/lib/cube/event.js index ca7b3b26..769bb948 100644 --- a/lib/cube/event.js +++ b/lib/cube/event.js @@ -96,7 +96,8 @@ exports.putter = function(db, namespaceFun) { if (!namespaceFun) return; namespaceFun("", request, function (ns) { - db.collection("cube_namespaces").insert({namespace: ns, collection: type}); + db.collection("cube_namespaces").insert({_id: ns, types: []}); + db.collection("cube_namespaces").update({_id: ns}, {$push: { types: type }}); }); } }); From af27c3c9a1e9aaac8b98f2b9ccc3df08fd8cf67b Mon Sep 17 00:00:00 2001 From: Trotter Cashion Date: Thu, 28 Jun 2012 16:39:45 -0700 Subject: [PATCH 17/17] Apply namespace to metric and types --- lib/cube/metric.js | 32 +++++++++++++++++++++----------- lib/cube/types.js | 25 +++++++++++++++++++------ 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/lib/cube/metric.js b/lib/cube/metric.js index 3421aea2..41c5e32b 100644 --- a/lib/cube/metric.js +++ b/lib/cube/metric.js @@ -11,8 +11,8 @@ var metric_fields = {v: 1}, event_options = {sort: {t: 1}, batchSize: 1000}; // Query for metrics. -exports.getter = function(db) { - var collection = types(db), +exports.getter = function(db, namespaceFun) { + var collection = types(db, namespaceFun), Double = db.bson_serializer.Double, queueByName = {}, meta = event.putter(db); @@ -34,16 +34,26 @@ exports.getter = function(db) { return callback({error: "invalid expression"}), -1; } - // Round start and stop to the appropriate time step. - var tier = tiers[+request.step]; - if (!tier) return callback({error: "invalid step"}), -1; - start = tier.floor(start); - stop = tier.ceil(stop); + if (namespaceFun) { + namespaceFun(expression.type, request, namespaceFunCallback); + } else { + namespaceFunCallback(expression.type); + } + + function namespaceFunCallback(type) { + expression.type = type; - // Compute the request metric! - measure(expression, start, stop, tier, "id" in request - ? function(time, value) { callback({time: time, value: value, id: id}); } - : function(time, value) { callback({time: time, value: value}); }); + // Round start and stop to the appropriate time step. + var tier = tiers[+request.step]; + if (!tier) return callback({error: "invalid step"}), -1; + start = tier.floor(start); + stop = tier.ceil(stop); + + // Compute the request metric! + measure(expression, start, stop, tier, "id" in request + ? function(time, value) { callback({time: time, value: value, id: id}); } + : function(time, value) { callback({time: time, value: value}); }); + } } // Computes the metric for the given expression for the time interval from diff --git a/lib/cube/types.js b/lib/cube/types.js index 251f5e8e..8dfcafae 100644 --- a/lib/cube/types.js +++ b/lib/cube/types.js @@ -19,15 +19,28 @@ var types = module.exports = function(db) { var eventRe = /_events$/; -types.getter = function(db) { +types.getter = function(db, namespaceFun) { return function(request, callback) { db.collectionNames(function(error, names) { handle(error); - callback(names - .map(function(d) { return d.name.split(".")[1]; }) - .filter(function(d) { return eventRe.test(d); }) - .map(function(d) { return d.substring(0, d.length - 7); }) - .sort()); + + if (namespaceFun) { + namespaceFun("", request, function (ns) { + var nsRe = new RegExp(ns); + + callback(names + .map(function(d) { return d.name.split(".")[1]; }) + .filter(function(d) { return eventRe.test(d) && nsRe.test(d); }) + .map(function(d) { return d.substring(ns.length, d.length - 7); }) + .sort()); + }); + } else { + callback(names + .map(function(d) { return d.name.split(".")[1]; }) + .filter(function(d) { return eventRe.test(d); }) + .map(function(d) { return d.substring(ns.length, d.length - 7); }) + .sort()); + } }); }; };