Pipeline.js

  1. "use strict";
  2. /**
  3. * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
  4. * @class Pipeline
  5. * @namespace mungedb-aggregate.pipeline
  6. * @module mungedb-aggregate
  7. * @constructor
  8. **/
  9. // CONSTRUCTOR
  10. var Pipeline = module.exports = function Pipeline(theCtx){
  11. this.sources = null;
  12. this.explain = false;
  13. this.splitMongodPipeline = false;
  14. this.ctx = theCtx;
  15. this.SYNC_MODE = false;
  16. }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
var DocumentSource = require("./documentSources/DocumentSource"),
    LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
    MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
    ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
    SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
    UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
    GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
    OutDocumentSource = require('./documentSources/OutDocumentSource'),
    GeoNearDocumentSource = require('./documentSources/GeoNearDocumentSource'),
    RedactDocumentSource = require('./documentSources/RedactDocumentSource'),
    SortDocumentSource = require('./documentSources/SortDocumentSource');
klass.COMMAND_NAME = "aggregate";
klass.PIPELINE_NAME = "pipeline";
klass.EXPLAIN_NAME = "explain";
klass.FROM_ROUTER_NAME = "fromRouter";
klass.SERVER_PIPELINE_NAME = "serverPipeline";
klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
klass.stageDesc = {}; // attaching this to the class for test cases
klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
klass.stageDesc[OutDocumentSource.outName] = OutDocumentSource.createFromJson;
klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
klass.stageDesc[RedactDocumentSource.redactName] = RedactDocumentSource.createFromJson;
klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
klass.nStageDesc = Object.keys(klass.stageDesc).length;
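// Illustrative sketch (not part of the original source): `stageDesc` maps each
// stage's "$"-prefixed name to the factory that parses its spec. Assuming
// MatchDocumentSource.matchName is "$match", a lookup looks like:
//
//     var factory = Pipeline.stageDesc["$match"];    // MatchDocumentSource.createFromJson
//     var stage = factory({age: {$gte: 21}}, ctx);   // a MatchDocumentSource instance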
klass.optimizations = {};
klass.optimizations.local = {};
/**
 * Moves $match before $sort when they are placed next to one another
 * @static
 * @method moveMatchBeforeSort
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
    var sources = pipelineInst.sources;
    for(var srcn = sources.length, srci = 1; srci < srcn; ++srci) {
        var source = sources[srci];
        if(source.constructor === MatchDocumentSource) {
            var previous = sources[srci - 1];
            if(previous && previous.constructor === SortDocumentSource) { // Added check that previous exists
                /* swap this item with the previous */
                sources[srci] = previous;
                sources[srci - 1] = source;
            }
        }
    }
};
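// Usage sketch (illustrative): with a pipeline whose sources were parsed from
// [{$sort: {age: 1}}, {$match: {age: {$gte: 21}}}], this optimization swaps the
// adjacent pair so the $match filters documents before the $sort sees them:
//
//     Pipeline.optimizations.local.moveMatchBeforeSort(pipelineInst);
//     // pipelineInst.sources is now ordered [$match, $sort]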
/**
 * Moves $limit before $skip when they are placed next to one another
 * @static
 * @method moveLimitBeforeSkip
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
    var sources = pipelineInst.sources;
    if(sources.length === 0) return;
    for(var i = sources.length - 1; i >= 1 /* not looking at 0 */; i--) {
        var limit = sources[i].constructor === LimitDocumentSource ? sources[i] : undefined,
            skip = sources[i - 1].constructor === SkipDocumentSource ? sources[i - 1] : undefined;
        if(limit && skip) {
            limit.setLimit(limit.getLimit() + skip.getSkip());
            sources[i - 1] = limit;
            sources[i] = skip;
            // Start at back again. This is needed to handle cases with more than 1 $limit
            // (S means skip, L means limit)
            //
            // These two would work without second pass (assuming back to front ordering)
            // SL -> LS
            // SSL -> LSS
            //
            // The following cases need a second pass to handle the second limit
            // SLL -> LLS
            // SSLL -> LLSS
            // SLSL -> LLSS
            i = sources.length; // decremented before next pass
        }
    }
};
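// Usage sketch (illustrative): [{$skip: 10}, {$limit: 5}] becomes [{$limit: 15}, {$skip: 10}];
// the limit is raised by the skip amount so the same window of documents (11..15)
// survives after the two stages are reordered:
//
//     Pipeline.optimizations.local.moveLimitBeforeSkip(pipelineInst);
//     // sources: LimitDocumentSource(limit = 15), SkipDocumentSource(skip = 10)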
/**
 * Attempts to coalesce every pipeline stage into the previous pipeline stage, starting after the first
 * @static
 * @method coalesceAdjacent
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
    var sources = pipelineInst.sources;
    if(sources.length === 0) return;
    // move all sources to a temporary list
    var moveSrc = sources.pop(),
        tempSources = [];
    while(moveSrc) {
        tempSources.unshift(moveSrc);
        moveSrc = sources.pop();
    }
    // move the first one to the final list
    sources.push(tempSources[0]);
    // run through the sources, coalescing them or keeping them
    for(var tempn = tempSources.length, tempi = 1; tempi < tempn; ++tempi) {
        // If we can't coalesce the source with the last, then move it
        // to the final list, and make it the new last. (If we succeeded,
        // then we're still on the same last, and there's no need to move
        // or do anything with the source -- the destruction of tempSources
        // will take care of the rest.)
        var lastSource = sources[sources.length - 1],
            tempSrc = tempSources[tempi];
        if(!(lastSource && tempSrc)) {
            throw new Error('Must have a last and current source'); // verify(lastSource && tempSrc);
        }
        if(!lastSource.coalesce(tempSrc)) sources.push(tempSrc);
    }
};
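// Illustrative sketch (with assumptions): each DocumentSource may implement
// coalesce(other) and return true when it absorbs the following stage. Assuming
// adjacent $limit stages coalesce by keeping the smaller limit, a pipeline parsed
// from [{$limit: 10}, {$limit: 3}] collapses into a single $limit of 3:
//
//     Pipeline.optimizations.local.coalesceAdjacent(pipelineInst);
//     // pipelineInst.sources.length === 1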
/**
 * Iterates over sources in the pipelineInst, optimizing each
 * @static
 * @method optimizeEachDocumentSource
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
    var sources = pipelineInst.sources;
    for(var srci = 0, srcn = sources.length; srci < srcn; ++srci) {
        sources[srci].optimize();
    }
};
/**
 * Duplicates the redact-safe portion of a $match before an initial $redact,
 * so the copied $match can filter documents before the $redact runs
 * @static
 * @method duplicateMatchBeforeInitalRedact
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
    var sources = pipelineInst.sources;
    if(sources.length >= 2 && sources[0].constructor === RedactDocumentSource) {
        if(sources[1].constructor === MatchDocumentSource) {
            var match = sources[1],
                redactSafePortion = match.redactSafePortion();
            if(Object.keys(redactSafePortion).length > 0) {
                sources.unshift(MatchDocumentSource.createFromJson(redactSafePortion, pipelineInst.ctx));
            }
        }
    }
};
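// Usage sketch (illustrative): when the parsed pipeline starts with [$redact, $match],
// the portion of the $match that redactSafePortion() reports as safe to evaluate
// before redaction is duplicated at the front, so fewer documents reach the $redact:
//
//     Pipeline.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);
//     // sources: [$match (safe portion), $redact, $match (original)]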
/**
 * Create an `Array` of `DocumentSource`s from the given JSON pipeline
 * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
 * @static
 * @method parseDocumentSources
 * @param pipeline {Array} The JSON pipeline
 * @param ctx {Object} The context passed through to each stage's factory
 * @returns {Array} The parsed `DocumentSource`s
 **/
klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
    var sources = [];
    for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
        // pull out the pipeline element as an object
        var pipeElement = pipeline[iStep];
        if (!(pipeElement instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
        var obj = pipeElement;
        // Parse a pipeline stage from 'obj'.
        if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
        var stageName = Object.keys(obj)[0],
            stageSpec = obj[stageName];
        // Create a DocumentSource pipeline stage from 'stageSpec'.
        var desc = klass.stageDesc[stageName];
        if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435");
        // Parse the stage
        var stage = desc(stageSpec, ctx);
        if (!stage) throw new Error("Stage must not be undefined!"); // verify(stage)
        sources.push(stage);
        if(stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
            throw new Error("$out can only be the final stage in the pipeline; code 16435");
        }
    }
    return sources;
};
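// Usage sketch (illustrative): each one-key stage object is dispatched through
// stageDesc to its factory, and the resulting DocumentSource instances come back
// in pipeline order:
//
//     var sources = Pipeline.parseDocumentSources(
//         [{$match: {type: "a"}}, {$sort: {ts: -1}}, {$limit: 10}],
//         ctx);
//     // sources: [MatchDocumentSource, SortDocumentSource, LimitDocumentSource]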
/**
 * Create a pipeline from the command.
 * @static
 * @method parseCommand
 * @param cmdObj {Object} The command object sent from the client
 * @param cmdObj.aggregate {Array} the thing to aggregate against; // NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
 * @param cmdObj.pipeline {Array} the JSON pipeline of `DocumentSource` specs
 * @param cmdObj.explain {Boolean} should explain?
 * @param cmdObj.fromRouter {Boolean} is from router?
 * @param cmdObj.splitMongodPipeline {Boolean} should split?
 * @param ctx {Object} Not used yet in mungedb-aggregate
 * @returns {Pipeline} the parsed pipeline, if created
 **/
klass.parseCommand = function parseCommand(cmdObj, ctx){
    var pipelineNamespace = require("./"),
        Pipeline = pipelineNamespace.Pipeline, // using require in case Pipeline gets replaced with an extension
        pipelineInst = new Pipeline(ctx);
    // gather the specification for the aggregation
    var pipeline;
    for(var fieldName in cmdObj){
        var cmdElement = cmdObj[fieldName];
        if(fieldName[0] == "$") continue;
        else if(fieldName == "cursor") continue;
        else if(fieldName == klass.COMMAND_NAME) continue; // look for the aggregation command
        else if(fieldName == klass.PIPELINE_NAME) pipeline = cmdElement; // check for the pipeline of JSON doc srcs
        else if(fieldName == klass.EXPLAIN_NAME) pipelineInst.explain = cmdElement; // check for explain option
        else if(fieldName == klass.FROM_ROUTER_NAME) ctx.inShard = cmdElement; // if the request came from the router, we're in a shard
        else if(fieldName == "allowDiskUsage") {
            if(typeof cmdElement !== 'boolean') throw new Error("allowDiskUsage must be a bool, not a " + typeof cmdElement + "; uassert code 16949");
        }
        else throw new Error("unrecognized field " + JSON.stringify(fieldName));
    }
    /**
     * If we get here, we've harvested the fields we expect for a pipeline
     * Set up the specified document source pipeline.
     **/
    // NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
    var sources = pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
    klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
    klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
    klass.optimizations.local.coalesceAdjacent(pipelineInst);
    klass.optimizations.local.optimizeEachDocumentSource(pipelineInst);
    klass.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);
    return pipelineInst;
};
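// Usage sketch (illustrative): a minimal command object using the fields parseCommand
// recognizes; the aggregate/ctx values shown are assumptions for the example only:
//
//     var pipelineInst = Pipeline.parseCommand({
//         aggregate: [],                                       // NOTE: array of inputs, not a collection name
//         pipeline: [{$match: {active: true}}, {$limit: 5}],
//         explain: false
//     }, {});
//     // pipelineInst.sources now holds the parsed, locally optimized stages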
// sync callback for Pipeline#run if omitted
klass.SYNC_CALLBACK = function(err, results){
    if (err) throw err;
    return results.result;
};
function ifError(err) {
    if (err) throw err;
}
/**
 * Gets the initial $match query when $match is the first pipeline stage
 * @method getInitialQuery
 * @return {Object} An empty object or the match spec
 **/
proto.getInitialQuery = function getInitialQuery() {
    var sources = this.sources;
    if(sources.length === 0) {
        return {};
    }
    /* look for an initial $match */
    var match = sources[0].constructor === MatchDocumentSource ? sources[0] : undefined;
    if(!match) return {};
    return match.getQuery();
};
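// Usage sketch (illustrative): callers can use the initial $match to pre-filter the
// underlying input before feeding documents into the pipeline:
//
//     var query = pipelineInst.getInitialQuery();   // e.g. {active: true}, or {} when there is no leading $match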
/**
 * Creates the JSON representation of the pipeline
 * @method serialize
 * @return {Object} The serialized command object
 **/
proto.serialize = function serialize() {
    var serialized = {},
        array = [];
    // create an array out of the pipeline operations
    this.sources.forEach(function(source) {
        source.serializeToArray(array);
    });
    serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
    serialized[klass.PIPELINE_NAME] = array;
    if(this.explain) serialized[klass.EXPLAIN_NAME] = this.explain;
    return serialized;
};
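// Output sketch (illustrative): for a pipeline parsed from the parseCommand example
// above, serialize() yields roughly the following; the collection name comes from
// ctx.ns.coll when available, otherwise an empty string:
//
//     {
//         aggregate: "",
//         pipeline: [{$match: {active: true}}, {$limit: 5}]
//     }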
/**
 * Points each source at its previous source
 * @method stitch
 **/
proto.stitch = function stitch() {
    if(this.sources.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
    /* chain together the sources we found */
    var prevSource = this.sources[0];
    for(var srci = 1, srcn = this.sources.length; srci < srcn; srci++) {
        var tempSource = this.sources[srci];
        tempSource.setSource(prevSource);
        prevSource = tempSource;
    }
};
/**
 * Run the pipeline
 * @method run
 * @param [callback] {Function} Optional. Run the pipeline in async mode; callback(err, result)
 * @return {Object} The result of executing the pipeline (sync mode only)
 **/
proto.run = function run(callback) {
    // should not get here in the explain case
    if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
    /* NOTE: DEVIATION FROM MONGO SOURCE. WE'RE SUPPORTING SYNC AND ASYNC */
    if(this.SYNC_MODE) {
        if(callback) callback(); // callback is optional in sync mode
        return this._runSync();
    } else {
        return this._runAsync(callback);
    }
};
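// Usage sketch (illustrative): after stitch() has chained the sources onto an input
// source, run() either returns {result: [...]} directly (SYNC_MODE) or delivers the
// same shape through the callback; a pipeline instance is run once, the two calls
// below are alternatives:
//
//     pipelineInst.SYNC_MODE = true;
//     var out = pipelineInst.run();                 // {result: [doc, doc, ...]}
//
//     // or, async (the default):
//     pipelineInst.run(function(err, out) {
//         if (err) throw err;
//         console.log(out.result);
//     });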
/**
 * Get the last document source in the pipeline
 * @method _getFinalSource
 * @return {Object} The DocumentSource at the end of the pipeline
 * @private
 **/
proto._getFinalSource = function _getFinalSource() {
    return this.sources[this.sources.length - 1];
};
/**
 * Run the pipeline synchronously
 * @method _runSync
 * @return {Object} The results object {result:resultArray}
 * @private
 **/
proto._runSync = function _runSync(callback) {
    var resultArray = [],
        finalSource = this._getFinalSource(),
        handleErr = function(err) {
            if(err) throw err;
        },
        next;
    while((next = finalSource.getNext(handleErr)) !== DocumentSource.EOF) {
        resultArray.push(next);
    }
    return {result:resultArray};
};
/**
 * Run the pipeline asynchronously
 * @method _runAsync
 * @param callback {Function} callback(err, resultObject)
 * @private
 **/
proto._runAsync = function _runAsync(callback) {
    var resultArray = [],
        finalSource = this._getFinalSource(),
        gotNext = function(err, doc) {
            if(err) return callback(err);
            if(doc !== DocumentSource.EOF) {
                resultArray.push(doc);
                return setImmediate(function() { // setImmediate to avoid callstack size issues
                    finalSource.getNext(gotNext);
                });
            } else {
                return callback(null, {result:resultArray});
            }
        };
    finalSource.getNext(gotNext);
};
/**
 * Get the pipeline explanation
 * @method writeExplainOps
 * @return {Array} An array of source explanations
 **/
proto.writeExplainOps = function writeExplainOps() {
    var array = [];
    this.sources.forEach(function(source) {
        source.serializeToArray(array, /*explain=*/true);
    });
    return array;
};
/**
 * Set the source of documents for the pipeline
 * @method addInitialSource
 * @param source {DocumentSource}
 **/
proto.addInitialSource = function addInitialSource(source) {
    this.sources.unshift(source);
};
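// End-to-end sketch (illustrative, with assumptions): `inputSource` stands in for
// whatever DocumentSource wraps your input documents (constructed elsewhere; the
// name is not part of this module). The typical flow is parse, attach the input,
// stitch the chain together, then run:
//
//     var pipelineInst = Pipeline.parseCommand({
//         aggregate: [],
//         pipeline: [{$match: {active: true}}, {$group: {_id: "$type", n: {$sum: 1}}}]
//     }, {});
//     pipelineInst.addInitialSource(inputSource);
//     pipelineInst.stitch();
//     pipelineInst.run(function(err, out) {
//         if (err) throw err;
//         console.log(out.result);
//     });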