| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- "use strict";
- var AggregationCursor = require("./AggregationCursor");
- /**
- * Used to aggregate `inputs` using a MongoDB-style `pipeline`
- *
- * If `inputs` is given, it will run the `inputs` through the `pipeline` and call the `callback` with the results.
- * If `inputs` is omitted, it will return an "aggregator" function so you can reuse the given `pipeline` against various `inputs`.
- *
- * @method aggregate
- * @namespace mungedb
- * @module mungedb-aggregate
- * @param pipelineObj {Array|Object} The list of pipeline document sources in JSON format or object with pipeline and options
- * @param [ctx] {Object} Optional context object to pass through to pipeline
- * @param [inputs] {Array} Optional inputs to pass through the `docSrcs` pipeline
- * @param [callback] {Function} Optional callback if using async extensions, called when done
- * @param callback.err {Error} The Error if one occurred
- * @param callback.docs {Array} The resulting documents
- **/
- exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback) { // export directly for a function-style interface
- var DocumentSource = exports.pipeline.documentSources.DocumentSource;
- if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
- ctx = ctx || {};
- var pipelineInst;
- try {
- //Set up the command Object
- pipelineObj = pipelineObj instanceof Array ? {pipeline: pipelineObj} : pipelineObj;
- if (!(pipelineObj instanceof Object)) throw new Error("pipelineObj must be either an Object or an Array");
- for (var key in exports.cmdDefaults) {
- if (exports.cmdDefaults.hasOwnProperty(key) && pipelineObj[key] === undefined) {
- pipelineObj[key] = exports.cmdDefaults[key];
- }
- }
- pipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
- } catch(ex) {
- // Error handling is funky since this can be used multiple different ways
- if (callback) {
- if (inputs) {
- return callback(ex);
- } else {
- return function aggregator(ctx, inputs, callback) {
- if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
- return callback(ex);
- };
- }
- } else {
- throw ex;
- }
- }
- if (pipelineObj.explain) {
- if (inputs) {
- ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
- exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
- }
- return pipelineInst.writeExplainOps();
- }
- var aggregator = function aggregator(ctx, inputs, callback) {
- if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
- if (!inputs) return callback("arg `inputs` is required");
- try {
- // rebuild the pipeline on subsequent calls
- if (!pipelineInst) {
- pipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
- }
- ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
- exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
- pipelineInst.stitch();
- } catch(err) {
- return callback(err);
- }
- var tmpInst= pipelineInst;
- pipelineInst = null;
- if (!callback) return new AggregationCursor(tmpInst);
- else return new AggregationCursor(tmpInst).toArray(callback);
- };
- if (inputs) return aggregator(ctx, inputs, callback);
- return aggregator;
- };
- exports.AggregationCursor = AggregationCursor;
- exports.cmdDefaults = {
- batchSize: 150,
- explain: false,
- };
- // package-style interface; i.e., return a function underneath of the require
- exports.aggregate = exports;
- //Expose these so that mungedb-aggregate can be extended.
- exports.pipeline = require("./pipeline/");
- exports.query = require("./query/");
- exports.errors = require("./errors");
- // version info
- exports.version = "r2.6.5";
- exports.gitVersion = "e99d4fcb4279c0279796f237aa92fe3b64560bf6";
|