Skip to content
This repository has been archived by the owner on Dec 2, 2020. It is now read-only.

Stream feature for metric #74

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
23 changes: 19 additions & 4 deletions lib/cube/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,18 @@ exports.getter = function(db) {
function getter(request, callback) {
var stream = !("stop" in request),
delay = "delay" in request ? +request.delay : streamDelayDefault,
start = new Date(request.start),
start = "start" in request ? new Date(request.start) : new Date(0),
stop = stream ? new Date(Date.now() - delay) : new Date(request.stop);
id = "id" in request ? request.id : undefined;

// Validate the dates.
if (isNaN(start)) return callback({error: "invalid start"}), -1;
if (isNaN(stop)) return callback({error: "invalid stop"}), -1;

// Convert them to ObjectIDs.
start = ObjectID.createFromTime(start/1000);
stop = ObjectID.createFromTime(stop/1000);

// Parse the expression.
var expression;
try {
Expand Down Expand Up @@ -188,12 +193,22 @@ exports.getter = function(db) {
handle(error);

// A null event indicates that there are no more results.
if (event) callback({id: event._id instanceof ObjectID ? undefined : event._id, time: event.t, data: event.d});
if (event) callback({id: event._id instanceof ObjectID ? id : event._id, time: event.t, data: event.d});
else callback(null);
});
});
}

function formatEvent(event) {

if (event) {
event.id = id;
callback(event);
}
else
callback(null);
}

// For streaming queries, share streams for efficient polling.
if (stream) {
var streams = streamsBySource[expression.source];
Expand All @@ -205,7 +220,7 @@ exports.getter = function(db) {
// (end terminator) to the callback, because more results are to come!
if (streams) {
filter.t.$lt = streams.time;
streams.waiting.push(callback);
streams.waiting.push(formatEvent);
query(function(event) { if (event) callback(event); });
}

Expand All @@ -214,7 +229,7 @@ exports.getter = function(db) {
// detecting when active callbacks are closed, advancing the time window,
// and moving waiting clients to active clients.
else {
streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [callback]};
streams = streamsBySource[expression.source] = {time: stop, waiting: [], active: [formatEvent]};
(function poll() {
query(function(event) {

Expand Down
142 changes: 129 additions & 13 deletions lib/cube/metric.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,40 @@ var parser = require("./metric-expression"),
tiers = require("./tiers"),
types = require("./types"),
reduces = require("./reduces"),
event = require("./event");
event = require("./event"),
util = require("util");

var metric_fields = {v: 1},
metric_options = {sort: {"_id.t": 1}, batchSize: 1000},
event_options = {sort: {t: 1}, batchSize: 1000};
event_options = {sort: {t: 1}, batchSize: 1000},
limitMax = 1e4;

var streamInterval = 1000;

// Query for metrics.
exports.getter = function(db) {
var collection = types(db),
Double = db.bson_serializer.Double,
queueByName = {},
meta = event.putter(db);
meta = event.putter(db),
streamsByTier = {};

function getter(request, callback) {
var start = new Date(request.start),
stop = new Date(request.stop),

// Provide default start and stop times for recent events.
// If the limit is not specified, or too big, use the maximum limit.
var stream = (request.stop === undefined) ? true : false,//stream = !("stop" in request),
limit = !(+request.limit >= limitMax) ? request.limit : limitMax,
step = +request.step ? +request.step : 1e4,
stop = (request.stop !== undefined) ? new Date(request.stop) : new Date(Math.floor(Date.now() / step) * step),
start = (request.start !== undefined) ? new Date(request.start) : new Date(0),
id = request.id;

// Validate the dates.
if (isNaN(start)) return callback({error: "invalid start"}), -1;
if (isNaN(stop)) return callback({error: "invalid stop"}), -1;
// If the time between start and stop is too long, then bring the start time
// forward so that only the most recent results are returned. This is only
// approximate in the case of months, but why would you want to return
// exactly ten thousand months? Don't rely on exact limits!
if ((stop - start) / step > limit) start = new Date(stop - step * limit);

// Parse the expression.
var expression;
Expand All @@ -34,18 +47,113 @@ exports.getter = function(db) {
return callback({error: "invalid expression"}), -1;
}

// I don't understand why sometime expression.filter is defined, and sometime it's not.
// this condition make the tests successful.
if(expression.filter) {
// Copy any expression filters into the query object.
var filter = {t: {$gte: start, $lt: stop}};
expression.filter(filter);
}

// Round start and stop to the appropriate time step.
var tier = tiers[+request.step];
var tier = tiers[step];
if (!tier) return callback({error: "invalid step"}), -1;
start = tier.floor(start);
stop = tier.ceil(stop);
//stop = tier.ceil(stop);
stop = tier.floor(stop);

// Compute the request metric!
// Compute the request metric
measure(expression, start, stop, tier, "id" in request
? function(time, value) { callback({time: time, value: value, id: id}); }
: function(time, value) { callback({time: time, value: value}); });
? function(time, value) { callback({time: time, value: value, id: request.id}); }
: function(time, value) { callback({time: time, value: value}); });

if (stream) {

// for efficient polling, polling function handle all request with the same tier.
var streams = streamsByTier[tier.key];


// A poll function already exist for this interval :
// just push this request on the waiting stack, ready to be executed on next poll.
if (streams) {
streams.waiting.push({
expression: expression,
id: id,
callback: callback
});
}

// No poll function exist for this interval, let's create a new one.
else
{

streams = streamsByTier[tier.key] = {
tier: tier,
start: stop,
stop: new Date(stop + tier.key),
waiting: [],
active: [{
expression: expression,
id: id,
callback: callback
}]
};

// We call the poll function for the next loop.
// no need to call it right now because measure() already have been called for the current range.

var timer = streams.stop.getTime() - Date.now();
if (timer < 0) timer += tier.key;

setTimeout(poll.bind(this, streams), timer);

} // if streams
} // if stream
}

function poll(streams){
streams.active = streams.active.concat(streams.waiting).filter(open);
streams.waiting = [];

if (!streams.active.length) {
delete streamsByTier[streams.tier.key];
return;
}
else
{
streams.active.forEach(function(stream) {
measure(stream.expression, streams.start, streams.stop, streams.tier, function(time, value) {

if (stream.callback.closed) {
// callback closed already
return;
}

if (value !== undefined) {
var metric = {
time: time,
value: value
};

if (stream.id) {
metric.id = stream.id;
}

stream.callback(metric);
}
});
});
}

streams.start = streams.stop;
streams.stop = streams.tier.floor(Date.now());

var timer = streams.stop.getTime() + streams.tier.key - Date.now();
setTimeout(poll.bind(this, streams), timer);
}



// Computes the metric for the given expression for the time interval from
// start (inclusive) to stop (exclusive). The time granularity is determined
// by the specified tier, such as daily or hourly. The callback is invoked
Expand Down Expand Up @@ -238,9 +346,17 @@ exports.getter = function(db) {
});
}

getter.close = function(callback) {
callback.closed = true;
};

return getter;
};

function handle(error) {
if (error) throw error;
}

function open(request) {
return !request.callback.closed;
}