Skip to content

Commit

Permalink
upgrade to 1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fantasyni committed Jan 19, 2017
2 parents 1ced570 + 693f2f4 commit 3538a97
Show file tree
Hide file tree
Showing 54 changed files with 6,102 additions and 539 deletions.
3 changes: 2 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
* Yongchang Zhou <[email protected]>
* Yongchang Zhou <[email protected]>
* fantasyni <[email protected]>
5 changes: 5 additions & 0 deletions histroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1.0.0 / 2017-01-19
=================
* [NEW] use pure javascript implemented mqtt protocol
* [NEW] improved performance
* [NEW] better benchmark samples
145 changes: 82 additions & 63 deletions lib/rpc-client/client.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
var Loader = require('pomelo-loader');
var Proxy = require('../util/proxy');
var logger = require('pomelo-logger').getLogger('pomelo-rpc', 'rpc-client');
var failureProcess = require('./failureProcess');
var constants = require('../util/constants');
var Station = require('./mailstation');
var Tracer = require('../util/tracer');
var Loader = require('pomelo-loader');
var utils = require('../util/utils');
var Proxy = require('../util/proxy');
var router = require('./router');
var constants = require('../util/constants');
var Tracer = require('../util/tracer');
var failureProcess = require('./failureProcess');
var logger = require('pomelo-logger').getLogger('pomelo-rpc', __filename);
var async = require('async');

/**
* Client states
*/
var STATE_INITED = 1; // client has inited
var STATE_STARTED = 2; // client has started
var STATE_CLOSED = 3; // client has closed
var STATE_INITED = 1; // client has inited
var STATE_STARTED = 2; // client has started
var STATE_CLOSED = 3; // client has closed

/**
* RPC Client Class
Expand All @@ -24,7 +25,8 @@ var Client = function(opts) {
this._routeContext = opts.routeContext;
this.router = opts.router || router.df;
this.routerType = opts.routerType;
if(this._context) {
this.rpcDebugLog = opts.rpcDebugLog;
if (this._context) {
opts.clientId = this._context.serverId;
}
this.opts = opts;
Expand All @@ -42,21 +44,20 @@ var pro = Client.prototype;
* @param cb {Function} cb(err)
*/
pro.start = function(cb) {
if(this.state > STATE_INITED) {
utils.invokeCallback(cb, new Error('rpc client has started.'));
if (this.state > STATE_INITED) {
cb(new Error('rpc client has started.'));
return;
}

var self = this;
this._station.start(function(err) {
if(err) {
if (err) {
logger.error('[pomelo-rpc] client start fail for ' + err.stack);
utils.invokeCallback(cb, err);
return;
return cb(err);
}
self._station.on('error', failureProcess.bind(self._station));
self.state = STATE_STARTED;
utils.invokeCallback(cb);
cb();
});
};

Expand All @@ -67,7 +68,7 @@ pro.start = function(cb) {
* @return {Void}
*/
pro.stop = function(force) {
if(this.state !== STATE_STARTED) {
if (this.state !== STATE_STARTED) {
logger.warn('[pomelo-rpc] client is not running now.');
return;
}
Expand All @@ -83,11 +84,11 @@ pro.stop = function(force) {
* {namespace, serverType, path}
*/
pro.addProxy = function(record) {
if(!record) {
if (!record) {
return;
}
var proxy = generateProxy(this, record, this._context);
if(!proxy) {
if (!proxy) {
return;
}
insertProxy(this.proxies, record.namespace, record.serverType, proxy);
Expand All @@ -99,10 +100,10 @@ pro.addProxy = function(record) {
* @param {Array} records list of proxy description record
*/
pro.addProxies = function(records) {
if(!records || !records.length) {
if (!records || !records.length) {
return;
}
for(var i=0, l=records.length; i<l; i++) {
for (var i = 0, l = records.length; i < l; i++) {
this.addProxy(records[i]);
}
};
Expand Down Expand Up @@ -161,10 +162,16 @@ pro.replaceServers = function(servers) {
* @param cb {Function} cb(err, ...)
*/
pro.rpcInvoke = function(serverId, msg, cb) {
var tracer = new Tracer(this.opts.rpcLogger, this.opts.rpcDebugLog, this.opts.clientId, serverId, msg);
tracer.info('client', __filename, 'rpcInvoke', 'the entrance of rpc invoke');
if(this.state !== STATE_STARTED) {
tracer.error('client', __filename, 'rpcInvoke', 'fail to do rpc invoke for client is not running');
var rpcDebugLog = this.rpcDebugLog;
var tracer = null;

if (rpcDebugLog) {
tracer = new Tracer(this.opts.rpcLogger, this.opts.rpcDebugLog, this.opts.clientId, serverId, msg);
tracer.info('client', __filename, 'rpcInvoke', 'the entrance of rpc invoke');
}

if (this.state !== STATE_STARTED) {
tracer && tracer.error('client', __filename, 'rpcInvoke', 'fail to do rpc invoke for client is not running');
logger.error('[pomelo-rpc] fail to do rpc invoke for client is not running');
cb(new Error('[pomelo-rpc] fail to do rpc invoke for client is not running'));
return;
Expand Down Expand Up @@ -237,14 +244,14 @@ var createStation = function(opts) {
* @api private
*/
var generateProxy = function(client, record, context) {
if(!record) {
if (!record) {
return;
}
var res, name;
var modules = Loader.load(record.path, context);
if(modules) {
if (modules) {
res = {};
for(name in modules) {
for (name in modules) {
res[name] = Proxy.create({
service: name,
origin: modules[name],
Expand All @@ -269,30 +276,35 @@ var generateProxy = function(client, record, context) {
* @api private
*/
var proxyCB = function(client, serviceName, methodName, args, attach, isToSpecifiedServer) {
if(client.state !== STATE_STARTED) {
if (client.state !== STATE_STARTED) {
logger.error('[pomelo-rpc] fail to invoke rpc proxy for client is not running');
return;
}
if(args.length < 2) {
if (args.length < 2) {
logger.error('[pomelo-rpc] invalid rpc invoke, arguments length less than 2, namespace: %j, serverType, %j, serviceName: %j, methodName: %j',
attach.namespace, attach.serverType, serviceName, methodName);
return;
}
var routeParam = args.shift();
var cb = args.pop();
var serverType = attach.serverType;
var msg = {namespace: attach.namespace, serverType: serverType,
service: serviceName, method: methodName, args: args};
var msg = {
namespace: attach.namespace,
serverType: serverType,
service: serviceName,
method: methodName,
args: args
};

if (isToSpecifiedServer) {
rpcToSpecifiedServer(client, msg, serverType, routeParam, cb);
} else {
getRouteTarget(client, serverType, msg, routeParam, function(err, serverId) {
if(!!err) {
utils.invokeCallback(cb, err);
} else {
client.rpcInvoke(serverId, msg, cb);
if (err) {
return cb(err);
}

client.rpcInvoke(serverId, msg, cb);
});
}
};
Expand All @@ -308,9 +320,9 @@ var proxyCB = function(client, serviceName, methodName, args, attach, isToSpecif
* @api private
*/
var getRouteTarget = function(client, serverType, msg, routeParam, cb) {
if(!!client.routerType) {
if (!!client.routerType) {
var method;
switch(client.routerType) {
switch (client.routerType) {
case constants.SCHEDULE.ROUNDROBIN:
method = router.rr;
break;
Expand All @@ -328,55 +340,59 @@ var getRouteTarget = function(client, serverType, msg, routeParam, cb) {
break;
}
method.call(null, client, serverType, msg, function(err, serverId) {
utils.invokeCallback(cb, err, serverId);
cb(err, serverId);
});
} else {
var route, target;
if(typeof client.router === 'function') {
} else {
var route, target;
if (typeof client.router === 'function') {
route = client.router;
target = null;
} else if(typeof client.router.route === 'function') {
} else if (typeof client.router.route === 'function') {
route = client.router.route;
target = client.router;
} else {
logger.error('[pomelo-rpc] invalid route function.');
return;
}
route.call(target, routeParam, msg, client._routeContext, function(err, serverId) {
utils.invokeCallback(cb, err, serverId);
cb(err, serverId);
});
}
};

/**
* Rpc to specified server id or servers.
*
* @param client {Object} current client instance.
* @param msg {Object} rpc message.
* @param client {Object} current client instance.
* @param msg {Object} rpc message.
* @param serverType {String} remote server type.
* @param routeParam {Object} mailbox init context parameter.
* @param serverId {Object} mailbox init context parameter.
*
* @api private
*/
var rpcToSpecifiedServer = function(client, msg, serverType, routeParam, cb) {
if(typeof routeParam !== 'string') {
logger.error('[pomelo-rpc] server id is not a string, server id: %j', routeParam);
var rpcToSpecifiedServer = function(client, msg, serverType, serverId, cb) {
if (typeof serverId !== 'string') {
logger.error('[pomelo-rpc] serverId is not a string : %s', serverId);
return;
}
if (routeParam === '*') {
var servers = client._station.servers;
for(var serverId in servers) {
var server = servers[serverId];
if(server.serverType === serverType) {
client.rpcInvoke(serverId, msg, cb);
}
if (serverId === '*') {
var servers = client._routeContext.getServersByType(serverType);
if (!servers) {
logger.error('[pomelo-rpc] serverType %s servers not exist', serverType);
return;
}
return;

async.each(servers, function(server, next) {
var serverId = server['id'];
client.rpcInvoke(serverId, msg, function(err) {
next(err);
});
}, cb);
} else {
client.rpcInvoke(routeParam, msg, cb);
return;
client.rpcInvoke(serverId, msg, cb);
}
};

/**
* Add proxy into array.
*
Expand All @@ -393,14 +409,15 @@ var insertProxy = function(proxies, namespace, serverType, proxy) {
for (var attr in proxy) {
proxies[namespace][serverType][attr] = proxy[attr];
}
} else {
proxies[namespace][serverType] = proxy;
}
else proxies[namespace][serverType] = proxy;
};

/**
* RPC client factory method.
*
* @param {Object} opts client init parameter.
* @param {Object} opts client init parameter.
* opts.context: mail box init parameter,
* opts.router: (optional) rpc message route function, route(routeParam, msg, cb),
* opts.mailBoxFactory: (optional) mail box factory instance.
Expand All @@ -410,4 +427,6 @@ module.exports.create = function(opts) {
return new Client(opts);
};

module.exports.WSMailbox = require('./mailboxes/ws-mailbox');
// module.exports.WSMailbox = require('./mailboxes/ws-mailbox'); // socket.io
// module.exports.WS2Mailbox = require('./mailboxes/ws2-mailbox'); // ws
module.exports.MQTTMailbox = require('./mailboxes/mqtt-mailbox'); // mqtt
Loading

0 comments on commit 3538a97

Please sign in to comment.