  1. "use strict";
  2. /**
  3. * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
  4. * @class Pipeline
  5. * @namespace mungedb-aggregate.pipeline
  6. * @module mungedb-aggregate
  7. * @constructor
  8. **/
  9. // CONSTRUCTOR
  10. var Pipeline = module.exports = function Pipeline(theCtx){
  11. this.sources = null;
  12. this.explain = false;
  13. this.splitMongodPipeline = false;
  14. this.ctx = theCtx;
  15. }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
var DocumentSource = require("./documentSources/DocumentSource"),
	LimitDocumentSource = require("./documentSources/LimitDocumentSource"),
	MatchDocumentSource = require("./documentSources/MatchDocumentSource"),
	ProjectDocumentSource = require("./documentSources/ProjectDocumentSource"),
	SkipDocumentSource = require("./documentSources/SkipDocumentSource"),
	UnwindDocumentSource = require("./documentSources/UnwindDocumentSource"),
	GroupDocumentSource = require("./documentSources/GroupDocumentSource"),
	OutDocumentSource = require("./documentSources/OutDocumentSource"),
	GeoNearDocumentSource = require("./documentSources/GeoNearDocumentSource"),
	RedactDocumentSource = require("./documentSources/RedactDocumentSource"),
	SortDocumentSource = require("./documentSources/SortDocumentSource");
klass.COMMAND_NAME = "aggregate";
klass.PIPELINE_NAME = "pipeline";
klass.EXPLAIN_NAME = "explain";
klass.FROM_ROUTER_NAME = "fromRouter";
klass.SERVER_PIPELINE_NAME = "serverPipeline";
klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
klass.stageDesc = {}; //attaching this to the class for test cases
klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
klass.stageDesc[OutDocumentSource.outName] = OutDocumentSource.createFromJson;
klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
klass.stageDesc[RedactDocumentSource.redactName] = RedactDocumentSource.createFromJson;
klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
klass.nStageDesc = Object.keys(klass.stageDesc).length;
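// Illustrative sketch (not executed): each stageDesc entry maps a stage name to its
// createFromJson factory, which is how parseDocumentSources (below) turns one pipeline
// element into a DocumentSource. Assuming MatchDocumentSource.matchName is the usual
// "$match" string, a lookup goes roughly like this:
//
//	var factory = klass.stageDesc["$match"];          // MatchDocumentSource.createFromJson
//	var stage = factory({status: "A"}, /*ctx=*/{});   // a MatchDocumentSource instance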
klass.optimizations = {};
klass.optimizations.local = {};
/**
 * Moves $match before $sort when they are placed next to one another
 * @static
 * @method moveMatchBeforeSort
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
	var sources = pipelineInst.sources;
	for (var srcn = sources.length, srci = 1; srci < srcn; ++srci) {
		var source = sources[srci];
		if (source.constructor === MatchDocumentSource) {
			var previous = sources[srci - 1];
			if (previous && previous.constructor === SortDocumentSource) { // added check that previous exists
				/* swap this item with the previous */
				sources[srci] = previous;
				sources[srci - 1] = source;
			}
		}
	}
};
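// Illustrative sketch (not executed): with a parsed pipeline equivalent to
// [{$sort: {age: -1}}, {$match: {age: {$gt: 21}}}], the loop above sees the $match at
// index 1 preceded by a $sort and swaps them, so the filter runs before the sort:
//
//	// before: pipelineInst.sources ~ [SortDocumentSource, MatchDocumentSource]
//	klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
//	// after:  pipelineInst.sources ~ [MatchDocumentSource, SortDocumentSource]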
/**
 * Moves $limit before $skip when they are placed next to one another
 * @static
 * @method moveLimitBeforeSkip
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
	var sources = pipelineInst.sources;
	if (sources.length === 0) return;
	for (var i = sources.length - 1; i >= 1 /* not looking at 0 */; i--) {
		var limit = sources[i].constructor === LimitDocumentSource ? sources[i] : undefined,
			skip = sources[i - 1].constructor === SkipDocumentSource ? sources[i - 1] : undefined;
		if (limit && skip) {
			limit.setLimit(limit.getLimit() + skip.getSkip());
			sources[i - 1] = limit;
			sources[i] = skip;
			// Start at back again. This is needed to handle cases with more than 1 $limit
			// (S means skip, L means limit)
			//
			// These two would work without a second pass (assuming back to front ordering)
			// SL -> LS
			// SSL -> LSS
			//
			// The following cases need a second pass to handle the second limit
			// SLL -> LLS
			// SSLL -> LLSS
			// SLSL -> LLSS
			i = sources.length; // decremented before next pass
		}
	}
};
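// Illustrative sketch (not executed): for a pipeline equivalent to
// [{$skip: 10}, {$limit: 5}], swapping the stages only stays correct if the limit is
// widened by the skip amount, which is what setLimit(getLimit() + getSkip()) does:
//
//	// before: skip 10 docs, then keep 5                    -> docs 11..15
//	// after:  keep the first 10 + 5 = 15 docs, then skip 10 -> docs 11..15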
/**
 * Attempts to coalesce every pipeline stage into the previous pipeline stage, starting after the first
 * @static
 * @method coalesceAdjacent
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
	var sources = pipelineInst.sources;
	if (sources.length === 0) return;
	// move all sources to a temporary list
	var moveSrc = sources.pop(),
		tempSources = [];
	while (moveSrc) {
		tempSources.unshift(moveSrc);
		moveSrc = sources.pop();
	}
	// move the first one to the final list
	sources.push(tempSources[0]);
	// run through the sources, coalescing them or keeping them
	for (var tempn = tempSources.length, tempi = 1; tempi < tempn; ++tempi) {
		// If we can't coalesce the source with the last, then move it
		// to the final list, and make it the new last. (If we succeeded,
		// then we're still on the same last, and there's no need to move
		// or do anything with the source -- the destruction of tempSources
		// will take care of the rest.)
		var lastSource = sources[sources.length - 1],
			tempSrc = tempSources[tempi];
		if (!(lastSource && tempSrc)) {
			throw new Error("Must have a last and current source"); // verify(lastSource && tempSrc);
		}
		if (!lastSource.coalesce(tempSrc)) sources.push(tempSrc);
	}
};
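// Illustrative sketch (not executed): whether two neighbours merge is decided entirely by
// the earlier stage's coalesce() method; this pass just walks the list and drops any stage
// that was absorbed. For example, assuming LimitDocumentSource#coalesce folds a following
// $limit into itself (keeping the smaller value, as mongodb does):
//
//	// before: sources ~ [LimitDocumentSource(10), LimitDocumentSource(5)]
//	klass.optimizations.local.coalesceAdjacent(pipelineInst);
//	// after:  sources ~ [LimitDocumentSource(5)]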
/**
 * Iterates over sources in the pipelineInst, optimizing each
 * @static
 * @method optimizeEachDocumentSource
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
	var sources = pipelineInst.sources;
	for (var srci = 0, srcn = sources.length; srci < srcn; ++srci) {
		sources[srci].optimize();
	}
};
/**
 * Duplicates the redact-safe portion of a $match that immediately follows an initial $redact, placing the copy in front of the $redact so documents can be filtered early
 * @static
 * @method duplicateMatchBeforeInitalRedact
 * @param pipelineInst An instance of a Pipeline
 **/
klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
	var sources = pipelineInst.sources;
	if (sources.length >= 2 && sources[0].constructor === RedactDocumentSource) {
		if (sources[1].constructor === MatchDocumentSource) {
			var match = sources[1],
				redactSafePortion = match.redactSafePortion();
			if (Object.keys(redactSafePortion).length > 0) {
				sources.unshift(MatchDocumentSource.createFromJson(redactSafePortion, pipelineInst.ctx));
			}
		}
	}
};
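// Illustrative sketch (not executed): for a parsed pipeline equivalent to
// [{$redact: ...}, {$match: {status: "A"}}], and assuming redactSafePortion() reports the
// whole predicate as safe to evaluate before redaction, a copy of the $match is inserted
// at the front so fewer documents reach the $redact:
//
//	// before: [RedactDocumentSource, MatchDocumentSource]
//	// after:  [MatchDocumentSource({status: "A"}), RedactDocumentSource, MatchDocumentSource]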
/**
 * Create an `Array` of `DocumentSource`s from the given JSON pipeline
 * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
 * @static
 * @method parseDocumentSources
 * @param pipeline {Array} The JSON pipeline
 * @param ctx {Object} The context, passed through to each stage's createFromJson factory
 * @returns {Array} The parsed `DocumentSource`s
 **/
klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
	var sources = [];
	for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
		// pull out the pipeline element as an object
		var pipeElement = pipeline[iStep];
		if (!(pipeElement instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
		var obj = pipeElement;
		// Parse a pipeline stage from 'obj'.
		if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
		var stageName = Object.keys(obj)[0],
			stageSpec = obj[stageName];
		// Create a DocumentSource pipeline stage from 'stageSpec'.
		var desc = klass.stageDesc[stageName];
		if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435");
		// Parse the stage
		var stage = desc(stageSpec, ctx);
		if (!stage) throw new Error("Stage must not be undefined!"); // verify(stage)
		sources.push(stage);
		if (stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
			throw new Error("$out can only be the final stage in the pipeline; code 16435");
		}
	}
	return sources;
};
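// Illustrative sketch (not executed): parseDocumentSources maps each single-key stage spec
// through the stageDesc table, so a JSON pipeline becomes an array of DocumentSources in
// the same order (the stage names assume the usual mongodb spellings):
//
//	var sources = Pipeline.parseDocumentSources([
//		{$match: {status: "A"}},
//		{$limit: 10}
//	], /*ctx=*/{});
//	// sources ~ [MatchDocumentSource, LimitDocumentSource]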
/**
 * Create a pipeline from the command.
 * @static
 * @method parseCommand
 * @param cmdObj {Object} The command object sent from the client
 * @param cmdObj.aggregate {Array} the thing to aggregate against; // NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
 * @param cmdObj.pipeline {Object} the JSON pipeline of `DocumentSource` specs
 * @param cmdObj.explain {Boolean} should explain?
 * @param cmdObj.fromRouter {Boolean} is from router?
 * @param cmdObj.splitMongodPipeline {Boolean} should split?
 * @param ctx {Object} The context; passed to the `Pipeline` instance and to each stage
 * @returns {Pipeline} the parsed `Pipeline` instance
 **/
klass.parseCommand = function parseCommand(cmdObj, ctx){
	var pipelineNamespace = require("./"),
		Pipeline = pipelineNamespace.Pipeline, // using require in case Pipeline gets replaced with an extension
		pipelineInst = new Pipeline(ctx);
	// gather the specification for the aggregation
	var pipeline;
	for (var fieldName in cmdObj) {
		var cmdElement = cmdObj[fieldName];
		if (fieldName[0] == "$") continue;
		else if (fieldName == "cursor") continue;
		else if (fieldName == klass.COMMAND_NAME) continue; // look for the aggregation command
		else if (fieldName == klass.PIPELINE_NAME) pipeline = cmdElement; // check for the pipeline of JSON doc srcs
		else if (fieldName == klass.EXPLAIN_NAME) pipelineInst.explain = cmdElement; // check for explain option
		else if (fieldName == klass.FROM_ROUTER_NAME) ctx.inShard = cmdElement; // if the request came from the router, we're in a shard
		else if (fieldName == "allowDiskUsage") {
			if (typeof cmdElement !== "boolean") throw new Error("allowDiskUsage must be a bool, not a " + typeof cmdElement + "; uassert code 16949");
		}
		else throw new Error("unrecognized field " + JSON.stringify(fieldName));
	}
	/**
	 * If we get here, we've harvested the fields we expect for a pipeline.
	 * Set up the specified document source pipeline.
	 **/
	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
	var sources = pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
	klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
	klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
	klass.optimizations.local.coalesceAdjacent(pipelineInst);
	klass.optimizations.local.optimizeEachDocumentSource(pipelineInst);
	klass.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);
	return pipelineInst;
};
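// Illustrative sketch (not executed): a minimal command object mirrors mongodb's aggregate
// command shape; note that the loop above skips the `aggregate` field itself, so the input
// documents are wired in separately (see addInitialSource at the bottom of this file):
//
//	var pipelineInst = Pipeline.parseCommand({
//		aggregate: [],
//		pipeline: [{$match: {status: "A"}}, {$limit: 10}],
//		explain: false
//	}, /*ctx=*/{});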
// sync callback for Pipeline#run if omitted
klass.SYNC_CALLBACK = function(err, results){
	if (err) throw err;
	return results.result;
};
function ifError(err) {
	if (err) throw err;
}
/**
 * Gets the initial $match query when $match is the first pipeline stage
 * @method getInitialQuery
 * @return {Object} An empty object, or the match spec if the pipeline starts with $match
 **/
proto.getInitialQuery = function getInitialQuery() {
	var sources = this.sources;
	if (sources.length === 0) {
		return {};
	}
	/* look for an initial $match */
	var match = sources[0].constructor === MatchDocumentSource ? sources[0] : undefined;
	if (!match) return {};
	return match.getQuery();
};
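// Illustrative sketch (not executed): assuming MatchDocumentSource#getQuery returns the
// original match spec, a pipeline built from [{$match: {x: 1}}, {$skip: 5}] yields:
//
//	pipelineInst.getInitialQuery();   // -> {x: 1}
//	// and {} for any pipeline that does not start with $match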
/**
 * Creates the JSON representation of the pipeline
 * @method serialize
 * @return {Object} The serialized command object ({aggregate: ..., pipeline: [...], explain: ...})
 **/
proto.serialize = function serialize() {
	var serialized = {},
		array = [];
	// create an array out of the pipeline operations
	this.sources.forEach(function(source) {
		source.serializeToArray(array);
	});
	serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : "";
	serialized[klass.PIPELINE_NAME] = array;
	if (this.explain) serialized[klass.EXPLAIN_NAME] = this.explain;
	return serialized;
};
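// Illustrative sketch (not executed): with no ctx.ns set, and assuming each source
// serializes back to its original JSON spec, the serialized form looks like:
//
//	pipelineInst.serialize();
//	// -> {aggregate: "", pipeline: [{$match: {status: "A"}}, {$limit: 10}]}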
/**
 * Points each source at its previous source
 * @method stitch
 **/
proto.stitch = function stitch() {
	if (this.sources.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
	/* chain together the sources we found */
	var prevSource = this.sources[0];
	for (var srci = 1, srcn = this.sources.length; srci < srcn; srci++) {
		var tempSource = this.sources[srci];
		tempSource.setSource(prevSource);
		prevSource = tempSource;
	}
};
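// Illustrative sketch (not executed): stitch() only wires sources[1..n-1] to their
// predecessors, so whatever feeds the pipeline must already sit at sources[0]
// (normally placed there via addInitialSource, defined at the bottom of this file):
//
//	pipelineInst.addInitialSource(inputSource); // inputSource: any DocumentSource yielding the input docs
//	pipelineInst.stitch();                      // each later stage now reads from the one before it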
/**
 * Run the pipeline
 * @method run
 * @param [callback] {Function} Optional. Run the pipeline in async mode; callback(err, results)
 * @return {Object} The results object ({result: resultArray}) when run synchronously
 **/
proto.run = function run(callback) {
	// should not get here in the explain case
	if (this.explain) throw new Error("Should not be running a pipeline in explain mode!");
	/* NOTE: DEVIATION FROM MONGO SOURCE. WE'RE SUPPORTING SYNC AND ASYNC */
	if (callback) {
		return this._runAsync(callback);
	} else {
		return this._runSync();
	}
};
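// Illustrative sketch (not executed): once the input source is attached and stitch() has
// been called (see above), the same pipeline can be drained either way; the async form
// relies on the getNext(callback) signature used in _runAsync below:
//
//	var out = pipelineInst.run();             // sync: {result: [...]}
//	pipelineInst.run(function(err, results){  // async: same shape via callback
//		if (err) throw err;
//		var docs = results.result;
//	});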
/**
 * Get the last document source in the pipeline
 * @method _getFinalSource
 * @return {Object} The DocumentSource at the end of the pipeline
 * @private
 **/
proto._getFinalSource = function _getFinalSource() {
	return this.sources[this.sources.length - 1];
};
/**
 * Run the pipeline synchronously
 * @method _runSync
 * @return {Object} The results object {result: resultArray}
 * @private
 **/
proto._runSync = function _runSync() {
	var resultArray = [],
		finalSource = this._getFinalSource(),
		next = finalSource.getNext();
	while (next !== DocumentSource.EOF) {
		// add the document to the result set
		resultArray.push(next);
		next = finalSource.getNext();
	}
	return {result: resultArray};
};
/**
 * Run the pipeline asynchronously
 * @method _runAsync
 * @param callback {Function} callback(err, resultObject)
 * @private
 **/
proto._runAsync = function _runAsync(callback) {
	var resultArray = [],
		finalSource = this._getFinalSource(),
		gotNext = function(err, doc) {
			if (err) return callback(err);
			if (doc !== DocumentSource.EOF) {
				resultArray.push(doc);
				return setImmediate(function() { // setImmediate to avoid callstack size issues
					finalSource.getNext(gotNext);
				});
			} else {
				return callback(null, {result: resultArray});
			}
		};
	finalSource.getNext(gotNext);
};
/**
 * Get the pipeline explanation
 * @method writeExplainOps
 * @return {Array} An array of source explanations
 **/
proto.writeExplainOps = function writeExplainOps() {
	var array = [];
	this.sources.forEach(function(source) {
		source.serializeToArray(array, /*explain=*/true);
	});
	return array;
};
/**
 * Set the source of documents for the pipeline
 * @method addInitialSource
 * @param source {DocumentSource}
 **/
proto.addInitialSource = function addInitialSource(source) {
	this.sources.unshift(source);
};