From f3cf4115d766a93c0abeba50386266e3001ab3ac Mon Sep 17 00:00:00 2001 From: Emiel Bruijntjes Date: Fri, 6 Nov 2015 11:20:13 +0100 Subject: [PATCH] the same connection class can now be used for publishing mapreduce, race and regular jobs --- connection.h | 6 ++++-- core.h | 50 ++++++++++++++++++++++++++++++++++++++++++++------ data.h | 34 +++++++++++++++++++++++++++++++++- jobimpl.h | 6 +++--- 4 files changed, 84 insertions(+), 12 deletions(-) diff --git a/connection.h b/connection.h index 4019ba9..c2d6ad2 100644 --- a/connection.h +++ b/connection.h @@ -58,13 +58,15 @@ class Connection : std::string password = (param.contains("password") ? param["password"] : "guest"); std::string vhost = (param.contains("vhost") ? param["vhost"] : "/"); std::string exchange = (param.contains("exchange") ? param["exchange"] : ""); - std::string routingkey = (param.contains("routingkey") ? param["routingkey"] : "mapreduce"); + std::string mapreduce = (param.contains("mapreduce") ? param["mapreduce"] : "mapreduce"); + std::string races = (param.contains("races") ? param["races"] : "races"); + std::string jobs = (param.contains("jobs") ? param["jobs"] : "jobs"); // creating a connection could throw try { // create the actual connection - _core= std::make_shared(host, user, password, vhost, exchange, routingkey); + _core= std::make_shared(host, user, password, vhost, exchange, mapreduce, races, jobs); } catch (const std::runtime_error &error) { diff --git a/core.h b/core.h index e1475b9..4143cdb 100644 --- a/core.h +++ b/core.h @@ -141,11 +141,13 @@ class Core : private AMQP::TcpHandler * @param password * @param vhost * @param exchange - * @param routingkey + * @param mapreduce + * @param races + * @param jobs * * @throws std::runtime_error */ - Core(const std::string &host, const std::string &user, const std::string &password, const std::string &vhost, std::string &exchange, std::string &routingkey) : + Core(const std::string &host, const std::string &user, const std::string &password, const std::string &vhost, const std::string &exchange, const std::string &mapreduce, const std::string &races, const std::string &jobs) : _connection(new AMQP::TcpConnection(this, AMQP::Address(host, 5672, ::AMQP::Login(user, password), vhost))) { // store all properties in the JSON @@ -154,7 +156,9 @@ class Core : private AMQP::TcpHandler _json.set("password", password); _json.set("vhost", vhost); _json.set("exchange", exchange); - _json.set("routingkey", routingkey); + _json.set("mapreduce", mapreduce); + _json.set("races", races); + _json.set("jobs", jobs); // go run the event loop until the connection is connected _loop.run(_connection.get()); @@ -189,12 +193,46 @@ class Core : private AMQP::TcpHandler while (_connection) _loop.run(_connection.get()); }; + /** + * Method to publish a JSON encoded message to the mapreduce queue + * @param json The JSON data to be published + * @return bool + */ + bool mapreduce(const JSON::Object &json) + { + // publish to the mapreduce queue + return publish(_json.c_str("mapreduce"), json); + } + + /** + * Method to publish a JSON encoded message to the race queue + * @param json The JSON data to be published + * @return bool + */ + bool race(const JSON::Object &json) + { + // publish to the race queue + return publish(_json.c_str("races"), json); + } + + /** + * Method to publish a JSON encoded message to the jobs queue + * @param json The JSON data to be published + * @return bool + */ + bool job(const JSON::Object &json) + { + // publish to the mapreduce queue + return publish(_json.c_str("jobs"), json); + } + /** * Method to publish a JSON encoded message to the queue - * @param json + * @param queue The name of the queue to publish to + * @param json The JSON data to be published * @return bool */ - bool publish(const JSON::Object &json) + bool publish(const char *queue, const JSON::Object &json) { // create the connection to the RabbitMQ server if (!connect()) return false; @@ -203,7 +241,7 @@ class Core : private AMQP::TcpHandler AMQP::TcpChannel channel(_connection.get()); // publish the json - channel.publish(_json.c_str("exchange"), _json.c_str("routingkey"), json.toString()); + channel.publish(_json.c_str("exchange"), queue, json.toString()); // do a single step, to hopefully send the message.. //_loop.step(_connection.get()); diff --git a/data.h b/data.h index 60650f3..7f3413a 100644 --- a/data.h +++ b/data.h @@ -32,6 +32,15 @@ class Data : public JSON::Object */ JSON::Array _input; + /** + * What sort of algorithm are we going to run? + */ + enum { + algorithm_mapreduce, + algorithm_race, + algorithm_job + } _algorithm = algorithm_job; + /** * Helper class that creates an array with the to-be-included files */ @@ -142,11 +151,13 @@ class Data : public JSON::Object // set default limits set("processes", 20); set("input", _input); - set("modulo", 1); set("mapper", Executable("mapper", includes, serialized)); set("reducer", Executable("reducer", includes, serialized)); set("finalizer", Executable("finalizer", includes, serialized)); + + // remember algorithm type + _algorithm = algorithm_mapreduce; } // in case we are a race we just set an executable manually etc. else if (algo.instanceOf("Yothalot\\Race")) @@ -155,6 +166,9 @@ class Data : public JSON::Object set("executable", "php"); set("arguments", JSON::Array({"-r", "exit(YothalotInit('run'));"})); set("stdin", Stdin(algo)); + + // remember algorithm type + _algorithm = algorithm_race; } } @@ -171,6 +185,20 @@ class Data : public JSON::Object */ virtual ~Data() {} + /** + * Publish the data to a connection + * @param connection + */ + bool publish(Core *connection) const + { + switch (_algorithm) { + case algorithm_mapreduce: return connection->mapreduce(*this); + case algorithm_race: return connection->race(*this); + case algorithm_job: return connection->job(*this); + default: return false; + } + } + /** * The directory that is set in the data * @return std::string @@ -220,6 +248,10 @@ class Data : public JSON::Object object("reducer").object("limit").set("processes", value); } + /** + * Update max number of finalizers + * @param value + */ void maxfinalizers(int value) { // update JSON diff --git a/jobimpl.h b/jobimpl.h index 3b83677..64dd3a6 100644 --- a/jobimpl.h +++ b/jobimpl.h @@ -414,7 +414,7 @@ class JobImpl _json.tempqueue(_tempqueue->name()); // now we can publish the job JSON data to the RabbitMQ server - if (_core->publish(_json)) return _started = true; + if (_json.publish(_core.get())) return _started = true; // the weird situation is that we can not connect to RabbitMQ... // (really weird because we did manage to create the temp queue...) @@ -482,7 +482,7 @@ class JobImpl if (_started) return true; // if the job was not yet started, we should do that now - if (!_core->publish(_json)) return false; + if (!_json.publish(_core.get())) return false; // mark job as started return _started = true; @@ -505,4 +505,4 @@ class JobImpl { return _core; } -}; \ No newline at end of file +};