index.js 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. "use strict";
  2. var AggregationCursor = require("./AggregationCursor");
  3. /**
  4. * Used to aggregate `inputs` using a MongoDB-style `pipeline`
  5. *
  6. * If `inputs` is given, it will run the `inputs` through the `pipeline` and call the `callback` with the results.
  7. * If `inputs` is omitted, it will return an "aggregator" function so you can reuse the given `pipeline` against various `inputs`.
  8. *
  9. * @method aggregate
  10. * @namespace mungedb
  11. * @module mungedb-aggregate
  12. * @param pipelineObj {Array|Object} The list of pipeline document sources in JSON format or object with pipeline and options
  13. * @param [ctx] {Object} Optional context object to pass through to pipeline
  14. * @param [inputs] {Array} Optional inputs to pass through the `docSrcs` pipeline
  15. * @param [callback] {Function} Optional callback if using async extensions, called when done
  16. * @param callback.err {Error} The Error if one occurred
  17. * @param callback.docs {Array} The resulting documents
  18. **/
  19. exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback) { // export directly for a function-style interface
  20. var DocumentSource = exports.pipeline.documentSources.DocumentSource;
  21. if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
  22. ctx = ctx || {};
  23. var pipelineInst;
  24. try {
  25. //Set up the command Object
  26. pipelineObj = pipelineObj instanceof Array ? {pipeline: pipelineObj} : pipelineObj;
  27. if (!(pipelineObj instanceof Object)) throw new Error("pipelineObj must be either an Object or an Array");
  28. for (var key in exports.cmdDefaults) {
  29. if (exports.cmdDefaults.hasOwnProperty(key) && pipelineObj[key] === undefined) {
  30. pipelineObj[key] = exports.cmdDefaults[key];
  31. }
  32. }
  33. pipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
  34. } catch(ex) {
  35. // Error handling is funky since this can be used multiple different ways
  36. if (callback) {
  37. if (inputs) {
  38. return callback(ex);
  39. } else {
  40. return function aggregator(ctx, inputs, callback) {
  41. if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
  42. return callback(ex);
  43. };
  44. }
  45. } else {
  46. throw ex;
  47. }
  48. }
  49. if (pipelineObj.explain) {
  50. if (inputs) {
  51. ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
  52. exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
  53. }
  54. return pipelineInst.writeExplainOps();
  55. }
  56. var aggregator = function aggregator(ctx, inputs, callback) {
  57. if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
  58. if (!inputs) return callback("arg `inputs` is required");
  59. try {
  60. // rebuild the pipeline on subsequent calls
  61. if (!pipelineInst) {
  62. pipelineInst = exports.pipeline.Pipeline.parseCommand(pipelineObj, ctx);
  63. }
  64. ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
  65. exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
  66. pipelineInst.stitch();
  67. } catch(err) {
  68. return callback(err);
  69. }
  70. var tmpInst= pipelineInst;
  71. pipelineInst = null;
  72. if (!callback) return new AggregationCursor(tmpInst);
  73. else return new AggregationCursor(tmpInst).toArray(callback);
  74. };
  75. if (inputs) return aggregator(ctx, inputs, callback);
  76. return aggregator;
  77. };
  78. exports.AggregationCursor = AggregationCursor;
  79. exports.cmdDefaults = {
  80. batchSize: 150,
  81. explain: false,
  82. };
  83. // package-style interface; i.e., return a function underneath of the require
  84. exports.aggregate = exports;
  85. //Expose these so that mungedb-aggregate can be extended.
  86. exports.pipeline = require("./pipeline/");
  87. exports.query = require("./query/");
  88. exports.errors = require("./errors");
  89. // version info
  90. exports.version = "r2.6.5";
  91. exports.gitVersion = "e99d4fcb4279c0279796f237aa92fe3b64560bf6";