|
|
@@ -1,5 +1,6 @@
|
|
|
"use strict";
|
|
|
var AggregationCursor = require("./AggregationCursor");
|
|
|
+
|
|
|
/**
|
|
|
* Used to aggregate `inputs` using a MongoDB-style `pipeline`
|
|
|
*
|
|
|
@@ -57,27 +58,27 @@ exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback
|
|
|
}
|
|
|
|
|
|
var aggregator = function aggregator(ctx, inputs, callback) {
|
|
|
- if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
|
|
|
- if (!inputs) return callback("arg `inputs` is required");
|
|
|
+ if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
|
|
|
+ if (!inputs) return callback("arg `inputs` is required");
|
|
|
|
|
|
- try {
|
|
|
- // rebuild the pipeline on subsequent calls
|
|
|
- if (!pipelineInst) {
|
|
|
- pipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
|
|
|
- }
|
|
|
- ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
|
|
|
- exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
|
|
|
- pipelineInst.stitch();
|
|
|
- } catch(err) {
|
|
|
- return callback(err);
|
|
|
+ try {
|
|
|
+ // rebuild the pipeline on subsequent calls
|
|
|
+ if (!pipelineInst) {
|
|
|
+ pipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
|
|
|
}
|
|
|
+ ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
|
|
|
+ exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
|
|
|
+ pipelineInst.stitch();
|
|
|
+ } catch(err) {
|
|
|
+ return callback(err);
|
|
|
+ }
|
|
|
+
|
|
|
+ var cursor = new AggregationCursor(pipelineInst);
|
|
|
+ pipelineInst = null;
|
|
|
|
|
|
- var tmpInst= pipelineInst;
|
|
|
- pipelineInst = null;
|
|
|
+ return callback ? cursor.toArray(callback) : cursor;
|
|
|
+ };
|
|
|
|
|
|
- if (!callback) return new AggregationCursor(tmpInst);
|
|
|
- else return new AggregationCursor(tmpInst).toArray(callback);
|
|
|
- };
|
|
|
if (inputs) return aggregator(ctx, inputs, callback);
|
|
|
return aggregator;
|
|
|
};
|
|
|
@@ -85,7 +86,6 @@ exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback
|
|
|
exports.AggregationCursor = AggregationCursor;
|
|
|
|
|
|
exports.cmdDefaults = {
|
|
|
- batchSize: 150,
|
|
|
explain: false,
|
|
|
};
|
|
|
|