// Pipeline.js
  1. "use strict";
  2. var async = require('async');
  3. /**
  4. * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
  5. * @class Pipeline
  6. * @namespace mungedb-aggregate.pipeline
  7. * @module mungedb-aggregate
  8. * @constructor
  9. **/
  10. // CONSTRUCTOR
  11. var Pipeline = module.exports = function Pipeline(theCtx){
  12. this.sources = null;
  13. this.explain = false;
  14. this.splitMongodPipeline = false;
  15. this.ctx = theCtx;
  16. }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  17. var DocumentSource = require("./documentSources/DocumentSource"),
  18. LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
  19. MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
  20. ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
  21. SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
  22. UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
  23. GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
  24. OutDocumentSource = require('./documentSources/OutDocumentSource'),
  25. GeoNearDocumentSource = require('./documentSources/GeoNearDocumentSource'),
  26. RedactDocumentSource = require('./documentSources/RedactDocumentSource'),
  27. SortDocumentSource = require('./documentSources/SortDocumentSource'),
  28. DepsTracker = require('./DepsTracker');
  29. klass.COMMAND_NAME = "aggregate";
  30. klass.PIPELINE_NAME = "pipeline";
  31. klass.EXPLAIN_NAME = "explain";
  32. klass.FROM_ROUTER_NAME = "fromRouter";
  33. klass.SERVER_PIPELINE_NAME = "serverPipeline";
  34. klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
  35. klass.BATCH_SIZE_NAME = "batchSize";
  36. klass.stageDesc = {};//attaching this to the class for test cases
  37. klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
  38. klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
  39. klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
  40. klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
  41. klass.stageDesc[OutDocumentSource.outName] = OutDocumentSource.createFromJson;
  42. klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
  43. klass.stageDesc[RedactDocumentSource.redactName] = ProjectDocumentSource.createFromJson;
  44. klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
  45. klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
  46. klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
  47. klass.nStageDesc = Object.keys(klass.stageDesc).length;
  48. klass.optimizations = {};
  49. klass.optimizations.local = {};
  50. klass.optimizations.sharded = {};
  51. /**
  52. * Moves $match before $sort when they are placed next to one another
  53. * @static
  54. * @method moveMatchBeforeSort
  55. * @param pipelineInst An instance of a Pipeline
  56. */
  57. klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
  58. var sources = pipelineInst.sources;
  59. for(var srcn = sources.length, srci = 1; srci < srcn; ++srci) {
  60. var source = sources[srci];
  61. if(source.constructor === MatchDocumentSource) {
  62. var previous = sources[srci - 1];
  63. if(previous && previous.constructor === SortDocumentSource) { //Added check that previous exists
  64. /* swap this item with the previous */
  65. sources[srci] = previous;
  66. sources[srci-1] = source;
  67. }
  68. }
  69. }
  70. };
  71. /**
  72. * Moves $limit before $skip when they are placed next to one another
  73. * @static
  74. * @method moveLimitBeforeSkip
  75. * @param pipelineInst An instance of a Pipeline
  76. */
  77. klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
  78. var sources = pipelineInst.sources;
  79. if(sources.length === 0) return;
  80. for(var i = sources.length - 1; i >= 1 /* not looking at 0 */; i--) {
  81. var limit = sources[i].constructor === LimitDocumentSource ? sources[i] : undefined,
  82. skip = sources[i-1].constructor === SkipDocumentSource ? sources[i-1] : undefined;
  83. if(limit && skip) {
  84. limit.setLimit(limit.getLimit() + skip.getSkip());
  85. sources[i-1] = limit;
  86. sources[i] = skip;
  87. // Start at back again. This is needed to handle cases with more than 1 $limit
  88. // (S means skip, L means limit)
  89. //
  90. // These two would work without second pass (assuming back to front ordering)
  91. // SL -> LS
  92. // SSL -> LSS
  93. //
  94. // The following cases need a second pass to handle the second limit
  95. // SLL -> LLS
  96. // SSLL -> LLSS
  97. // SLSL -> LLSS
  98. i = sources.length; // decremented before next pass
  99. }
  100. }
  101. };
  102. /**
  103. * Attempts to coalesce every pipeline stage into the previous pipeline stage, starting after the first
  104. * @static
  105. * @method coalesceAdjacent
  106. * @param pipelineInst An instance of a Pipeline
  107. */
  108. klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
  109. var sources = pipelineInst.sources;
  110. if(sources.length === 0) return;
  111. // move all sources to a temporary list
  112. var moveSrc = sources.pop(),
  113. tempSources = [];
  114. while(moveSrc) {
  115. tempSources.unshift(moveSrc);
  116. moveSrc = sources.pop();
  117. }
  118. // move the first one to the final list
  119. sources.push(tempSources[0]);
  120. // run through the sources, coalescing them or keeping them
  121. for(var tempn = tempSources.length, tempi = 1; tempi < tempn; ++tempi) {
  122. // If we can't coalesce the source with the last, then move it
  123. // to the final list, and make it the new last. (If we succeeded,
  124. // then we're still on the same last, and there's no need to move
  125. // or do anything with the source -- the destruction of tempSources
  126. // will take care of the rest.)
  127. var lastSource = sources[sources.length-1],
  128. tempSrc = tempSources[tempi];
  129. if(!(lastSource && tempSrc)) {
  130. throw new Error('Must have a last and current source'); // verify(lastSource && tempSrc);
  131. }
  132. if(!lastSource.coalesce(tempSrc)) sources.push(tempSrc);
  133. }
  134. };
  135. /**
  136. * Iterates over sources in the pipelineInst, optimizing each
  137. * @static
  138. * @method optimizeEachDocumentSource
  139. * @param pipelineInst An instance of a Pipeline
  140. */
  141. klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
  142. var sources = pipelineInst.sources;
  143. for(var srci = 0, srcn = sources.length; srci < srcn; ++srci) {
  144. sources[srci].optimize();
  145. }
  146. };
  147. /**
  148. * Auto-places a $match before a $redact when the $redact is the first item in a pipeline
  149. * @static
  150. * @method duplicateMatchBeforeInitalRedact
  151. * @param pipelineInst An instance of a Pipeline
  152. */
  153. klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
  154. var sources = pipelineInst.sources;
  155. if(sources.length >= 2 && sources[0].constructor === RedactDocumentSource) {
  156. if(sources[1].constructor === MatchDocumentSource) {
  157. var match = sources[1],
  158. redactSafePortion = match.redactSafePortion();
  159. if(Object.keys(redactSafePortion).length > 0) {
  160. sources.shift(MatchDocumentSource.createFromJson(redactSafePortion, pipelineInst.ctx));
  161. }
  162. }
  163. }
  164. };
  165. //SKIPPED: addRequiredPrivileges
  166. /**
  167. * Perform optimizations for a pipeline through sharding
  168. * @method splitForSharded
  169. */
  170. proto.splitForSharded = function splitForSharded() {
  171. var shardPipeline = new Pipeline({});
  172. shardPipeline.explain = this.explain;
  173. klass.optimizations.sharded.findSplitPoint(shardPipeline, this);
  174. klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger(shardPipeline, this);
  175. //klass.optimizations.sharded.limitFieldsSentFromShardsToMerger(shardPipeline, this);
  176. return shardPipeline;
  177. };
  178. /**
  179. * Split the source into Merge sources and Shard sources
  180. * @static
  181. * @method findSplitPoint
  182. * @param shardPipe Shard sources
  183. * @param mergePipe Merge sources
  184. */
  185. klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe, mergePipe) {
  186. while(mergePipe.sources.length > 0) {
  187. var current = mergePipe.sources[0];
  188. mergePipe.sources.splice(0, 1);
  189. if (current.isSplittable && current.isSplittable()) {
  190. var shardSource = current.getShardSource(),
  191. mergeSource = current.getMergeSource();
  192. //if (typeof shardSource != "undefined") { shardPipe.sources.push(shardSource); } //push_back
  193. if (shardSource) { shardPipe.sources.push(shardSource); } //push_back
  194. //if (typeof mergeSource != "undefined") { mergePipe.sources.unshift(mergeSource); } //push_front
  195. if (mergeSource) { mergePipe.sources.unshift(mergeSource); } //push_front
  196. break;
  197. }
  198. else {
  199. if (!shardPipe.sources) { shardPipe.sources = []; }
  200. shardPipe.sources.push(current);
  201. }
  202. }
  203. };
  204. /**
  205. * Optimize pipeline through moving unwind to the end
  206. * @static
  207. * @method moveFinalUnwindFromShardsToMerger
  208. * @param shardPipe shard sources
  209. * @param mergePipe merge sources
  210. */
  211. klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
  212. if (true) {
  213. while(shardPipe.sources !== null && (shardPipe.sources.length > 0 && shardPipe.sources[shardPipe.sources.length-1] instanceof UnwindDocumentSource)) {
  214. mergePipe.sources.unshift(shardPipe.sources.pop());
  215. }
  216. }
  217. };
  218. //SKIPPED: optimizations.sharded.limitFieldsSentFromShardsToMerger. Somehow what this produces is not handled by Expression.js (err 16404)
  219. /**
  220. * Create an `Array` of `DocumentSource`s from the given JSON pipeline
  221. * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
  222. * @static
  223. * @method parseDocumentSources
  224. * @param pipeline {Array} The JSON pipeline
  225. * @returns {Array} The parsed `DocumentSource`s
  226. */
  227. klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
  228. var sources = [];
  229. for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
  230. // pull out the pipeline element as an object
  231. var pipeElement = pipeline[iStep];
  232. if (!(pipeElement instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
  233. var obj = pipeElement;
  234. // Parse a pipeline stage from 'obj'.
  235. if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
  236. var stageName = Object.keys(obj)[0],
  237. stageSpec = obj[stageName];
  238. // Create a DocumentSource pipeline stage from 'stageSpec'.
  239. var desc = klass.stageDesc[stageName];
  240. if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; uassert code 16436");
  241. // Parse the stage
  242. var stage = desc(stageSpec, ctx);
  243. if (!stage) throw new Error("Stage must not be undefined!"); // verify(stage)
  244. sources.push(stage);
  245. if(stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
  246. throw new Error("$out can only be the final stage in the pipeline; code 16991");
  247. }
  248. }
  249. return sources;
  250. };
  251. /**
  252. * Create a pipeline from the command.
  253. * @static
  254. * @method parseCommand
  255. * @param cmdObj {Object} The command object sent from the client
  256. * @param cmdObj.aggregate {Array} the thing to aggregate against; // NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
  257. * @param cmdObj.pipeline {Object} the JSON pipeline of `DocumentSource` specs
  258. * @param cmdObj.explain {Boolean} should explain?
  259. * @param cmdObj.fromRouter {Boolean} is from router?
  260. * @param cmdObj.splitMongodPipeline {Boolean} should split?
  261. * @param ctx {Object} Not used yet in mungedb-aggregate
  262. * @returns {Array} the pipeline, if created, otherwise a NULL reference
  263. */
  264. klass.parseCommand = function parseCommand(cmdObj, ctx){
  265. var pipelineNamespace = require("./"),
  266. Pipeline = pipelineNamespace.Pipeline, // using require in case Pipeline gets replaced with an extension
  267. pipelineInst = new Pipeline(ctx);
  268. //gather the specification for the aggregation
  269. var pipeline;
  270. for(var fieldName in cmdObj){
  271. var cmdElement = cmdObj[fieldName];
  272. if(fieldName[0] == "$") continue;
  273. else if(fieldName == "cursor") continue;
  274. else if(fieldName == klass.COMMAND_NAME) continue; //look for the aggregation command
  275. else if(fieldName == klass.BATCH_SIZE_NAME) continue;
  276. else if(fieldName == klass.PIPELINE_NAME) pipeline = cmdElement; //check for the pipeline of JSON doc srcs
  277. else if(fieldName == klass.EXPLAIN_NAME) pipelineInst.explain = cmdElement; //check for explain option
  278. else if(fieldName == klass.FROM_ROUTER_NAME) ctx.inShard = cmdElement; //if the request came from the router, we're in a shard
  279. else if(fieldName == "allowDiskUsage") {
  280. if(typeof cmdElement !== 'boolean') throw new Error("allowDiskUsage must be a bool, not a " + typeof allowDiskUsage+ "; uassert code 16949");
  281. }
  282. else throw new Error("unrecognized field " + JSON.stringify(fieldName));
  283. }
  284. /**
  285. * If we get here, we've harvested the fields we expect for a pipeline
  286. * Set up the specified document source pipeline.
  287. */
  288. // NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
  289. pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
  290. klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
  291. klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
  292. klass.optimizations.local.coalesceAdjacent(pipelineInst);
  293. klass.optimizations.local.optimizeEachDocumentSource(pipelineInst);
  294. klass.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);
  295. return pipelineInst;
  296. };
  297. function ifError(err) {
  298. if (err) throw err;
  299. }
  300. /**
  301. * Gets the initial $match query when $match is the first pipeline stage
  302. * @method run
  303. * @param inputSource {DocumentSource} The input document source for the pipeline
  304. * @param [callback] {Function} Optional callback function if using async extensions
  305. * @return {Object} An empty object or the match spec
  306. */
  307. proto.getInitialQuery = function getInitialQuery() {
  308. var sources = this.sources;
  309. if(sources.length === 0) {
  310. return {};
  311. }
  312. /* look for an initial $match */
  313. var match = sources[0].constructor === MatchDocumentSource ? sources[0] : undefined;
  314. if(!match) return {};
  315. return match.getQuery();
  316. };
  317. /**
  318. * Creates the JSON representation of the pipeline
  319. * @method run
  320. * @param inputSource {DocumentSource} The input document source for the pipeline
  321. * @param [callback] {Function} Optional callback function if using async extensions
  322. * @return {Object} An empty object or the match spec
  323. */
  324. proto.serialize = function serialize() {
  325. var serialized = {},
  326. array = [];
  327. // create an array out of the pipeline operations
  328. if (this.sources) {
  329. for (var i = 0; i < this.sources.length; i++) {
  330. //this.sources.forEach(function(source) {
  331. this.sources[i].serializeToArray(array);
  332. }
  333. }
  334. serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
  335. serialized[klass.PIPELINE_NAME] = array;
  336. if(this.explain) serialized[klass.EXPLAIN_NAME] = this.explain;
  337. return serialized;
  338. };
  339. /**
  340. * Points each source at its previous source
  341. * @method stitch
  342. */
  343. proto.stitch = function stitch() {
  344. if(this.sources.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
  345. /* chain together the sources we found */
  346. var prevSource = this.sources[0];
  347. for(var srci = 1, srcn = this.sources.length; srci < srcn; srci++) {
  348. var tempSource = this.sources[srci];
  349. tempSource.setSource(prevSource);
  350. prevSource = tempSource;
  351. }
  352. };
/**
 * Run the pipeline
 * @method run
 * @param callback {Function} gets called once for each document result from the pipeline;
 *        receives (err, doc), including a final call with a null doc when the stream ends
 */
proto.run = function run(callback) {
	// should not get here in the explain case
	if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
	var doc = null,
		error = null,
		finalSource = this._getFinalSource();
	// Pull documents from the last stage until it yields null (end of stream) or an error.
	async.doWhilst(
		function iterator(next){
			return finalSource.getNext(function (err, obj){
				callback(err, obj); // forward every (err, doc) pair, including the terminating null
				doc = obj;          // remember the last result for the loop test below
				error = err;
				next();
			});
		},
		function test(){
			// keep looping while documents are flowing and no error has surfaced
			return doc !== null && !error;
		},
		function done(err){
			//nothing to do here
		});
};
  380. /**
  381. * Get the last document source in the pipeline
  382. * @method _getFinalSource
  383. * @return {Object} The DocumentSource at the end of the pipeline
  384. * @private
  385. */
  386. proto._getFinalSource = function _getFinalSource() {
  387. return this.sources[this.sources.length - 1];
  388. };
  389. /**
  390. * Get the pipeline explanation
  391. * @method writeExplainOps
  392. * @return {Array} An array of source explanations
  393. */
  394. proto.writeExplainOps = function writeExplainOps() {
  395. var array = [];
  396. this.sources.forEach(function(source) {
  397. source.serializeToArray(array, /*explain=*/true);
  398. });
  399. return array;
  400. };
  401. /**
  402. * Set the source of documents for the pipeline
  403. * @method addInitialSource
  404. * @param source {DocumentSource}
  405. */
  406. proto.addInitialSource = function addInitialSource(source) {
  407. this.sources.unshift(source);
  408. };
//SKIPPED: canRunInMongos
//Note: Deviation from Mongo: Mongo 2.6.5 passes a param to getDependencies
// to calculate TextScore. mungedb-aggregate doesn't do this, so no param is needed.
/**
 * Aggregate the field dependencies of every stage in the pipeline.
 * Walks stages in order, merging each stage's reported fields until a stage claims
 * an exhaustive field set or reports dependency tracking as unsupported.
 * @method getDependencies
 * @return {DepsTracker} tracker holding the union of required fields;
 *         `needWholeDocument` is set when the full field set could not be determined
 */
proto.getDependencies = function getDependencies () {
	var deps = new DepsTracker(),
		knowAllFields = false;
	//NOTE: Deviation from Mongo -- We aren't using Meta and textscore
	for (var i=0; i < this.sources.length && !knowAllFields; i++) {
		var localDeps = new DepsTracker(),
			status = this.sources[i].getDependencies(localDeps);
		if (status === DocumentSource.GetDepsReturn.NOT_SUPPORTED) {
			// Assume this stage needs everything. We may still know something about our
			// dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
			// EXHAUSTIVE_META.
			break;
		}
		if (!knowAllFields) {
			// merge this stage's field dependencies into the accumulated set
			for (var key in localDeps.fields)
				deps.fields[key] = localDeps.fields[key];
			if (localDeps.needWholeDocument)
				deps.needWholeDocument = true;
			// becomes truthy once a stage reports an exhaustive field list (bitwise flag test)
			knowAllFields = status & DocumentSource.GetDepsReturn.EXHAUSTIVE_FIELDS;
		}
	}
	if (!knowAllFields)
		deps.needWholeDocument = true; // don't know all fields we need
	return deps;
};