diff --git a/lib/rpc-client/client.js b/lib/rpc-client/client.js index 2cccea2..bee2e4b 100644 --- a/lib/rpc-client/client.js +++ b/lib/rpc-client/client.js @@ -24,10 +24,14 @@ var STATE_CLOSED = 3; // client has closed * RPC Client Class */ var Client = function(opts) { - this.opts = opts || {}; + opts = opts || {}; this._context = opts.context; this._routeContext = opts.routeContext; this.router = opts.router || router; + if(this._context) { + opts.clientId = this._context.serverId; + } + this.opts = opts; this._station = createStation(opts); this.proxies = {}; @@ -167,7 +171,7 @@ pro.rpcInvoke = function(serverId, msg, cb) { cb(new Error('[pomelo-rpc] fail to do rpc invoke for client is not running')); return; } - var tracer = new Tracer(this.opts.rpcLogger, this.opts.rpcDebugLog, this._context.serverId, serverId, msg); + var tracer = new Tracer(this.opts.rpcLogger, this.opts.rpcDebugLog, this.opts.clientId, serverId, msg); this._station.dispatch(tracer, serverId, msg, null, cb); }; diff --git a/lib/rpc-client/mailboxes/blackhole.js b/lib/rpc-client/mailboxes/blackhole.js index 028706e..45c047b 100644 --- a/lib/rpc-client/mailboxes/blackhole.js +++ b/lib/rpc-client/mailboxes/blackhole.js @@ -4,7 +4,8 @@ var logger = require('pomelo-logger').getLogger('pomelo-rpc', __filename); var exp = module.exports = new EventEmitter(); -exp.connect = function(cb) { +exp.connect = function(tracer, cb) { + tracer.info('client', __filename, 'connect', 'connect to blackhole'); process.nextTick(function() { utils.invokeCallback(cb, new Error('fail to connect to remote server and switch to blackhole.')); }); @@ -13,9 +14,10 @@ exp.connect = function(cb) { exp.close = function(cb) { }; -exp.send = function(msg, opts, cb) { +exp.send = function(tracer, msg, opts, cb) { + tracer.info('client', __filename, 'send', 'send rpc msg to blackhole'); logger.info('message into blackhole: %j', msg); process.nextTick(function() { - utils.invokeCallback(cb, new Error('message was forward to blackhole.')); + utils.invokeCallback(cb, tracer, new Error('message was forward to blackhole.')); }); -}; \ No newline at end of file +}; diff --git a/lib/rpc-client/mailboxes/tcp-mailbox.js b/lib/rpc-client/mailboxes/tcp-mailbox.js index 930d446..0f39134 100644 --- a/lib/rpc-client/mailboxes/tcp-mailbox.js +++ b/lib/rpc-client/mailboxes/tcp-mailbox.js @@ -9,7 +9,7 @@ var DEFAULT_INTERVAL = 50; var MailBox = function(server, opts) { EventEmitter.call(this); - opts = opts || {}; + this.opts = opts || {}; this.id = server.id; this.host = server.host; this.port = server.port; @@ -104,12 +104,12 @@ pro.close = function() { pro.send = function(tracer, msg, opts, cb) { tracer.info('client', __filename, 'send', 'tcp-mailbox try to send'); if(!this.connected) { - utils.invokeCallback(cb, new Error('not init.')); + utils.invokeCallback(cb, tracer, new Error('not init.')); return; } if(this.closed) { - utils.invokeCallback(cb, new Error('mailbox alread closed.')); + utils.invokeCallback(cb, tracer, new Error('mailbox alread closed.')); return; } @@ -157,14 +157,14 @@ var processMsg = function(mailbox, pkg) { return; } - var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.context.serverId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId); + var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.clientId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId); var args = [tracer]; pkg.resp.forEach(function(arg){ args.push(arg); }); - cb.apply(null, pkg.resp); + cb.apply(null, args); }; /** diff --git a/lib/rpc-client/mailboxes/ws-mailbox.js b/lib/rpc-client/mailboxes/ws-mailbox.js index 3ddd95e..1d8f306 100644 --- a/lib/rpc-client/mailboxes/ws-mailbox.js +++ b/lib/rpc-client/mailboxes/ws-mailbox.js @@ -26,7 +26,7 @@ var pro = MailBox.prototype; pro.connect = function(tracer, cb) { tracer.info('client', __filename, 'connect', 'ws-mailbox try to connect'); if(this.connected) { - utils.invokeCallback(new Error('mailbox has already connected.')); + utils.invokeCallback(cb, new Error('mailbox has already connected.')); return; } @@ -100,12 +100,12 @@ pro.close = function() { pro.send = function(tracer, msg, opts, cb) { tracer.info('client', __filename, 'send', 'ws-mailbox try to send'); if(!this.connected) { - utils.invokeCallback(cb, new Error('not init.')); + utils.invokeCallback(cb, tracer, new Error('not init.')); return; } if(this.closed) { - utils.invokeCallback(cb, new Error('mailbox alread closed.')); + utils.invokeCallback(cb, tracer, new Error('mailbox alread closed.')); return; } @@ -152,7 +152,7 @@ var processMsg = function(mailbox, pkg) { return; } - var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.context.serverId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId); + var tracer = new Tracer(mailbox.opts.rpcLogger, mailbox.opts.rpcDebugLog, mailbox.opts.clientId, pkg.source, pkg.resp, pkg.traceId, pkg.seqId); var args = [tracer]; pkg.resp.forEach(function(arg){ diff --git a/lib/rpc-client/mailstation.js b/lib/rpc-client/mailstation.js index ecf9696..9201e64 100644 --- a/lib/rpc-client/mailstation.js +++ b/lib/rpc-client/mailstation.js @@ -16,7 +16,6 @@ var STATE_CLOSED = 3; // station has closed var DEFAULT_PENDING_SIZE = 1000; // default pending message limit -var app; /** * Mail station constructor. * @@ -25,8 +24,6 @@ var app; var MailStation = function(opts) { EventEmitter.call(this); this.opts = opts; - app = opts.context; - this.servers = {}; // remote server info map, key: server id, value: info this.mailboxFactory = opts.mailboxFactory || defaultMailboxFactory; @@ -373,4 +370,4 @@ var flushPending = function(tracer, station, serverId) { */ module.exports.create = function(opts) { return new MailStation(opts || {}); -}; \ No newline at end of file +}; diff --git a/lib/rpc-server/acceptors/tcp-acceptor.js b/lib/rpc-server/acceptors/tcp-acceptor.js index 0ac2f49..a9c2e45 100644 --- a/lib/rpc-server/acceptors/tcp-acceptor.js +++ b/lib/rpc-server/acceptors/tcp-acceptor.js @@ -164,7 +164,7 @@ var flush = function(acceptor) { * create acceptor * * @param opts init params - * @param cb(msg, cb) callback function that would be invoked when new message arrives + * @param cb(tracer, msg, cb) callback function that would be invoked when new message arrives */ module.exports.create = function(opts, cb) { return new Acceptor(opts || {}, cb); diff --git a/lib/rpc-server/acceptors/ws-acceptor.js b/lib/rpc-server/acceptors/ws-acceptor.js index b837322..adfdad6 100644 --- a/lib/rpc-server/acceptors/ws-acceptor.js +++ b/lib/rpc-server/acceptors/ws-acceptor.js @@ -152,7 +152,7 @@ var flush = function(acceptor) { * create acceptor * * @param opts init params - * @param cb(msg, cb) callback function that would be invoked when new message arrives + * @param cb(tracer, msg, cb) callback function that would be invoked when new message arrives */ module.exports.create = function(opts, cb) { return new Acceptor(opts || {}, cb); diff --git a/test/rpc-client/client.js b/test/rpc-client/client.js index a0ab0f6..543fa23 100644 --- a/test/rpc-client/client.js +++ b/test/rpc-client/client.js @@ -173,4 +173,4 @@ describe('client', function() { }); }); -}); \ No newline at end of file +}); diff --git a/test/rpc-client/mailstation.js b/test/rpc-client/mailstation.js index 500d50c..ad19c2a 100644 --- a/test/rpc-client/mailstation.js +++ b/test/rpc-client/mailstation.js @@ -2,6 +2,7 @@ var lib = process.env.POMELO_RPC_COV ? 'lib-cov' : 'lib'; var MailStation = require('../../' + lib + '/rpc-client/mailstation'); var should = require('should'); var Server = require('../../').server; +var Tracer = require('../../lib/util/tracer'); var WAIT_TIME = 100; @@ -128,13 +129,14 @@ describe('mail station', function() { callbackCount++; }; }; + var tracer = new Tracer(null, false); station.start(function(err) { var item; for(var i=0, l=serverList.length; i