index.js 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. "use strict";
  2. /**
  3. * Used to aggregate `inputs` using a MongoDB-style `pipeline`
  4. *
  5. * If `inputs` is given, it will run the `inputs` through the `pipeline` and call the `callback` with the results.
  6. * If `inputs` is omitted, it will return an "aggregator" function so you can reuse the given `pipeline` against various `inputs`.
  7. *
  8. * NOTE: you should be mindful about reusing the same `pipeline` against disparate `inputs` because document coming in can alter the state of it's `DocumentSource`s
  9. *
  10. * @method aggregate
  11. * @namespace mungedb
  12. * @module mungedb-aggregate
  13. * @param pipeline {Array} The list of pipeline document sources in JSON format
  14. * @param [ctx] {Object} Optional context object to pass through to pipeline
  15. * @param [inputs] {Array} Optional inputs to pass through the `docSrcs` pipeline
  16. * @param [callback] {Function} Optional callback if using async extensions, called when done
  17. * @param callback.err {Error} The Error if one occurred
  18. * @param callback.docs {Array} The resulting documents
  19. **/
  20. exports = module.exports = function aggregate(cmdObject, ctx, inputs, callback) { // function-style interface; i.e., return the utility function directly as the require
  21. var DocumentSource = exports.pipeline.documentSources.DocumentSource;
  22. if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
  23. var pipelineInst;
  24. try {
  25. //Set up the command Object
  26. cmdObject = (cmdObject instanceof Array) ? {pipeline: cmdObject} : cmdObject;
  27. if (!cmdObject instanceof Object) throw new Error("cmdObject must be either an Object or an Array");
  28. for (var key in exports.cmdDefaults){
  29. if (exports.cmdDefaults.hasOwnProperty(key) && cmdObject[key] === undefined){
  30. cmdObject[key] = exports.cmdDefaults[key];
  31. }
  32. }
  33. pipelineInst = exports.pipeline.Pipeline.parseCommand(cmdObject, ctx);
  34. } catch(ex) {
  35. // Error handling is funky since this can be used multiple different ways
  36. if (callback){
  37. if (inputs) return callback(ex);
  38. else {
  39. return function aggregator(ctx, inputs, callback) {
  40. if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
  41. return callback(ex);
  42. };
  43. }
  44. } else {
  45. throw ex;
  46. }
  47. }
  48. if (cmdObject.explain){
  49. return pipelineInst.writeExplainOps();
  50. }
  51. var aggregator = function aggregator(ctx, inputs, callback) {
  52. if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
  53. var batchSize = cmdObject.batchSize;
  54. if (!callback) {
  55. batchSize = Infinity;
  56. callback = exports.SYNC_CALLBACK;
  57. }
  58. if (!inputs) return callback("arg `inputs` is required");
  59. try {
  60. // rebuild the pipeline on subsequent calls
  61. if (!pipelineInst) {
  62. pipelineInst = exports.pipeline.Pipeline.parseCommand(cmdObject, ctx);
  63. }
  64. ctx.ns = inputs; //NOTE: use the given `inputs` directly; hacking so that the cursor source will be our inputs instead of the context namespace
  65. exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
  66. // run the pipeline against
  67. pipelineInst.stitch();
  68. } catch(err) {
  69. return callback(err);
  70. }
  71. var batch = [];
  72. var runCallback = function aggregated(err, document){
  73. if (!callback) return;//make sure the callback doesn't get called anymore after an error
  74. if(err) {
  75. pipelineInst = null;
  76. callback(err);
  77. callback = undefined;
  78. return;
  79. }
  80. if (document === null){
  81. pipelineInst = null;
  82. callback(null, batch);
  83. callback = undefined;
  84. return;
  85. }
  86. batch.push(document);
  87. if (batch.length >= batchSize){
  88. return callback(null, batch);
  89. }
  90. };
  91. pipelineInst.run(runCallback);
  92. return batch;
  93. };
  94. if(inputs) return aggregator(ctx, inputs, callback);
  95. return aggregator;
  96. };
  97. // sync callback for aggregate if none was provided
  98. exports.SYNC_CALLBACK = function(err, docs){
  99. if (err) throw err;
  100. return docs;
  101. };
  102. exports.cmdDefaults = {
  103. batchSize: 150,
  104. explain: false
  105. };
  106. // package-style interface; i.e., return a function underneath of the require
  107. exports.aggregate = exports;
  108. //Expose these so that mungedb-aggregate can be extended.
  109. exports.pipeline = require("./pipeline/");
  110. exports.query = require("./query/");
  111. // version info
  112. exports.version = "r2.6.5";
  113. exports.gitVersion = "e99d4fcb4279c0279796f237aa92fe3b64560bf6";
  114. // error code constants
  115. exports.ERRORS = require('./Errors.js');