diff --git a/.eslintrc b/.eslintrc index 09f051f6d..9734814f5 100644 --- a/.eslintrc +++ b/.eslintrc @@ -58,7 +58,7 @@ "no-trailing-spaces": ["warn"], "no-undef": ["error"], "no-unexpected-multiline": ["warn"], - "no-unused-vars": ["warn", {"caughtErrors":"all", "caughtErrorsIgnorePattern": "^unused($|[A-Z].*$)"}], + "no-unused-vars": ["warn", {"caughtErrors":"all", "caughtErrorsIgnorePattern": "^unused($|[A-Z].*$)", "argsIgnorePattern": "^unused($|[A-Z].*$)", "varsIgnorePattern": "^unused($|[A-Z].*$)" }], "no-use-before-define": ["error", {"functions":false}], "one-var": ["warn", "never"], "quotes": ["warn", "single", {"avoidEscape":false, "allowTemplateLiterals":true}], diff --git a/lib/waterline/methods/stream.js b/lib/waterline/methods/stream.js index 248c14563..fcc4aa447 100644 --- a/lib/waterline/methods/stream.js +++ b/lib/waterline/methods/stream.js @@ -10,12 +10,12 @@ var forgeStageTwoQuery = require('../utils/query/forge-stage-two-query'); var getQueryModifierMethods = require('../utils/query/get-query-modifier-methods'); var verifyModelMethodContext = require('../utils/query/verify-model-method-context'); - /** * Module constants */ var DEFERRED_METHODS = getQueryModifierMethods('stream'); +var STRIP_COMMENTS_RX = /(\/\/.*$)|(\/\*[\s\S]*?\*\/)|(\s*=[^,\)]*(('(?:\\'|[^'\r\n])*')|("(?:\\"|[^"\r\n])*"))|(\s*=[^,\)]*))/mg; @@ -341,6 +341,15 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?, // once. If it's eachRecordFn, we'll call it once per record. (function _makeCallOrCallsToAppropriateIteratee(proceed){ + // Check if the iteratee declares a callback parameter + var seemsToExpectCallback = (function(){ + var fn = query.eachBatchFn || query.eachRecordFn; + var fnStr = fn.toString().replace(STRIP_COMMENTS_RX, ''); + var parametersAsString = fnStr.slice(fnStr.indexOf('(')+1, fnStr.indexOf(')')); + // console.log(':seemsToExpectCallback:',parametersAsString, !!parametersAsString.match(/\,\s*([^,\{\}\[\]\s]+)\s*$/)); + return !! parametersAsString.match(/\,\s*([^,\{\}\[\]\s]+)\s*$/); + })();//† + // If an `eachBatchFn` iteratee was provided, we'll call it. // > At this point we already know it's a function, because // > we validated usage at the very beginning. @@ -351,10 +360,9 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?, // that occur after the first. var didIterateeAlreadyHalt; try { - // TODO: handle stream iteratees with no declared callback parameter var promiseMaybe = query.eachBatchFn(batchOfRecords, function (err) { - if (err) { return proceed(err); } - + if (!seemsToExpectCallback) { return proceed(new Error('Unexpected attempt to invoke callback. Since this per-batch iteratee function does not appear to expect a callback parameter, this stub callback was provided instead. Please either explicitly list the callback parameter among the arguments or change this code to no longer use a callback.')); }//• + if (err) { return proceed(err); }//• if (didIterateeAlreadyHalt) { console.warn( 'Warning: The per-batch iteratee provided to `.stream()` triggered its callback \n'+ @@ -363,15 +371,24 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?, ); return; }//-• - didIterateeAlreadyHalt = true; - return proceed(); });//_∏_ - // Take care of unhandled promise rejections from `await`. + // Take care of unhandled promise rejections from `await` (if appropriate) if (query.eachBatchFn.constructor.name === 'AsyncFunction') { + if (!seemsToExpectCallback) { + promiseMaybe = promiseMaybe.then(function(){ + didIterateeAlreadyHalt = true; + proceed(); + });//_∏_ + }//fi promiseMaybe.catch(function(e){ proceed(e); });//_∏_ + } else { + if (!seemsToExpectCallback) { + didIterateeAlreadyHalt = true; + return proceed(); + } } } catch (e) { return proceed(e); }//>-• @@ -391,8 +408,8 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?, // that occur after the first. var didIterateeAlreadyHalt; try { - // TODO: handle stream iteratees with no declared callback parameter var promiseMaybe = query.eachRecordFn(record, function (err) { + if (!seemsToExpectCallback) { return next(new Error('Unexpected attempt to invoke callback. Since this per-record iteratee function does not appear to expect a callback parameter, this stub callback was provided instead. Please either explicitly list the callback parameter among the arguments or change this code to no longer use a callback.')); }//• if (err) { return next(err); } if (didIterateeAlreadyHalt) { @@ -410,10 +427,22 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?, });//_∏_ - // Take care of unhandled promise rejections from `await`. + // Take care of unhandled promise rejections from `await` (if appropriate) if (query.eachRecordFn.constructor.name === 'AsyncFunction') { + if (!seemsToExpectCallback) { + promiseMaybe = promiseMaybe.then(function(){ + didIterateeAlreadyHalt = true; + next(); + });//_∏_ + }//fi promiseMaybe.catch(function(e){ next(e); });//_∏_ - } + } else { + if (!seemsToExpectCallback) { + didIterateeAlreadyHalt = true; + return next(); + } + }//fl + } catch (e) { return next(e); } },// ~∞%°