index.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. "use strict";
  2. module.exports = (function(){
  3. /**
  4. * Used to aggregate `inputs` using a MongoDB-style `pipeline`
  5. *
  6. * If `inputs` is given, it will run the `inputs` through the `pipeline` and call the `callback` with the results.
  7. * If `inputs` is omitted, it will return an "aggregator" function so you can reuse the given `pipeline` against various `inputs`.
  8. *
  9. * NOTE: you should be mindful about reusing the same `pipeline` against disparate `inputs` because document coming in can alter the state of it's `DocumentSource`s
  10. *
  11. * @method aggregate
  12. * @namespace mungedb
  13. * @module mungedb-aggregate
  14. * @param pipeline {Array} The list of pipeline document sources in JSON format
  15. * @param [inputs] {Array} Optional inputs to pass through the `docSrcs` pipeline
  16. * @param callback {Function} Called when done
  17. * @param callback.err {Error} The Error if one occurred
  18. * @param callback.docs {Array} The resulting documents
  19. **/
  20. exports = function aggregate(pipeline, inputs, callback) { // function-style interface; i.e., return the utility function directly as the require
  21. var ctx = {}, //not used yet
  22. pipelineInst = exports.pipeline.Pipeline.parseCommand({
  23. pipeline: pipeline
  24. }, ctx),
  25. aggregator = function aggregator(inputs, callback) {
  26. if (!callback) throw new Error("arg `callback` is required");
  27. if (!inputs) return callback("arg `inputs` is required");
  28. // rebuild the pipeline on subsequent calls
  29. if (!pipelineInst) {
  30. pipelineInst = exports.pipeline.Pipeline.parseCommand({
  31. pipeline: pipeline
  32. }, ctx);
  33. }
  34. // use or build input src
  35. var src;
  36. if(inputs instanceof exports.pipeline.documentSources.DocumentSource){
  37. src = inputs;
  38. }else{
  39. try{
  40. pipelineInst.collectionName = inputs; //NOTE: use the given `inputs` directly; not really a "name" but we don't really have collection names in mungedb-aggregate
  41. src = exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, pipelineInst.ctx);
  42. }catch(err){
  43. return callback(err);
  44. }
  45. }
  46. // run the pipeline against the input src
  47. return pipelineInst.run(src, function aggregated(err, results){
  48. pipelineInst = null; // unset so that subsequent calls can rebuild the pipeline
  49. if(err) return callback(err);
  50. return callback(null, results.result);
  51. });
  52. };
  53. if(inputs) return aggregator(inputs, callback);
  54. return aggregator;
  55. };
  56. exports.aggregate = exports; // package-style interface; i.e., return a function underneath of the require
  57. //Expose these so that mungedb-aggregate can be extended.
  58. exports.Cursor = require("./Cursor");
  59. exports.pipeline = require("./pipeline/");
  60. // version info
  61. exports.version = "r2.4.0-rc0";
  62. exports.gitVersion = "cb8efcd6a2f05d35655ed9f9b947cc4a99ade8db";
  63. return exports;
  64. })();