Skip to content

Commit

Permalink
Task CLI commands
Browse files Browse the repository at this point in the history
  • Loading branch information
pierotofy committed May 27, 2019
1 parent f8e638e commit ecd0d62
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 10 deletions.
87 changes: 83 additions & 4 deletions admincli.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const net = require('net');
const package_info = require('./package_info');
const nodes = require('./libs/nodes');
const routetable = require('./libs/routetable');
const async = require('async');

module.exports = {
create: function(options){
Expand Down Expand Up @@ -52,6 +53,12 @@ module.exports = {
const printNode = (socket, i, node) => {
socket.write(`${(i + 1)}) ${node.toString()} ${node.isOnline() ? '[online]' : '[offline]'} [${node.getTaskQueueCount()}/${node.getMaxParallelTasks()}] <version ${node.getVersion()}>${node.isLocked() ? " [L]" : ""}\r\n`);
};
const prettyJson = (json) => {
return JSON.stringify(json, null, 2);
};
const jsonResponse = (json) => {
return socket.write(prettyJson(json) + '\r\n');
};


// Identify this client
Expand All @@ -70,8 +77,13 @@ module.exports = {
printCaret();

// Handle incoming messages from clients.
let lastInput = "";
socket.on('data', async function (data) {
const parts = data.toString().split(" ").map(p => p.trim());
let input = data.toString();
if (input.trim() === "!!") input = lastInput;
else lastInput = input;

const parts = input.split(" ").map(p => p.trim());
const command = parts[0].toLocaleUpperCase();
let args = parts.slice(1, parts.length);

Expand All @@ -91,7 +103,14 @@ module.exports = {
socket.write("NODE UNLOCK <node number> - Resume forwarding tasks to this node\r\n");
socket.write("NODE UPDATE - Update all nodes info\r\n");
socket.write("NODE BEST <number of images> - Show best node for the number of images\r\n");
socket.write("ROUTE INFO <taskId> - Find route information\r\n");
socket.write("ROUTE INFO <taskId> - Find route information for task\r\n");
socket.write("ROUTE LIST [node number] - List routes\r\n");
socket.write("TASK LIST [node number] - List tasks\r\n");
socket.write("TASK INFO <taskId> - View task info\r\n");
socket.write("TASK OUTPUT <taskId> [lines] - View task output\r\n");
socket.write("TASK CANCEL <taskId> - Cancel task\r\n");
socket.write("TASK REMOVE <taskId> - Remove task\r\n");
socket.write("!! - Repeat last command\r\n");
}else if (command === "NODE" && args.length > 0){
const subcommand = args[0].toLocaleUpperCase();
args = args.slice(1, args.length);
Expand Down Expand Up @@ -126,7 +145,7 @@ module.exports = {
const [ number ] = args;
const node = nodes.nth(number);
if (node){
socket.write(JSON.stringify(node.getInfo()) + "\r\n");
jsonResponse(node.getInfo());
}else invalid();
}else if (subcommand === "BEST" && args.length >= 1){
const [ numImages ] = args;
Expand All @@ -147,8 +166,68 @@ module.exports = {
const [ taskId ] = args;
const route = await routetable.lookup(taskId);
if (route){
socket.write(JSON.stringify(route) + "\r\n");
jsonResponse(route);
}else invalid();
}else if (subcommand === "LIST"){
const [ number ] = args;
let node = null;
if (number !== undefined) node = nodes.nth(number);
if (number !== undefined && !node) invalid();
else{
jsonResponse(await routetable.get(node));
}
}
}else if (command === "TASK" && args.length > 0){
const subcommand = args[0].toLocaleUpperCase();
args = args.slice(1, args.length);
if (subcommand === "LIST"){
const [ number ] = args;
let node = null;
if (number !== undefined) node = nodes.nth(number);
if (number !== undefined && !node) invalid();
else{
const routes = await routetable.get(node);
const tasks = [];

await new Promise((resolve) => {
async.each(Object.keys(routes), async (taskId, cb) => {
const taskInfo = await (routes[taskId]).node.taskInfo(taskId);
if (taskInfo !== null && taskInfo.error === undefined) tasks.push(taskInfo);
cb();
}, resolve);
});

jsonResponse(tasks);
}
}else if (subcommand === "INFO" && args.length >= 1){
const [ taskId ] = args;
const route = await routetable.lookup(taskId);
if (!route) invalid();
else{
const taskInfo = await route.node.taskInfo(taskId);
jsonResponse(taskInfo);
}
}else if (subcommand === "OUTPUT" && args.length >= 1){
const [ taskId, line ] = args;
const route = await routetable.lookup(taskId);
if (!route) invalid();
else{
const taskOutput = await route.node.taskOutput(taskId, line);
if (Array.isArray(taskOutput)){
socket.write(taskOutput.join("\r\n") + '\r\n');
}else{
jsonResponse(taskOutput);
}
}
}else if (["CANCEL", "REMOVE"].indexOf(subcommand) !== -1 && args.length >= 1){
const [ taskId ] = args;
const route = await routetable.lookup(taskId);
const func = subcommand === 'CANCEL' ?
'taskCancel' :
'taskRemove';

if (!route) invalid();
jsonResponse(await (route.node[func])(taskId));
}
}else{
invalid();
Expand Down
51 changes: 47 additions & 4 deletions libs/classes/Node.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,56 @@ module.exports = class Node{
}
}

urlFor(pathname){
async taskInfo(taskId){
return this.getRequest(`/task/${taskId}/info`);
}

async taskOutput(taskId, line = 0){
return this.getRequest(`/task/${taskId}/output`, {line});
}

async taskCancel(taskId){
return this.postRequest(`/task/cancel`, {uuid: taskId});
}

async taskRemove(taskId){
return this.postRequest(`/task/remove`, {uuid: taskId});
}

async postRequest(url, formData = {}, query = {}){
try{
let response = await axios.post(this.urlFor(url, query), formData, {
timeout: this.timeout,
});
if (response.status === 200){
return response.data;
}else{
throw new Error(`Got response code: ${response.status}`);
}
}catch(e){
return {error: e.message};
}
}

async getRequest(url, query = {}){
try{
let response = await axios.get(this.urlFor(url, query), { timeout: this.timeout });
if (response.status === 200){
return response.data;
}else{
throw new Error(`Got response code: ${response.status}`);
}
}catch(e){
return {error: e.message};
}
}

urlFor(pathname, query = {}){
const { hostname, port, token } = this.nodeData;
const query = {};
const proto = port === 443 ? 'https' : 'http';
if (token) query.token = token;

// TODO: add SSL support
return url.format({protocol: 'http', hostname, port, pathname, query});
return url.format({protocol: proto, hostname, port, pathname, query});
}

hostname(){
Expand Down
18 changes: 16 additions & 2 deletions libs/routetable.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ module.exports = {
initialize: async function(){
routes = await this.loadFromDisk();

logger.info(`Loaded ${Object.keys(routes).length} routes`);

const cleanup = () => {
const expires = 1000 * 60 * 60 * 24 * 5; // 5 days

Expand All @@ -44,7 +42,10 @@ module.exports = {
this.saveToDisk();
};

cleanup();
setInterval(cleanup, 1000 * 60 * 60);

logger.info(`Loaded ${Object.keys(routes).length} routes`);
},

add: async function(taskId, node, token){
Expand All @@ -70,6 +71,19 @@ module.exports = {
return null;
},

get: async function(node = null){
if (!node) return routes;
else{
const result = {};
for (let taskId in routes){
if (routes[taskId].node === node){
result[taskId] = routes[taskId];
}
}
return result;
}
},

lookupNode: async function(taskId){
const entry = await this.lookup(taskId);
if (entry) return entry.node;
Expand Down

0 comments on commit ecd0d62

Please sign in to comment.