// Pipeline.js
  1. "use strict";
  2. var async = require("async");
  3. /**
  4. * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
  5. * @class Pipeline
  6. * @namespace mungedb-aggregate.pipeline
  7. * @module mungedb-aggregate
  8. * @constructor
  9. **/
  10. // CONSTRUCTOR
  11. var Pipeline = module.exports = function Pipeline(theCtx){
  12. this.collectionName = null;
  13. this.sourceVector = null;
  14. this.explain = false;
  15. this.splitMongodPipeline = false;
  16. this.ctx = theCtx;
  17. }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  18. var DocumentSource = require("./documentSources/DocumentSource"),
  19. LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
  20. MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
  21. ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
  22. SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
  23. UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
  24. GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
  25. SortDocumentSource = require('./documentSources/SortDocumentSource');
  26. klass.COMMAND_NAME = "aggregate";
  27. klass.PIPELINE_NAME = "pipeline";
  28. klass.EXPLAIN_NAME = "explain";
  29. klass.FROM_ROUTER_NAME = "fromRouter";
  30. klass.SPLIT_MONGOD_PIPELINE_NAME = "splitMongodPipeline";
  31. klass.SERVER_PIPELINE_NAME = "serverPipeline";
  32. klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
  33. klass.stageDesc = {};//attaching this to the class for test cases
  34. klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
  35. klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
  36. klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
  37. klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
  38. klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
  39. klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
  40. klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
  41. /**
  42. * Create an `Array` of `DocumentSource`s from the given JSON pipeline
  43. * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
  44. * @static
  45. * @method parseDocumentSources
  46. * @param pipeline {Array} The JSON pipeline
  47. * @returns {Array} The parsed `DocumentSource`s
  48. **/
  49. klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
  50. var sourceVector = [];
  51. for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
  52. // pull out the pipeline element as an object
  53. var pipeElement = pipeline[iStep];
  54. if (!(pipeElement instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
  55. var obj = pipeElement;
  56. // Parse a pipeline stage from 'obj'.
  57. if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
  58. var stageName = Object.keys(obj)[0],
  59. stageSpec = obj[stageName];
  60. // Create a DocumentSource pipeline stage from 'stageSpec'.
  61. var desc = klass.stageDesc[stageName];
  62. if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435");
  63. // Parse the stage
  64. var stage = desc(stageSpec, ctx);
  65. if (!stage) throw new Error("Stage must not be undefined!");
  66. stage.setPipelineStep(iStep);
  67. sourceVector.push(stage);
  68. }
  69. return sourceVector;
  70. };
/**
 * Create a pipeline from the command.
 * Harvests the recognized fields from `cmdObj`, builds the stage list via
 * `parseDocumentSources`, then runs two local optimization passes over it
 * (sort/match swap, then adjacent-stage coalescing) before returning.
 * @static
 * @method parseCommand
 * @param cmdObj {Object} The command object sent from the client
 * @param cmdObj.aggregate {Array} the thing to aggregate against; // NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
 * @param cmdObj.pipeline {Object} the JSON pipeline of `DocumentSource` specs
 * @param cmdObj.explain {Boolean} should explain?
 * @param cmdObj.fromRouter {Boolean} is from router?
 * @param cmdObj.splitMongodPipeline {Boolean} should split?
 * @param ctx {Object} Not used yet in mungedb-aggregate
 * @returns {Array} the pipeline, if created, otherwise a NULL reference
 * @throws {Error} if `cmdObj` contains an unrecognized field, or if a pipeline stage fails to parse
 **/
klass.parseCommand = function parseCommand(cmdObj, ctx){
	var pipelineNamespace = require("./"),
		Pipeline = pipelineNamespace.Pipeline, // using require in case Pipeline gets replaced with an extension
		pipelineInst = new Pipeline(ctx);
	//gather the specification for the aggregation
	var pipeline;
	for(var fieldName in cmdObj){
		var cmdElement = cmdObj[fieldName];
		if(fieldName == klass.COMMAND_NAME) pipelineInst.collectionName = cmdElement; //look for the aggregation command
		else if(fieldName == klass.PIPELINE_NAME) pipeline = cmdElement; //check for the pipeline of JSON doc srcs
		else if(fieldName == klass.EXPLAIN_NAME) pipelineInst.explain = cmdElement; //check for explain option
		else if(fieldName == klass.FROM_ROUTER_NAME) pipelineInst.fromRouter = cmdElement; //if the request came from the router, we're in a shard
		else if(fieldName == klass.SPLIT_MONGOD_PIPELINE_NAME) pipelineInst.splitMongodPipeline = cmdElement; //check for debug options
		// NOTE: DEVIATION FROM MONGO: Not implementing: "Ignore $auth information sent along with the command. The authentication system will use it, it's not a part of the pipeline."
		else throw new Error("unrecognized field " + JSON.stringify(fieldName));
	}
	/**
	 * If we get here, we've harvested the fields we expect for a pipeline
	 * Set up the specified document source pipeline.
	 **/
	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
	var sourceVector = pipelineInst.sourceVector = Pipeline.parseDocumentSources(pipeline, ctx);
	/* if there aren't any pipeline stages, there's nothing more to do */
	if (!sourceVector.length) return pipelineInst;
	/* Move filters up where possible.
	CW TODO -- move filter past projections where possible, and noting corresponding field renaming.
	*/
	/*
	Wherever there is a match immediately following a sort, swap them.
	This means we sort fewer items. Neither changes the documents in the stream, so this transformation shouldn't affect the result.
	We do this first, because then when we coalesce operators below, any adjacent matches will be combined.
	*/
	for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
		var source = sourceVector[srci];
		if (source instanceof MatchDocumentSource) {
			var previous = sourceVector[srci - 1];
			if (previous instanceof SortDocumentSource) {
				/* swap this item with the previous */
				sourceVector[srci - 1] = source;
				sourceVector[srci] = previous;
			}
		}
	}
	/*
	Coalesce adjacent filters where possible. Two adjacent filters are equivalent to one filter whose predicate is the conjunction of the two original filters' predicates.
	For now, capture this by giving any DocumentSource the option to absorb it's successor; this will also allow adjacent projections to coalesce when possible.
	Run through the DocumentSources, and give each one the opportunity to coalesce with its successor. If successful, remove the successor.
	Move all document sources to a temporary list.
	*/
	var tempVector = sourceVector.slice(0);
	sourceVector.length = 0; // empty the live list in place; it is rebuilt below
	// move the first one to the final list
	sourceVector.push(tempVector[0]);
	// run through the sources, coalescing them or keeping them
	for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
		/*
		If we can't coalesce the source with the last, then move it to the final list, and make it the new last.
		(If we succeeded, then we're still on the same last, and there's no need to move or do anything with the source -- the destruction of tempVector will take care of the rest.)
		*/
		var lastSource = sourceVector[sourceVector.length - 1],
			temp = tempVector[tempi];
		if (!temp || !lastSource) throw new Error("null document sources found");
		if (!lastSource.coalesce(temp)){
			sourceVector.push(temp);
		}
	}
	// optimize the elements in the pipeline
	for(var i = 0, l = sourceVector.length; i<l; i++) {
		var iter = sourceVector[i];
		if (!iter) throw new Error("Pipeline received empty document as argument");
		iter.optimize();
	}
	return pipelineInst;
};
  158. // sync callback for Pipeline#run if omitted
  159. klass.SYNC_CALLBACK = function(err, results){
  160. if (err) throw err;
  161. return results.result;
  162. };
  163. function ifError(err) {
  164. if (err) throw err;
  165. }
/**
 * Run the pipeline
 * Chains each stage in `this.sourceVector` onto `inputSource` via setSource,
 * then drains the final stage into a result array.
 * Mode is selected by callback identity: the default SYNC_CALLBACK triggers the
 * synchronous path; any other callback triggers the async (async.eachSeries) path.
 * @method run
 * @param inputSource {DocumentSource} The input document source for the pipeline
 * @param [callback] {Function} Optional callback function if using async extensions;
 *   called with (err) or (null, {result: Array})
 **/
proto.run = function run(inputSource, callback){
	if (inputSource && !(inputSource instanceof DocumentSource)) throw new Error("arg `inputSource` must be an instance of DocumentSource");
	if (!callback) callback = klass.SYNC_CALLBACK;
	var self = this;
	if (callback === klass.SYNC_CALLBACK) { // SYNCHRONOUS MODE
		inputSource.setSource(undefined, ifError); //TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
		// chain the stages together; after the loop `source` is the last stage
		var source = inputSource;
		for(var i = 0, l = self.sourceVector.length; i < l; i++){
			var temp = self.sourceVector[i];
			temp.setSource(source, ifError);
			source = temp;
		}
		/*
		Iterate through the resulting documents, and add them to the result.
		We do this even if we're doing an explain, in order to capture the document counts and other stats.
		However, we don't capture the result documents for explain.
		*/
		var resultArray = [];
		try{
			for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
				var document = source.getCurrent();
				resultArray.push(document); // add the document to the result set
				//Commenting out this assertion for munge. MUHAHAHA!!!
				// object will be too large, assert. the extra 1KB is for headers
				//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
			}
		} catch (err) {
			return callback(err);
		}
		var result = {
			result: resultArray
			// ,ok: true; //not actually in here... where does this come from?
		};
		return callback(null, result);
	} else { // ASYNCHRONOUS MODE //TODO: move this up to a higher level package?
		return inputSource.setSource(undefined, function(err){ //TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
			if (err) return callback(err);
			// chain together the sources we found
			var source = inputSource;
			async.eachSeries(
				self.sourceVector,
				function eachSrc(temp, next){
					// wire each stage to the current tail, then advance the tail
					temp.setSource(source, function(err){
						if (err) return next(err);
						source = temp;
						return next();
					});
				},
				function doneSrcs(err){ //source is left pointing at the last source in the chain
					if (err) return callback(err);
					/*
					Iterate through the resulting documents, and add them to the result.
					We do this even if we're doing an explain, in order to capture the document counts and other stats.
					However, we don't capture the result documents for explain.
					*/
					// the array in which the aggregation results reside
					var resultArray = [];
					try{
						for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
							var document = source.getCurrent();
							resultArray.push(document); // add the document to the result set
							//Commenting out this assertion for munge. MUHAHAHA!!!
							// object will be too large, assert. the extra 1KB is for headers
							//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
						}
					} catch (err) {
						return callback(err);
					}
					var result = {
						result: resultArray
						// ,ok: true; //not actually in here... where does this come from?
					};
					return callback(null, result);
				}
			);
		});
	}
};