Merge pull request #248 from plantinformatics/develop
Master Release v2.8.0
Don-Isdale authored Jun 24, 2021
2 parents daae631 + fce5df1 commit ff685ee
Showing 52 changed files with 2,673 additions and 486 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -335,7 +335,7 @@ URL=$URL_A uploadData aliases.json

## Database Indexes

Refer to 'Database configuration' in doc/notes/docker_setup.md for indexes added to the Feature and Alias database collections which are required for reasonable performance as datasets grow beyond 1k documents.
Refer to 'Database configuration' in doc/notes/database_configuration.md for indexes added to the Feature and Alias database collections which are required for reasonable performance as datasets grow beyond 1k documents.


# Public genetic map references
11 changes: 9 additions & 2 deletions backend/common/models/block.js
@@ -18,7 +18,13 @@ const { getAliases } = require('../utilities/localise-aliases');

var ObjectId = require('mongodb').ObjectID

var cache = require('memory-cache');

/** results-cache has the same API as 'memory-cache', so the latter can be
 * used instead, to avoid the need to set up a cache directory and to
 * manage cache accumulation.
 */
const cacheLibraryName = '../utilities/results-cache'; // 'memory-cache';
var cache = require(cacheLibraryName);

var SSE = require('express-sse');

@@ -608,9 +614,10 @@ function blockAddFeatures(db, datasetId, blockId, features, cb) {
/** Logically part of reqStream(), but split out so that it can be called
* directly or via a promise. */
function pipeStream(sse, intervals, useCache, cacheId, filterFunction, res, cursor) {
console.log('pipeStream', sse, intervals, useCache, cacheId);
if (useCache)
cursor.
pipe(new pathsStream.CacheWritable(cacheId));
pipe(new pathsStream.CacheWritable(/*cache,*/ cacheId));

let pipeLine = [cursor];

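For reference, a minimal sketch of what a cache-backed Writable such as the pathsStream.CacheWritable used above might look like, assuming the memory-cache-style put() API described below; the class shape is an assumption, only the constructor argument (cacheId) is taken from the call site:

const { Writable } = require('stream');
const cache = require('../utilities/results-cache'); // or 'memory-cache'

/** Collect streamed documents and store them in the cache under cacheId. */
class CacheWritable extends Writable {
  constructor(cacheId) {
    super({ objectMode: true });
    this.cacheId = cacheId;
    this.docs = [];
  }
  _write(doc, encoding, callback) {
    this.docs.push(doc);  // accumulate each document as it streams through
    callback();
  }
  _final(callback) {
    // store the complete result once the stream ends
    cache.put(this.cacheId, this.docs);
    callback();
  }
}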
18 changes: 15 additions & 3 deletions backend/common/models/dataset.js
@@ -182,6 +182,10 @@ module.exports = function(Dataset) {
}
};

/**
 * @param data dataset, whose .features have attributes :
 * feature.name, .block (blockId), .val (start position), .end (end position, optional).
 */
Dataset.tableUpload = function(data, options, cb) {
var models = this.app.models;
var blocks = {};
@@ -232,12 +236,20 @@ });
});
var array_features = [];
data.features.forEach(function(feature) {
array_features.push({
let value = [feature.val];
if (feature.end !== undefined) {
value.push(feature.end);
}
let f = {
name: feature.name,
value: [feature.val],
value,
value_0: feature.val,
blockId: blocks_by_name[feature.block]
});
};
if (feature.values) {
f.values = feature.values;
}
array_features.push(f);
});
// create new features
return models.Feature.create(array_features);
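To illustrate the input shape tableUpload() maps from, a hypothetical element of data.features matching the JSDoc and the loop above (all field values invented):

// one element of data.features; .end and .values are optional
{
  name: 'marker1',
  block: '1A',                    // block name, looked up in blocks_by_name
  val: 123456,                    // start position : becomes value[0] and value_0
  end: 123789,                    // end position, appended to value when defined
  values: { ref: 'A', alt: 'G' }  // copied onto the created feature when present
}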
50 changes: 42 additions & 8 deletions backend/common/models/feature.js
@@ -6,7 +6,25 @@
var acl = require('../utilities/acl')
const { childProcess } = require('../utilities/child-process');
var upload = require('../utilities/upload');
var { filterBlastResults } = require('../utilities/sequence-search');

/*----------------------------------------------------------------------------*/

/** ids of sessions which have sent a dnaSequenceSearch request */
var sessionIds=[];

/** Map session ID (accessToken.id) to a small integer index.
*/
function sessionIndex(sessionId) {
let index = sessionIds.indexOf(sessionId);
if (index === -1) {
sessionIds.push(sessionId);
index = sessionIds.length - 1;
}
return index;
}

/*----------------------------------------------------------------------------*/

module.exports = function(Feature) {
Feature.search = function(filter, options, cb) {
@@ -58,36 +76,52 @@ module.exports = function(Feature) {
* @param resultRows
* @param addDataset
* @param datasetName
* @param minLengthOfHit, minPercentIdentity, minPercentCoverage : minimum values to filter results
* @param options
*
* @param cb node response callback
*/
Feature.dnaSequenceSearch = function(data, cb) {
Feature.dnaSequenceSearch = function(data, options, cb) {
const models = this.app.models;

let {dnaSequence, parent, searchType, resultRows, addDataset, datasetName, options} = data;
let {dnaSequence, parent, searchType, resultRows, addDataset, datasetName,
minLengthOfHit, minPercentIdentity, minPercentCoverage
} = data;
// data.options : params for streaming result, used later.
const fnName = 'dnaSequenceSearch';
console.log(fnName, dnaSequence.length, parent, searchType);
/** Each user session may have 1 concurrent dnaSequenceSearch.
 * Use the session id to give the dnaSequence fileName a unique index. */
let index = sessionIndex(options.accessToken.id),
queryStringFileName = 'dnaSequence.' + index + '.fasta';
console.log(fnName, dnaSequence.length, parent, searchType, index, queryStringFileName);

/** Receive the results from the Blast search.
* @param chunk is a Buffer
* null / undefined indicates child process closed with status 0 (OK) and sent no output.
* @param cb is cbWrap of cb passed to dnaSequenceSearch().
*/
let searchDataOut = (chunk, cb) => {
if (! chunk) {
cb(null, []);
} else
if (chunk.asciiSlice(0,6) === 'Error:') {
cb(new Error(chunk.toString()));
} else {
const
textLines = chunk.toString().split('\n');
textLines = chunk.toString().split('\n')
.filter((textLine) => filterBlastResults(
minLengthOfHit, minPercentIdentity, minPercentCoverage, textLine));
textLines.forEach((textLine) => {
if (textLine !== "") {
console.log(fnName, 'stdout data', "'", textLine, "'");
}
});
if (addDataset) {
let jsonFile='tmp/' + datasetName + '.json';
console.log('before removeExisting "', datasetName, '"', '"', jsonFile, '"');
upload.removeExisting(models, datasetName, /*replaceDataset*/true, cb, loadAfterDelete);
/** same as convertSearchResults2Json() in dnaSequenceSearch.bash */
let datasetNameFull=`${parent}.${datasetName}`;
console.log('before removeExisting "', datasetNameFull, '"', '"', jsonFile, '"');
upload.removeExisting(models, datasetNameFull, /*replaceDataset*/true, cb, loadAfterDelete);

function loadAfterDelete(err) {
upload.loadAfterDeleteCb(
@@ -106,7 +140,7 @@ module.exports = function(Feature) {
if (true) {
let child = childProcess(
'dnaSequenceSearch.bash',
dnaSequence, true, 'dnaSequence', [parent, searchType, resultRows, addDataset, datasetName], searchDataOut, cb);
dnaSequence, true, queryStringFileName, [parent, searchType, resultRows, addDataset, datasetName], searchDataOut, cb, /*progressive*/ false);
} else {
let features = dev_blastResult;
cb(null, features);
@@ -141,9 +175,9 @@
/* Within data : .dnaSequence, and :
{arg: 'parent', type: 'string', required: true},
{arg: 'searchType', type: 'string', required: true},
{arg: "options", type: "object", http: "optionsFromRequest"}
resultRows, addDataset, datasetName
*/
{arg: "options", type: "object", http: "optionsFromRequest"}
],
// http: {verb: 'post'},
returns: {arg: 'features', type: 'array'},
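A hedged sketch of calling this remote method over LoopBack's default REST routing; the endpoint path, port, and parent dataset name are assumptions, and an access token is required because options.accessToken.id keys the per-session query file name:

// hypothetical client call; adjust host, path and token to the deployment
const response = await fetch(
  'http://localhost:5000/api/Features/dnaSequenceSearch?access_token=' + token, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      dnaSequence: '>query1\nACGTACGT',
      parent: 'exampleParentDataset',
      searchType: 'blast',
      resultRows: 500,
      addDataset: false,
      datasetName: '',
      minLengthOfHit: 100, minPercentIdentity: 95, minPercentCoverage: 90,
    }),
  });
const { features } = await response.json();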
35 changes: 31 additions & 4 deletions backend/common/utilities/child-process.js
@@ -17,10 +17,15 @@ var fs = require('fs');
* @param moreParams array of params to pass as command-line params to
* child process, after [fileName, useFile]
* @param dataOutCb (Buffer chunk, cb) {}
 * If the child process closes with status 0 (OK) and has sent no output,
 * dataOutCb is called with chunk === null.
 * @param cb response node callback
 * @param progressive true means pass received data directly back to
 * dataOutCb as it arrives; otherwise concatenate it and call dataOutCb
 * just once when the child closes
* @return child
*/
exports.childProcess = (scriptName, postData, useFile, fileName, moreParams, dataOutCb, cb) => {
exports.childProcess = (scriptName, postData, useFile, fileName, moreParams, dataOutCb, cb, progressive) => {
const fnName = 'childProcess';
/** messages from child via file descriptors 3 and 4 are
* collated in these arrays and can be sent back to provide
@@ -107,12 +112,21 @@ exports.childProcess = (scriptName, postData, useFile, fileName, moreParams, dat
warnings.push(message);
});

child.stdout.on('data', (chunk) => dataOutCb(chunk, cb));
/** output chunks received from child, if ! progressive. */
let outputs = [];
let outCb = progressive ?
(chunk) => dataOutCb(chunk, cb) :
(chunk) => outputs.push(chunk);
child.stdout.on('data', outCb);

// since these are streams, you can pipe them elsewhere
// child.stderr.pipe(dest);
child.on('close', (code) => {
console.log('child process exited with code', code);
if (! progressive && outputs.length) {
let combined = Buffer.concat(outputs);
dataOutCb(combined, cb);
}
if (code) {
const error = Error("Failed processing file '" + fileName + "'.");
cb(error);
@@ -126,7 +140,12 @@ exports.childProcess = (scriptName, postData, useFile, fileName, moreParams, dat
const message = 'Processed file ' + fileName;
if (child.killed) {
cb(null, message, true);
} // else check again after timeout
} else { // return empty data result
dataOutCb(null, cb);
      /* It would be equivalent to simply call : cb(null, [], true);
       * but dataOutCb() may have completion actions.
       */
}
}
});

@@ -135,9 +154,15 @@

/*----------------------------------------------------------------------------*/

/*
 * childProcess() was factored out of Dataset.upload() in models/dataset.js.
 * The following is the remainder which was not moved into childProcess(),
 * i.e. not common, and can replace the use of spawn() there.
 */

/* dataset upload */
function factored(msg, cb) {
exports.childProcess('uploadSpreadsheet.bash', msg.data, true, msg.fileName, /*moreParams*/[], dataOutUpload, cb, /*progressive*/ false);
exports.childProcess('uploadSpreadsheet.bash', msg.data, true, msg.fileName, dataOutUpload, cb, /*progressive*/ false);
}

// msg file param from API request {fileName, data, replaceDataset}
@@ -178,3 +203,5 @@ let dataOutUpload = (chunk, cb) => {
}
});
};

/*----------------------------------------------------------------------------*/
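As a usage sketch of the progressive flag (parameter values assumed, mirroring the dnaSequenceSearch call in feature.js; fastaText, parent and cb are assumed to be in scope): with progressive false, dataOutCb fires once with the concatenated output, or with chunk === null when the child exits 0 without output:

const { childProcess } = require('../utilities/child-process');

childProcess(
  'dnaSequenceSearch.bash', fastaText, /*useFile*/ true, 'dnaSequence.0.fasta',
  /*moreParams*/ [parent, 'blast', 500, false, ''],
  /*dataOutCb*/ (chunk, cb) => {
    if (! chunk) { cb(null, []); return; }  // child exited 0 with no output
    cb(null, chunk.toString().split('\n').filter((line) => line !== ''));
  },
  cb, /*progressive*/ false);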
33 changes: 33 additions & 0 deletions backend/common/utilities/results-cache.js
@@ -0,0 +1,33 @@
let flatCache = require('flat-cache');


/* global require */

/* global exports */

/*----------------------------------------------------------------------------*/


let cache = flatCache.load('resultsCache');

/*----------------------------------------------------------------------------*/


/** Expose the same API as memory-cache, to enable an easy substitution.
*/

exports.get = (key) => {
let cacheContent = cache.getKey(key);
return cacheContent;
};

exports.put = (key, body) => {
cache.setKey(key, body);
/** https://github.com/royriojas/flat-cache#readme : "Non visited
* keys are removed when cache.save() is called" if noPrune is not
* true
*/
cache.save(/*noPrune*/ true);
};

/*----------------------------------------------------------------------------*/
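A brief usage sketch of the shared API (the key name and resultRows are invented): the same two calls work against either library; the difference is that flat-cache persists results in a cache directory across restarts, while memory-cache holds them only in process memory:

const cache = require('../utilities/results-cache'); // or require('memory-cache')

cache.put('paths/blockA/blockB', resultRows);  // persisted when cache.save() runs
const hit = cache.get('paths/blockA/blockB');  // undefined on a miss (null with memory-cache)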
54 changes: 54 additions & 0 deletions backend/common/utilities/sequence-search.js
@@ -0,0 +1,54 @@
/* global require */

/* global exports */

/*----------------------------------------------------------------------------*/

/*
* Blast Output Columns :
query ID, subject ID, % identity, length of HSP (hit), # mismatches, # gaps, query start, query end, subject start, subject end, e-value, score, query length, subject length
* column names :
0 name
1 chr
2 pcIdentity
3 lengthOfHspHit
4 numMismatches
5 numGaps
6 queryStart
7 queryEnd
8 pos
9 end
10 eValue
11 score
12 queryLength
13 subjectLength
*/

/** Identify the columns of blast output result
*/
const
c_name = 0, c_chr = 1, c_pcIdentity = 2, c_lengthOfHspHit = 3,
c_pos = 8, c_end = 9,
c_queryLength = 12;


/** Filter Blast Search output results.
*
 * coverage = length of HSP / query length * 100
*/
exports.filterBlastResults = (
minLengthOfHit, minPercentIdentity, minPercentCoverage, line) => {
let ok,
cols = line.split('\t');
if (cols && cols.length > c_end) {
ok = (+cols[c_pcIdentity] >= +minPercentIdentity) &&
(+cols[c_lengthOfHspHit] >= +minLengthOfHit) &&
(+cols[c_queryLength] && (100 * +cols[c_lengthOfHspHit] / +cols[c_queryLength] >= +minPercentCoverage));
}
console.log('filterBlastResults', ok, minLengthOfHit, minPercentIdentity, minPercentCoverage, line, cols);
return ok;
};
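A worked example of the coverage arithmetic with an invented result line: length of HSP 200 and query length 210 give coverage = 200 / 210 * 100 ≈ 95.2, and identity is 98.50, so thresholds of (100, 95, 90) pass:

const { filterBlastResults } = require('./sequence-search');

const line = [
  'query1', 'chr2A', '98.50', '200', '3', '0',  // name, chr, pcIdentity, lengthOfHspHit, mismatches, gaps
  '1', '200', '700000', '700199',               // queryStart, queryEnd, pos, end
  '1e-100', '370', '210', '780000000'           // eValue, score, queryLength, subjectLength
].join('\t');

filterBlastResults(100, 95, 90, line);  // => true : 200 >= 100, 98.50 >= 95, 95.2 >= 90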




35 changes: 35 additions & 0 deletions backend/scripts/blastServer.py
@@ -0,0 +1,35 @@
from flask import Flask
from flask_executor import Executor
from flask_shell2http import Shell2HTTP


import sys


# python's inbuilt logging module
import logging
# get the flask_shell2http logger
logger = logging.getLogger("flask_shell2http")
# create new handler
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
# log messages of severity DEBUG or lower to the console
logger.setLevel(logging.DEBUG) # this is really important!



# Flask application instance
app = Flask(__name__)

executor = Executor(app)
executor.init_app(app)
shell2http = Shell2HTTP(app=app, executor=executor, base_url_prefix="/commands/")
shell2http.init_app(app, executor)

def my_callback_fn(context, future):
# optional user-defined callback function
print(context, future.result())

# callback_fn=my_callback_fn,
shell2http.register_command(endpoint="blastn", command_name="/home/ec2-user/scripts/blastn_cont.bash", callback_fn=my_callback_fn, decorators=[])
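A hedged client sketch for the endpoint registered above, assuming flask_shell2http's usual two-step protocol (POST the args, then poll the returned key); the host, port, and response fields are assumptions:

// hypothetical Node.js client for /commands/blastn
async function blastn(serverUrl, args) {
  const post = await fetch(serverUrl + '/commands/blastn', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ args }),  // forwarded as command-line args to blastn_cont.bash
  });
  const { key } = await post.json();
  // poll until the command completes; the final report carries stdout and the exit code
  for (;;) {
    const result = await fetch(serverUrl + '/commands/blastn?key=' + key);
    const body = await result.json();
    if (body.status !== 'running') { return body; }
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}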
