index.js 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. "use strict";
  /**
   * Used to aggregate `inputs` using a MongoDB-style `pipeline`
   *
   * If `inputs` is given, it will run the `inputs` through the `pipeline` and call the `callback` with the results.
   * If `inputs` is omitted, it will return an "aggregator" function so you can reuse the given `pipeline` against various `inputs`.
   *
   * `ctx` may be left out entirely: when the second argument is an Array or a DocumentSource it is
   * treated as `inputs` and the remaining arguments shift over by one. The returned "aggregator"
   * accepts `(ctx, inputs, callback)` with the same shifting rule.
   *
   * Modes:
   * - explain: when the command's `explain` option is truthy, nothing is run and the result of the
   *   parsed pipeline's `writeExplainOps()` is returned (the `callback` is not used).
   * - sync: when no `callback` is given, batching is disabled, errors are thrown (via
   *   `exports.SYNC_CALLBACK`) and the array of collected documents is returned. This relies on the
   *   pipeline's `run` delivering its documents before it returns.
   * - async: the `callback` is called with `(null, batch)` each time `batchSize` documents have been
   *   collected, once more with the remaining (possibly empty) batch at the end, and finally with
   *   `(null, null)` to signal completion. After an error or completion it is never called again.
   *
   * The given `pipelineObj` is never modified; defaults from `exports.cmdDefaults` are applied to a
   * shallow copy.
   *
   * @method aggregate
   * @namespace mungedb
   * @module mungedb-aggregate
   * @param pipelineObj {Array|Object} The list of pipeline document sources in JSON format or object with pipeline and options
   * @param [ctx] {Object} Optional context object to pass through to pipeline
   * @param [inputs] {Array} Optional inputs to pass through the `docSrcs` pipeline
   * @param [callback] {Function} Optional callback if using async extensions, called when done
   * @param callback.err {Error} The Error if one occurred
   * @param callback.docs {Array} The resulting documents
   * @return {Array|Function|Object} The documents (sync mode), a reusable aggregator (no `inputs`), or the explain output
   **/
  exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback) { // export directly for a function-style interface
    var DocumentSource = exports.pipeline.documentSources.DocumentSource;

    // `ctx` is optional: if the 2nd argument looks like inputs, shift the arguments over
    if (ctx instanceof Array || ctx instanceof DocumentSource) {
      callback = inputs;
      inputs = ctx;
      ctx = {};
    }
    ctx = ctx || {};

    var cmdObj,
      parsePipelineInst,
      key;
    try {
      //Set up the command Object
      if (pipelineObj instanceof Array) {
        cmdObj = {pipeline: pipelineObj};
      } else {
        if (!(pipelineObj instanceof Object)) throw new Error("pipelineObj must be either an Object or an Array");
        // shallow copy so that applying the defaults below does not write onto the caller's object
        cmdObj = {};
        for (key in pipelineObj) {
          cmdObj[key] = pipelineObj[key];
        }
      }
      for (key in exports.cmdDefaults) {
        if (exports.cmdDefaults.hasOwnProperty(key) && cmdObj[key] === undefined) {
          cmdObj[key] = exports.cmdDefaults[key];
        }
      }
      parsePipelineInst = exports.pipeline.Pipeline.parseCommand(cmdObj, ctx);
    } catch (ex) {
      // Error handling is funky since this can be used multiple different ways
      if (!callback) throw ex;
      if (inputs) return callback(ex);
      // a callback but no inputs: hand back an aggregator that reports the parse error when used
      return function aggregator(ctx, inputs, callback) {
        if (ctx instanceof Array || ctx instanceof DocumentSource) {
          callback = inputs;
          inputs = ctx;
          ctx = {};
        }
        // without a callback, surface the original error by throwing it instead of a TypeError
        return (callback || exports.SYNC_CALLBACK)(ex);
      };
    }

    if (cmdObj.explain) {
      if (inputs) {
        ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
        exports.pipeline.PipelineD.prepareCursorSource(parsePipelineInst, ctx);
      }
      return parsePipelineInst.writeExplainOps();
    }

    var aggregator = function aggregator(ctx, inputs, callback) {
      if (ctx instanceof Array || ctx instanceof DocumentSource) {
        callback = inputs;
        inputs = ctx;
        ctx = {};
      }
      ctx = ctx || {}; // same defaulting as the outer function, so `ctx.ns` below is always assignable

      var batchSize = cmdObj.batchSize,
        pipelineInst = parsePipelineInst;
      parsePipelineInst = null; // the pre-parsed instance is single use; later calls re-parse below

      if (!callback) {
        // sync mode: collect everything into a single batch and throw on error
        batchSize = Infinity;
        callback = exports.SYNC_CALLBACK;
      }
      if (!inputs) return callback(new Error("arg `inputs` is required"));

      try {
        // rebuild the pipeline on subsequent calls
        if (!pipelineInst) {
          pipelineInst = exports.pipeline.Pipeline.parseCommand(cmdObj, ctx);
        }
        ctx.ns = inputs; //NOTE: use given `inputs` directly; hacking cursor source to use our inputs instead of the ctx namespace
        exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
        // wire the stages together before running the pipeline against the inputs
        pipelineInst.stitch();
      } catch (err) {
        return callback(err);
      }

      var batch = [];
      var runCallback = function aggregated(err, document) {
        if (!callback) return; // already finished (error or end of stream)
        if (err) {
          callback(err);
          callback = undefined; //we are officially done. make sure the callback doesn't get called anymore
          return;
        }
        if (document === null) { // null marks the end of the stream
          callback(null, batch);
          if (batchSize !== Infinity) {
            callback(null, null); //this is to tell the caller that we are done aggregating
          }
          callback = undefined; //we are officially done. make sure the callback doesn't get called anymore
          return;
        }
        batch.push(document);
        if (batch.length >= batchSize) {
          callback(null, batch);
          batch = []; // start a fresh batch; the one just delivered now belongs to the caller
        }
      };
      pipelineInst.run(runCallback);
      return batch;
    };

    if (inputs) return aggregator(ctx, inputs, callback);
    return aggregator;
  };
  106. // sync callback for aggregate if none was provided
  107. exports.SYNC_CALLBACK = function(err, docs){
  108. if (err) throw err;
  109. return docs;
  110. };
  // Default command options; `aggregate` copies each of these onto the command object
  // for any option whose value is `undefined`.
  exports.cmdDefaults = {
    batchSize: 150, // documents collected per `callback(null, batch)` call; replaced by Infinity when no callback is given
    explain: false // when truthy, `aggregate` returns the pipeline's `writeExplainOps()` output instead of running it
  };
  // package-style interface; i.e., return a function underneath of the require
  exports.aggregate = exports;
  //Expose these so that mungedb-aggregate can be extended.
  // NOTE: `aggregate` reads `exports.pipeline` (Pipeline, PipelineD, documentSources) at call time.
  exports.pipeline = require("./pipeline/");
  exports.query = require("./query/");
  exports.Errors = require("./Errors");
  // version info
  // NOTE(review): presumably the MongoDB release and commit this port tracks — confirm
  exports.version = "r2.6.5";
  exports.gitVersion = "e99d4fcb4279c0279796f237aa92fe3b64560bf6";