Skip to content

Commit

Permalink
fix some bugs && update the test cases
Browse files Browse the repository at this point in the history
the test cases were not updated synchronously
after adding tracer to the rpc framework. Also,
it is found some bugs when running test cases and were
fixed in this commit.
  • Loading branch information
cynron committed Dec 2, 2013
1 parent 23c4e6b commit e9c1287
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 79 deletions.
8 changes: 6 additions & 2 deletions lib/rpc-client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ var STATE_CLOSED = 3; // client has closed
* RPC Client Class
*/
var Client = function(opts) {
this.opts = opts || {};
opts = opts || {};
this._context = opts.context;
this._routeContext = opts.routeContext;
this.router = opts.router || router;
if(this._context) {
opts.clientId = this._context.serverId;
}
this.opts = opts;

this._station = createStation(opts);
this.proxies = {};
Expand Down Expand Up @@ -167,7 +171,7 @@ pro.rpcInvoke = function(serverId, msg, cb) {
cb(new Error('[pomelo-rpc] fail to do rpc invoke for client is not running'));
return;
}
var tracer = new Tracer(this.opts.rpcLogger, this.opts.rpcDebugLog, this._context.serverId, serverId, msg);
var tracer = new Tracer(this.opts.rpcLogger, this.opts.rpcDebugLog, this.opts.clientId, serverId, msg);
this._station.dispatch(tracer, serverId, msg, null, cb);
};

Expand Down
10 changes: 6 additions & 4 deletions lib/rpc-client/mailboxes/blackhole.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ var logger = require('pomelo-logger').getLogger('pomelo-rpc', __filename);

var exp = module.exports = new EventEmitter();

exp.connect = function(cb) {
exp.connect = function(tracer, cb) {
tracer.info('client', __filename, 'connect', 'connect to blackhole');
process.nextTick(function() {
utils.invokeCallback(cb, new Error('fail to connect to remote server and switch to blackhole.'));
});
Expand All @@ -13,9 +14,10 @@ exp.connect = function(cb) {
exp.close = function(cb) {
};

exp.send = function(msg, opts, cb) {
exp.send = function(tracer, msg, opts, cb) {
tracer.info('client', __filename, 'send', 'send rpc msg to blackhole');
logger.info('message into blackhole: %j', msg);
process.nextTick(function() {
utils.invokeCallback(cb, new Error('message was forward to blackhole.'));
utils.invokeCallback(cb, tracer, new Error('message was forward to blackhole.'));
});
};
};
10 changes: 5 additions & 5 deletions lib/rpc-client/mailboxes/tcp-mailbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ var DEFAULT_INTERVAL = 50;

var MailBox = function(server, opts) {
EventEmitter.call(this);
opts = opts || {};
this.opts = opts || {};
this.id = server.id;
this.host = server.host;
this.port = server.port;
Expand Down Expand Up @@ -104,12 +104,12 @@ pro.close = function() {
pro.send = function(tracer, msg, opts, cb) {
tracer.info('client', __filename, 'send', 'tcp-mailbox try to send');
if(!this.connected) {
utils.invokeCallback(cb, new Error('not init.'));
utils.invokeCallback(cb, tracer, new Error('not init.'));
return;
}

if(this.closed) {
utils.invokeCallback(cb, new Error('mailbox alread closed.'));
utils.invokeCallback(cb, tracer, new Error('mailbox alread closed.'));
return;
}

Expand Down Expand Up @@ -157,14 +157,14 @@ var processMsg = function(mailbox, pkg) {
return;
}

var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.context.serverId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId);
var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.clientId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId);
var args = [tracer];

pkg.resp.forEach(function(arg){
args.push(arg);
});

cb.apply(null, pkg.resp);
cb.apply(null, args);
};

/**
Expand Down
8 changes: 4 additions & 4 deletions lib/rpc-client/mailboxes/ws-mailbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var pro = MailBox.prototype;
pro.connect = function(tracer, cb) {
tracer.info('client', __filename, 'connect', 'ws-mailbox try to connect');
if(this.connected) {
utils.invokeCallback(new Error('mailbox has already connected.'));
utils.invokeCallback(cb, new Error('mailbox has already connected.'));
return;
}

Expand Down Expand Up @@ -100,12 +100,12 @@ pro.close = function() {
pro.send = function(tracer, msg, opts, cb) {
tracer.info('client', __filename, 'send', 'ws-mailbox try to send');
if(!this.connected) {
utils.invokeCallback(cb, new Error('not init.'));
utils.invokeCallback(cb, tracer, new Error('not init.'));
return;
}

if(this.closed) {
utils.invokeCallback(cb, new Error('mailbox alread closed.'));
utils.invokeCallback(cb, tracer, new Error('mailbox alread closed.'));
return;
}

Expand Down Expand Up @@ -152,7 +152,7 @@ var processMsg = function(mailbox, pkg) {
return;
}

var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.context.serverId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId);
var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.clientId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId);
var args = [tracer];

pkg.resp.forEach(function(arg){
Expand Down
5 changes: 1 addition & 4 deletions lib/rpc-client/mailstation.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ var STATE_CLOSED = 3; // station has closed

var DEFAULT_PENDING_SIZE = 1000; // default pending message limit

var app;
/**
* Mail station constructor.
*
Expand All @@ -25,8 +24,6 @@ var app;
var MailStation = function(opts) {
EventEmitter.call(this);
this.opts = opts;
app = opts.context;

this.servers = {}; // remote server info map, key: server id, value: info
this.mailboxFactory = opts.mailboxFactory || defaultMailboxFactory;

Expand Down Expand Up @@ -373,4 +370,4 @@ var flushPending = function(tracer, station, serverId) {
*/
module.exports.create = function(opts) {
return new MailStation(opts || {});
};
};
2 changes: 1 addition & 1 deletion lib/rpc-server/acceptors/tcp-acceptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ var flush = function(acceptor) {
* create acceptor
*
* @param opts init params
* @param cb(msg, cb) callback function that would be invoked when new message arrives
* @param cb(tracer, msg, cb) callback function that would be invoked when new message arrives
*/
module.exports.create = function(opts, cb) {
return new Acceptor(opts || {}, cb);
Expand Down
2 changes: 1 addition & 1 deletion lib/rpc-server/acceptors/ws-acceptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ var flush = function(acceptor) {
* create acceptor
*
* @param opts init params
* @param cb(msg, cb) callback function that would be invoked when new message arrives
* @param cb(tracer, msg, cb) callback function that would be invoked when new message arrives
*/
module.exports.create = function(opts, cb) {
return new Acceptor(opts || {}, cb);
Expand Down
2 changes: 1 addition & 1 deletion test/rpc-client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,4 @@ describe('client', function() {
});

});
});
});
31 changes: 22 additions & 9 deletions test/rpc-client/mailstation.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var lib = process.env.POMELO_RPC_COV ? 'lib-cov' : 'lib';
var MailStation = require('../../' + lib + '/rpc-client/mailstation');
var should = require('should');
var Server = require('../../').server;
var Tracer = require('../../lib/util/tracer');

var WAIT_TIME = 100;

Expand Down Expand Up @@ -128,13 +129,14 @@ describe('mail station', function() {
callbackCount++;
};
};
var tracer = new Tracer(null, false);

station.start(function(err) {
var item;
for(var i=0, l=serverList.length; i<l; i++) {
count++;
item = serverList[i];
station.dispatch(item.id, msg, null, func(item.id));
station.dispatch(tracer, item.id, msg, null, func(item.id));
}
});
setTimeout(function() {
Expand Down Expand Up @@ -162,12 +164,14 @@ describe('mail station', function() {
};
};

var tracer = new Tracer(null, false);

station.start(function(err) {
var item;
for(var i=0, l=serverList.length; i<l; i++) {
count++;
item = serverList[i];
station.dispatch(item.id, msg, null, func(item.id));
station.dispatch(tracer, item.id, msg, null, func(item.id));
}
});
setTimeout(function() {
Expand All @@ -187,11 +191,13 @@ describe('mail station', function() {
station.addServer(serverList[i]);
}

var tracer = new Tracer(null, false);

station.start(function(err) {
// add area server
var item = serverList[0];
station.addServer(item);
station.dispatch(item.id, msg, null, function(err, remoteId) {
station.dispatch(tracer, item.id, msg, null, function(err, remoteId) {
should.exist(remoteId);
remoteId.should.equal(item.id);
callbackCount++;
Expand Down Expand Up @@ -221,9 +227,11 @@ describe('mail station', function() {
eventCount++;
});

var tracer = new Tracer(null, false);

station.start(function(err) {
should.exist(station);
station.dispatch(serverId, msg, null, function(err) {
station.dispatch(tracer, serverId, msg, null, function(err) {
should.exist(err);
'message was forward to blackhole.'.should.equal(err.message);
callbackCount++;
Expand Down Expand Up @@ -263,12 +271,14 @@ describe('mail station', function() {
};
};

var tracer = new Tracer(null, false);

station.start(function(err) {
// invoke the lazy connect
var item;
for(var i=0, l=serverList.length; i<l; i++) {
item = serverList[i];
station.dispatch(item.id, msg, null, func(item.id));
station.dispatch(tracer, item.id, msg, null, func(item.id));
}

station.on('close', function(mailboxId) {
Expand Down Expand Up @@ -304,12 +314,14 @@ describe('mail station', function() {
errorEventCount++;
};

var tracer = new Tracer(null, false);

station.start(function(err) {
station.stop();
var item;
for(i=0, l=serverList.length; i<l; i++) {
item = serverList[i];
station.dispatch(item.id, msg, null, func);
station.dispatch(tracer, item.id, msg, null, func);
}
});
setTimeout(function() {
Expand All @@ -326,14 +338,15 @@ describe('mail station', function() {
var sid = 'connector-server-1';
var orgMsg = msg;
var orgOpts = {something: 'hello'};

var station = MailStation.create();
should.exist(station);

for(var i=0, l=serverList.length; i<l; i++) {
station.addServer(serverList[i]);
}

var tracer = new Tracer(null, false);

station.start(function(err) {
station.before(function(fsid, fmsg, fopts, next) {
preFilterCount.should.equal(0);
Expand Down Expand Up @@ -375,7 +388,7 @@ describe('mail station', function() {
next(fsid, fmsg, fopts);
});

station.dispatch(sid, orgMsg, orgOpts, function() {});
station.dispatch(tracer, sid, orgMsg, orgOpts, function() {});
});

setTimeout(function() {
Expand All @@ -386,4 +399,4 @@ describe('mail station', function() {
}, WAIT_TIME);
});
});
});
});
Loading

0 comments on commit e9c1287

Please sign in to comment.