index.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. "use strict";
  /**
   * Used to aggregate `inputs` using a MongoDB-style `pipeline`
   *
   * If `inputs` is given, it will run the `inputs` through the `pipeline` and call the `callback` with the results.
   * If `inputs` is omitted, it will return an "aggregator" function so you can reuse the given `pipeline` against various `inputs`.
   *
   * NOTE: you should be mindful about reusing the same `pipeline` against disparate `inputs` because documents coming in can alter the state of its `DocumentSource`s
   *
   * @method aggregate
   * @namespace mungedb
   * @module mungedb-aggregate
   * @param pipeline {Array} The list of pipeline document sources in JSON format
   * @param [ctx] {Object} Optional context object to pass through to pipeline; if an `Array` or `DocumentSource` is given here it is treated as `inputs` and the remaining arguments shift accordingly
   * @param [inputs] {Array|DocumentSource} Optional inputs to pass through the `docSrcs` pipeline
   * @param [callback] {Function} Optional callback if using async extensions, called when done; defaults to `SYNC_CALLBACK`, which throws errors and returns the docs
   * @param callback.err {Error} The Error if one occurred
   * @param callback.docs {Array} The resulting documents
   * @return {Function|Array|undefined} The reusable `aggregator(ctx, inputs, callback)` function when `inputs` is omitted; otherwise whatever the aggregator returns (the `result` of the pipeline run if the run returned one, else `undefined`)
   **/
  exports = module.exports = function aggregate(pipeline, ctx, inputs, callback) { // function-style interface; i.e., return the utility function directly as the require
    var DocumentSource = exports.pipeline.documentSources.DocumentSource;

    // `ctx` is optional: if the second argument is really the inputs, shift the arguments over
    if (ctx instanceof Array || ctx instanceof DocumentSource) {
      callback = inputs;
      inputs = ctx;
      ctx = {};
    }

    var pipelineInst = exports.pipeline.Pipeline.parseCommand({
      pipeline: pipeline
    }, ctx);

    var aggregator = function aggregator(ctx, inputs, callback) {
      // same optional-`ctx` handling as the outer function, since the aggregator can be called directly
      if (ctx instanceof Array || ctx instanceof DocumentSource) {
        callback = inputs;
        inputs = ctx;
        ctx = {};
      }
      if (!callback) callback = exports.SYNC_CALLBACK;

      // report a real Error (not a bare string) so the sync callback throws something with a stack
      if (!inputs) return callback(new Error("arg `inputs` is required"));

      // rebuild the pipeline on subsequent calls
      // NOTE(review): nothing in this function ever clears `pipelineInst`, so this only fires if the initial parse produced a falsy value
      if (!pipelineInst) {
        pipelineInst = exports.pipeline.Pipeline.parseCommand({
          pipeline: pipeline
        }, ctx);
      }

      // build the input source unless we were handed one
      // NOTE(review): a `DocumentSource` given as `inputs` is not attached to the pipeline anywhere in this function (the original only stored it in an unused local) -- confirm how it is expected to be wired in
      if (!(inputs instanceof DocumentSource)) {
        try {
          // `ctx` is documented as optional, so tolerate an explicit null/undefined here
          if (ctx === undefined || ctx === null) ctx = {};
          ctx.ns = inputs; //NOTE: use the given `inputs` directly; hacking so that the cursor source will be our inputs instead of the context namespace
          exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
        } catch (err) {
          return callback(err);
        }
      }

      // unwrap the run output so that the caller's callback only sees the resulting documents
      // NOTE(review): the original also had an `if (!callback)` branch that set `pipelineInst.SYNC_MODE = true`, but it was
      // unreachable because `callback` is always defaulted above; it was removed without changing runtime behavior --
      // confirm whether `Pipeline#run` needs `SYNC_MODE` for synchronous use
      var runCallback = function aggregated(err, output) {
        if (err) return callback(err);
        return callback(null, output.result);
      };

      // run the pipeline against the inputs
      pipelineInst.stitch();
      var results = pipelineInst.run(runCallback);
      return results ? results.result : undefined;
    };

    if (inputs) return aggregator(ctx, inputs, callback);
    return aggregator;
  };
  66. // sync callback for aggregate if none was provided
  67. exports.SYNC_CALLBACK = function(err, docs){
  68. if (err) throw err;
  69. return docs;
  70. };
  // package-style interface; i.e., the function is also reachable as a property underneath of the require
  exports.aggregate = exports;

  // exposed so that mungedb-aggregate can be extended
  exports.Cursor = require("./Cursor");
  exports.pipeline = require("./pipeline/");

  // version info
  exports.version = "r2.5.4";
  exports.gitVersion = "ffd52e5f46cf2ba74ba931c78da62d4a7f480d8e";

  // error code constants
  exports.ERRORS = require("./Errors.js");