|
|
@@ -5,8 +5,6 @@
|
|
|
* If `inputs` is given, it will run the `inputs` through the `pipeline` and call the `callback` with the results.
|
|
|
* If `inputs` is omitted, it will return an "aggregator" function so you can reuse the given `pipeline` against various `inputs`.
|
|
|
*
|
|
|
- * NOTE: you should be mindful about reusing the same `pipeline` against disparate `inputs` because document coming in can alter the state of it's `DocumentSource`s
|
|
|
- *
|
|
|
* @method aggregate
|
|
|
* @namespace mungedb
|
|
|
* @module mungedb-aggregate
|
|
|
@@ -17,12 +15,12 @@
|
|
|
* @param callback.err {Error} The Error if one occurred
|
|
|
* @param callback.docs {Array} The resulting documents
|
|
|
**/
|
|
|
-exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback) { // function-style interface; i.e., return the utility function directly as the require
|
|
|
+exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback) { // export directly for a function-style interface
|
|
|
var DocumentSource = exports.pipeline.documentSources.DocumentSource;
|
|
|
if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
|
|
|
ctx = ctx || {};
|
|
|
var parsePipelineInst;
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
//Set up the command Object
|
|
|
pipelineObj = (pipelineObj instanceof Array) ? {pipeline: pipelineObj} : pipelineObj;
|
|
|
@@ -35,9 +33,10 @@ exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback
|
|
|
parsePipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
|
|
|
} catch(ex) {
|
|
|
// Error handling is funky since this can be used multiple different ways
|
|
|
- if (callback){
|
|
|
- if (inputs) return callback(ex);
|
|
|
- else {
|
|
|
+ if (callback) {
|
|
|
+ if (inputs) {
|
|
|
+ return callback(ex);
|
|
|
+ } else {
|
|
|
return function aggregator(ctx, inputs, callback) {
|
|
|
if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
|
|
|
return callback(ex);
|
|
|
@@ -47,34 +46,34 @@ exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback
|
|
|
throw ex;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (pipelineObj.explain){
|
|
|
if (inputs){
|
|
|
- ctx.ns = inputs; //NOTE: use the given `inputs` directly; hacking so that the cursor source will be our inputs instead of the context namespace
|
|
|
+ ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
|
|
|
exports.pipeline.PipelineD.prepareCursorSource(parsePipelineInst, ctx);
|
|
|
}
|
|
|
return parsePipelineInst.writeExplainOps();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
var aggregator = function aggregator(ctx, inputs, callback) {
|
|
|
if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
|
|
|
var batchSize = pipelineObj.batchSize,
|
|
|
pipelineInst = parsePipelineInst;
|
|
|
-
|
|
|
+
|
|
|
parsePipelineInst = null;
|
|
|
-
|
|
|
+
|
|
|
if (!callback) {
|
|
|
batchSize = Infinity;
|
|
|
callback = exports.SYNC_CALLBACK;
|
|
|
}
|
|
|
if (!inputs) return callback("arg `inputs` is required");
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
// rebuild the pipeline on subsequent calls
|
|
|
if (!pipelineInst) {
|
|
|
pipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
|
|
|
}
|
|
|
- ctx.ns = inputs; //NOTE: use the given `inputs` directly; hacking so that the cursor source will be our inputs instead of the context namespace
|
|
|
+ ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
|
|
|
exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
|
|
|
|
|
|
// run the pipeline against
|
|
|
@@ -130,10 +129,8 @@ exports.aggregate = exports;
|
|
|
//Expose these so that mungedb-aggregate can be extended.
|
|
|
exports.pipeline = require("./pipeline/");
|
|
|
exports.query = require("./query/");
|
|
|
+exports.Errors = require("./Errors");
|
|
|
|
|
|
// version info
|
|
|
exports.version = "r2.6.5";
|
|
|
exports.gitVersion = "e99d4fcb4279c0279796f237aa92fe3b64560bf6";
|
|
|
-
|
|
|
-// error code constants
|
|
|
-exports.ERRORS = require('./Errors.js');
|