From ecd0d62b601ccfa5689c23b55f7dc6ade3c690cf Mon Sep 17 00:00:00 2001 From: Piero Toffanin Date: Mon, 27 May 2019 11:37:49 -0400 Subject: [PATCH] Task CLI commands --- admincli.js | 87 ++++++++++++++++++++++++++++++++++++++++++-- libs/classes/Node.js | 51 ++++++++++++++++++++++++-- libs/routetable.js | 18 ++++++++- 3 files changed, 146 insertions(+), 10 deletions(-) diff --git a/admincli.js b/admincli.js index 077a193..f52d403 100644 --- a/admincli.js +++ b/admincli.js @@ -20,6 +20,7 @@ const net = require('net'); const package_info = require('./package_info'); const nodes = require('./libs/nodes'); const routetable = require('./libs/routetable'); +const async = require('async'); module.exports = { create: function(options){ @@ -52,6 +53,12 @@ module.exports = { const printNode = (socket, i, node) => { socket.write(`${(i + 1)}) ${node.toString()} ${node.isOnline() ? '[online]' : '[offline]'} [${node.getTaskQueueCount()}/${node.getMaxParallelTasks()}] ${node.isLocked() ? " [L]" : ""}\r\n`); }; + const prettyJson = (json) => { + return JSON.stringify(json, null, 2); + }; + const jsonResponse = (json) => { + return socket.write(prettyJson(json) + '\r\n'); + }; // Identify this client @@ -70,8 +77,13 @@ module.exports = { printCaret(); // Handle incoming messages from clients. + let lastInput = ""; socket.on('data', async function (data) { - const parts = data.toString().split(" ").map(p => p.trim()); + let input = data.toString(); + if (input.trim() === "!!") input = lastInput; + else lastInput = input; + + const parts = input.split(" ").map(p => p.trim()); const command = parts[0].toLocaleUpperCase(); let args = parts.slice(1, parts.length); @@ -91,7 +103,14 @@ module.exports = { socket.write("NODE UNLOCK - Resume forwarding tasks to this node\r\n"); socket.write("NODE UPDATE - Update all nodes info\r\n"); socket.write("NODE BEST - Show best node for the number of images\r\n"); - socket.write("ROUTE INFO - Find route information\r\n"); + socket.write("ROUTE INFO - Find route information for task\r\n"); + socket.write("ROUTE LIST [node number] - List routes\r\n"); + socket.write("TASK LIST [node number] - List tasks\r\n"); + socket.write("TASK INFO - View task info\r\n"); + socket.write("TASK OUTPUT [lines] - View task output\r\n"); + socket.write("TASK CANCEL - Cancel task\r\n"); + socket.write("TASK REMOVE - Remove task\r\n"); + socket.write("!! - Repeat last command\r\n"); }else if (command === "NODE" && args.length > 0){ const subcommand = args[0].toLocaleUpperCase(); args = args.slice(1, args.length); @@ -126,7 +145,7 @@ module.exports = { const [ number ] = args; const node = nodes.nth(number); if (node){ - socket.write(JSON.stringify(node.getInfo()) + "\r\n"); + jsonResponse(node.getInfo()); }else invalid(); }else if (subcommand === "BEST" && args.length >= 1){ const [ numImages ] = args; @@ -147,8 +166,68 @@ module.exports = { const [ taskId ] = args; const route = await routetable.lookup(taskId); if (route){ - socket.write(JSON.stringify(route) + "\r\n"); + jsonResponse(route); }else invalid(); + }else if (subcommand === "LIST"){ + const [ number ] = args; + let node = null; + if (number !== undefined) node = nodes.nth(number); + if (number !== undefined && !node) invalid(); + else{ + jsonResponse(await routetable.get(node)); + } + } + }else if (command === "TASK" && args.length > 0){ + const subcommand = args[0].toLocaleUpperCase(); + args = args.slice(1, args.length); + if (subcommand === "LIST"){ + const [ number ] = args; + let node = null; + if (number !== undefined) node = nodes.nth(number); + if (number !== undefined && !node) invalid(); + else{ + const routes = await routetable.get(node); + const tasks = []; + + await new Promise((resolve) => { + async.each(Object.keys(routes), async (taskId, cb) => { + const taskInfo = await (routes[taskId]).node.taskInfo(taskId); + if (taskInfo !== null && taskInfo.error === undefined) tasks.push(taskInfo); + cb(); + }, resolve); + }); + + jsonResponse(tasks); + } + }else if (subcommand === "INFO" && args.length >= 1){ + const [ taskId ] = args; + const route = await routetable.lookup(taskId); + if (!route) invalid(); + else{ + const taskInfo = await route.node.taskInfo(taskId); + jsonResponse(taskInfo); + } + }else if (subcommand === "OUTPUT" && args.length >= 1){ + const [ taskId, line ] = args; + const route = await routetable.lookup(taskId); + if (!route) invalid(); + else{ + const taskOutput = await route.node.taskOutput(taskId, line); + if (Array.isArray(taskOutput)){ + socket.write(taskOutput.join("\r\n") + '\r\n'); + }else{ + jsonResponse(taskOutput); + } + } + }else if (["CANCEL", "REMOVE"].indexOf(subcommand) !== -1 && args.length >= 1){ + const [ taskId ] = args; + const route = await routetable.lookup(taskId); + const func = subcommand === 'CANCEL' ? + 'taskCancel' : + 'taskRemove'; + + if (!route) invalid(); + jsonResponse(await (route.node[func])(taskId)); } }else{ invalid(); diff --git a/libs/classes/Node.js b/libs/classes/Node.js index 7e53e78..95b318e 100644 --- a/libs/classes/Node.js +++ b/libs/classes/Node.js @@ -47,13 +47,56 @@ module.exports = class Node{ } } - urlFor(pathname){ + async taskInfo(taskId){ + return this.getRequest(`/task/${taskId}/info`); + } + + async taskOutput(taskId, line = 0){ + return this.getRequest(`/task/${taskId}/output`, {line}); + } + + async taskCancel(taskId){ + return this.postRequest(`/task/cancel`, {uuid: taskId}); + } + + async taskRemove(taskId){ + return this.postRequest(`/task/remove`, {uuid: taskId}); + } + + async postRequest(url, formData = {}, query = {}){ + try{ + let response = await axios.post(this.urlFor(url, query), formData, { + timeout: this.timeout, + }); + if (response.status === 200){ + return response.data; + }else{ + throw new Error(`Got response code: ${response.status}`); + } + }catch(e){ + return {error: e.message}; + } + } + + async getRequest(url, query = {}){ + try{ + let response = await axios.get(this.urlFor(url, query), { timeout: this.timeout }); + if (response.status === 200){ + return response.data; + }else{ + throw new Error(`Got response code: ${response.status}`); + } + }catch(e){ + return {error: e.message}; + } + } + + urlFor(pathname, query = {}){ const { hostname, port, token } = this.nodeData; - const query = {}; + const proto = port === 443 ? 'https' : 'http'; if (token) query.token = token; - // TODO: add SSL support - return url.format({protocol: 'http', hostname, port, pathname, query}); + return url.format({protocol: proto, hostname, port, pathname, query}); } hostname(){ diff --git a/libs/routetable.js b/libs/routetable.js index 49d69f8..4d5ee51 100644 --- a/libs/routetable.js +++ b/libs/routetable.js @@ -30,8 +30,6 @@ module.exports = { initialize: async function(){ routes = await this.loadFromDisk(); - logger.info(`Loaded ${Object.keys(routes).length} routes`); - const cleanup = () => { const expires = 1000 * 60 * 60 * 24 * 5; // 5 days @@ -44,7 +42,10 @@ module.exports = { this.saveToDisk(); }; + cleanup(); setInterval(cleanup, 1000 * 60 * 60); + + logger.info(`Loaded ${Object.keys(routes).length} routes`); }, add: async function(taskId, node, token){ @@ -70,6 +71,19 @@ module.exports = { return null; }, + get: async function(node = null){ + if (!node) return routes; + else{ + const result = {}; + for (let taskId in routes){ + if (routes[taskId].node === node){ + result[taskId] = routes[taskId]; + } + } + return result; + } + }, + lookupNode: async function(taskId){ const entry = await this.lookup(taskId); if (entry) return entry.node;