Skip to content

Commit

Permalink
Small cleanup in backchannel
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya Radchenko committed Jan 26, 2015
1 parent a2c24ae commit bbe0243
Showing 1 changed file with 63 additions and 60 deletions.
123 changes: 63 additions & 60 deletions lib/backchannel.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
'use strict';

/*
* Backchannel server allows workers to stream messages back to the Node server
* in a line-buffered way. Uses SSL.
*/

var _ = require('lodash')
, async = require('async')

, jobs = require('./jobs')
, common = require('./common')
, utils = require('./utils')
, config = require('./config')
var _ = require('lodash');
var async = require('async');
var jobs = require('./jobs');
var common = require('./common');
var utils = require('./utils');
var config = require('./config');
var models = require('./models');

, models = require('./models')
, Job = models.Job
, User = models.User
, Project = models.Project
var Job = models.Job;
var User = models.User;
var Project = models.Project;

function striderJson(provider, project, ref, done) {
function finished(err, contents) {
Expand Down Expand Up @@ -81,19 +82,18 @@ function prepareJob(emitter, job) {
}

function BackChannel(emitter, ws) {
this.users = {}
this.public = {}
this.waiting = {}
this.ws = ws
emitter.on('job.prepare', prepareJob.bind(null, emitter))
emitter.on('job.new', this.newJob.bind(this));
emitter.on('browser.update', this.onUpdate.bind(this))
emitter.on('job.done', this.jobDone.bind(this, emitter))
this.ws = ws;
this.users = {};
this.public = {};
this.waiting = {};

emitter.on('job.prepare', prepareJob.bind(null, emitter));
emitter.on('job.new', this.newJob.bind(this));
emitter.on('browser.update', this.onUpdate.bind(this));
emitter.on('job.done', this.jobDone.bind(this, emitter));
}

BackChannel.prototype = {

send: function (project, event, args) {
if (this.users[project]) {
this.ws.send(this.users[project], [event, args, 'yours'])
Expand All @@ -102,48 +102,51 @@ BackChannel.prototype = {
this.ws.sendPublic(this.users[project], [event, args, 'public'])
}
},

newJob: function (job) {
var name = job.project.name
, self = this
this.waiting[name] = []
this.public[name] = job.project.public

async.parallel
( { collaborators: function (paraCallback) {
User.collaborators(name, 0, function (err, users) {
paraCallback(err, users)
})
}
, admins: function (paraCallback) {
User.admins(paraCallback)
}
}
, function (err, users) {
if (err) return console.error('new job: Failed to query for users')
if (!users.collaborators) return console.error('new job: no users found')
self.users[name] = []

users.collaborators.forEach(function (user) {
self.users[name].push(user._id.toString())
})
// also send to system admins
users.admins.forEach(function (user) {
self.users[name].push(user._id.toString())
})

// Admins maybe collaborators, so unique the array
self.users[name] = _.uniq(self.users[name])

var njob = jobs.small(job)
njob.project = utils.sanitizeProject(job.project)
self.send(name, 'job.new', [njob])

self.waiting[name].forEach(function (item) {
self.send.apply(self, [name].concat(item))
})
delete self.waiting[name]
var self = this;
var name = job.project.name;

this.waiting[name] = [];
this.public[name] = job.project.public;

async.parallel({
collaborators: function (paraCallback) {
User.collaborators(name, 0, function (err, users) {
paraCallback(err, users);
});
},

admins: function (paraCallback) {
User.admins(paraCallback);
}
)
}, function (err, users) {
if (err) return console.error('new job: Failed to query for users')
if (!users.collaborators) return console.error('new job: no users found')
self.users[name] = []

users.collaborators.forEach(function (user) {
self.users[name].push(user._id.toString())
})
// also send to system admins
users.admins.forEach(function (user) {
self.users[name].push(user._id.toString())
})

// Admins maybe collaborators, so unique the array
self.users[name] = _.uniq(self.users[name])

var njob = jobs.small(job)

njob.project = utils.sanitizeProject(job.project)
self.send(name, 'job.new', [njob])

self.waiting[name].forEach(function (item) {
self.send.apply(self, [name].concat(item))
})

delete self.waiting[name]
});
},
// [project name, event name, [list of arguments]]
onUpdate: function (project, event, args) {
Expand Down

0 comments on commit bbe0243

Please sign in to comment.