Skip to content

Commit

Permalink
the same connection class can now be used for publishing mapreduce, r…
Browse files Browse the repository at this point in the history
…ace and regular jobs
  • Loading branch information
EmielBruijntjes committed Nov 6, 2015
1 parent 753bd0c commit f3cf411
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 12 deletions.
6 changes: 4 additions & 2 deletions connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ class Connection :
std::string password = (param.contains("password") ? param["password"] : "guest");
std::string vhost = (param.contains("vhost") ? param["vhost"] : "/");
std::string exchange = (param.contains("exchange") ? param["exchange"] : "");
std::string routingkey = (param.contains("routingkey") ? param["routingkey"] : "mapreduce");
std::string mapreduce = (param.contains("mapreduce") ? param["mapreduce"] : "mapreduce");
std::string races = (param.contains("races") ? param["races"] : "races");
std::string jobs = (param.contains("jobs") ? param["jobs"] : "jobs");

// creating a connection could throw
try
{
// create the actual connection
_core= std::make_shared<Core>(host, user, password, vhost, exchange, routingkey);
_core= std::make_shared<Core>(host, user, password, vhost, exchange, mapreduce, races, jobs);
}
catch (const std::runtime_error &error)
{
Expand Down
50 changes: 44 additions & 6 deletions core.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ class Core : private AMQP::TcpHandler
* @param password
* @param vhost
* @param exchange
* @param routingkey
* @param mapreduce
* @param races
* @param jobs
*
* @throws std::runtime_error
*/
Core(const std::string &host, const std::string &user, const std::string &password, const std::string &vhost, std::string &exchange, std::string &routingkey) :
Core(const std::string &host, const std::string &user, const std::string &password, const std::string &vhost, const std::string &exchange, const std::string &mapreduce, const std::string &races, const std::string &jobs) :
_connection(new AMQP::TcpConnection(this, AMQP::Address(host, 5672, ::AMQP::Login(user, password), vhost)))
{
// store all properties in the JSON
Expand All @@ -154,7 +156,9 @@ class Core : private AMQP::TcpHandler
_json.set("password", password);
_json.set("vhost", vhost);
_json.set("exchange", exchange);
_json.set("routingkey", routingkey);
_json.set("mapreduce", mapreduce);
_json.set("races", races);
_json.set("jobs", jobs);

// go run the event loop until the connection is connected
_loop.run(_connection.get());
Expand Down Expand Up @@ -189,12 +193,46 @@ class Core : private AMQP::TcpHandler
while (_connection) _loop.run(_connection.get());
};

/**
* Method to publish a JSON encoded message to the mapreduce queue
* @param json The JSON data to be published
* @return bool
*/
bool mapreduce(const JSON::Object &json)
{
// publish to the mapreduce queue
return publish(_json.c_str("mapreduce"), json);
}

/**
* Method to publish a JSON encoded message to the race queue
* @param json The JSON data to be published
* @return bool
*/
bool race(const JSON::Object &json)
{
// publish to the race queue
return publish(_json.c_str("races"), json);
}

/**
* Method to publish a JSON encoded message to the jobs queue
* @param json The JSON data to be published
* @return bool
*/
bool job(const JSON::Object &json)
{
// publish to the mapreduce queue
return publish(_json.c_str("jobs"), json);
}

/**
* Method to publish a JSON encoded message to the queue
* @param json
* @param queue The name of the queue to publish to
* @param json The JSON data to be published
* @return bool
*/
bool publish(const JSON::Object &json)
bool publish(const char *queue, const JSON::Object &json)
{
// create the connection to the RabbitMQ server
if (!connect()) return false;
Expand All @@ -203,7 +241,7 @@ class Core : private AMQP::TcpHandler
AMQP::TcpChannel channel(_connection.get());

// publish the json
channel.publish(_json.c_str("exchange"), _json.c_str("routingkey"), json.toString());
channel.publish(_json.c_str("exchange"), queue, json.toString());

// do a single step, to hopefully send the message..
//_loop.step(_connection.get());
Expand Down
34 changes: 33 additions & 1 deletion data.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class Data : public JSON::Object
*/
JSON::Array _input;

/**
* What sort of algorithm are we going to run?
*/
enum {
algorithm_mapreduce,
algorithm_race,
algorithm_job
} _algorithm = algorithm_job;

/**
* Helper class that creates an array with the to-be-included files
*/
Expand Down Expand Up @@ -142,11 +151,13 @@ class Data : public JSON::Object
// set default limits
set("processes", 20);
set("input", _input);

set("modulo", 1);
set("mapper", Executable("mapper", includes, serialized));
set("reducer", Executable("reducer", includes, serialized));
set("finalizer", Executable("finalizer", includes, serialized));

// remember algorithm type
_algorithm = algorithm_mapreduce;
}
// in case we are a race we just set an executable manually etc.
else if (algo.instanceOf("Yothalot\\Race"))
Expand All @@ -155,6 +166,9 @@ class Data : public JSON::Object
set("executable", "php");
set("arguments", JSON::Array({"-r", "exit(YothalotInit('run'));"}));
set("stdin", Stdin(algo));

// remember algorithm type
_algorithm = algorithm_race;
}
}

Expand All @@ -171,6 +185,20 @@ class Data : public JSON::Object
*/
virtual ~Data() {}

/**
* Publish the data to a connection
* @param connection
*/
bool publish(Core *connection) const
{
switch (_algorithm) {
case algorithm_mapreduce: return connection->mapreduce(*this);
case algorithm_race: return connection->race(*this);
case algorithm_job: return connection->job(*this);
default: return false;
}
}

/**
* The directory that is set in the data
* @return std::string
Expand Down Expand Up @@ -220,6 +248,10 @@ class Data : public JSON::Object
object("reducer").object("limit").set("processes", value);
}

/**
* Update max number of finalizers
* @param value
*/
void maxfinalizers(int value)
{
// update JSON
Expand Down
6 changes: 3 additions & 3 deletions jobimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ class JobImpl
_json.tempqueue(_tempqueue->name());

// now we can publish the job JSON data to the RabbitMQ server
if (_core->publish(_json)) return _started = true;
if (_json.publish(_core.get())) return _started = true;

// the weird situation is that we can not connect to RabbitMQ...
// (really weird because we did manage to create the temp queue...)
Expand Down Expand Up @@ -482,7 +482,7 @@ class JobImpl
if (_started) return true;

// if the job was not yet started, we should do that now
if (!_core->publish(_json)) return false;
if (!_json.publish(_core.get())) return false;

// mark job as started
return _started = true;
Expand All @@ -505,4 +505,4 @@ class JobImpl
{
return _core;
}
};
};

0 comments on commit f3cf411

Please sign in to comment.