Pipeline.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. "use strict";
  2. var async = require("neo-async"),
  3. DepsTracker = require("./DepsTracker"),
  4. documentSources = require("./documentSources/"),
  5. DocumentSource = documentSources.DocumentSource,
  6. LimitDocumentSource = documentSources.LimitDocumentSource,
  7. MatchDocumentSource = documentSources.MatchDocumentSource,
  8. ProjectDocumentSource = documentSources.ProjectDocumentSource,
  9. SkipDocumentSource = documentSources.SkipDocumentSource,
  10. UnwindDocumentSource = documentSources.UnwindDocumentSource,
  11. GroupDocumentSource = documentSources.GroupDocumentSource,
  12. OutDocumentSource = documentSources.OutDocumentSource,
  13. GeoNearDocumentSource = documentSources.GeoNearDocumentSource,
  14. RedactDocumentSource = documentSources.RedactDocumentSource,
  15. SortDocumentSource = documentSources.SortDocumentSource;
  16. /**
  17. * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
  18. * @class Pipeline
  19. * @namespace mungedb-aggregate.pipeline
  20. * @module mungedb-aggregate
  21. * @constructor
  22. **/
  23. var Pipeline = module.exports = function Pipeline(theCtx){
  24. this.sources = null;
  25. this.explain = false;
  26. this.splitMongodPipeline = false;
  27. this.ctx = theCtx;
  28. }, klass = Pipeline, proto = klass.prototype;
  // Field names of the "aggregate" command object. parseCommand and serialize use
  // COMMAND/PIPELINE/EXPLAIN/FROM_ROUTER/BATCH_SIZE; the SERVER_/MONGOS_ pipeline names
  // are not referenced by the code in this file.
  Object.assign(klass, {
    COMMAND_NAME: "aggregate",
    PIPELINE_NAME: "pipeline",
    EXPLAIN_NAME: "explain",
    FROM_ROUTER_NAME: "fromRouter",
    SERVER_PIPELINE_NAME: "serverPipeline",
    MONGOS_PIPELINE_NAME: "mongosPipeline",
    BATCH_SIZE_NAME: "batchSize"
  });
  // Registry mapping each stage name to the factory parseDocumentSources calls as
  // `factory(stageSpec, ctx)` to build that stage.
  // NOTE: DEVIATION FROM MONGO: attaching to the class to make it easier to test and extend
  klass.stageDesc = {};
  klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
  klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
  klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
  klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
  //SKIPPED: klass.stageDesc[MergeCursorsDocumentSource.name] = MergeCursorsDocumentSource.createFromJson;
  klass.stageDesc[OutDocumentSource.outName] = OutDocumentSource.createFromJson;
  klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
  // $redact must be built by its own factory (it was wired to the $project one). Other code
  // identifies it via `constructor === RedactDocumentSource`, e.g. duplicateMatchBeforeInitalRedact.
  klass.stageDesc[RedactDocumentSource.redactName] = RedactDocumentSource.createFromJson;
  klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
  klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
  klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
  // klass.nStageDesc = Object.keys(klass.stageDesc).length; //NOTE: DEVIATION FROM MONGO: not using this to bsearch these in `.parseCommand`
  // Namespaces for pipeline rewrite passes: `local` passes are applied by parseCommand,
  // `sharded` passes by splitForSharded.
  klass.optimizations = {
    local: {},
    sharded: {}
  };
  /**
   * Swaps every $match stage with an immediately preceding $sort stage so the filter
   * runs before the sort. Single left-to-right pass over the stages, done in place.
   * @static
   * @method moveMatchBeforeSort
   * @param pipelineInst An instance of a Pipeline; its `sources` Array is mutated
   */
  klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
    var stages = pipelineInst.sources,
      count = stages.length,
      idx = 1;
    while (idx < count) {
      var current = stages[idx],
        before = stages[idx - 1];
      var isMatchAfterSort = current.constructor === MatchDocumentSource &&
        before && before.constructor === SortDocumentSource;
      if (isMatchAfterSort) {
        // the displaced $sort lands at idx and is re-examined as "before" next iteration
        stages[idx - 1] = current;
        stages[idx] = before;
      }
      idx++;
    }
  };
  /**
   * Moves each $limit in front of an immediately preceding $skip, widening the limit by
   * the skip amount so the same documents survive both stages.
   * @static
   * @method moveLimitBeforeSkip
   * @param pipelineInst An instance of a Pipeline; its `sources` Array is mutated
   */
  klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
    var stages = pipelineInst.sources;
    if (stages.length === 0) return;
    var pos = stages.length - 1;
    while (pos >= 1) { // index 0 has no predecessor, so it is never a candidate
      var here = stages[pos],
        before = stages[pos - 1],
        isLimit = here.constructor === LimitDocumentSource,
        isSkip = before.constructor === SkipDocumentSource;
      if (!(isLimit && isSkip)) {
        pos--;
        continue;
      }
      // The skipped documents now flow through the $limit first, so widen it.
      here.setLimit(here.getLimit() + before.getSkip());
      stages[pos - 1] = here;
      stages[pos] = before;
      // Restart from the back. One back-to-front pass handles SL -> LS and SSL -> LSS,
      // but SLL -> LLS, SSLL -> LLSS and SLSL -> LLSS (S = $skip, L = $limit) need more
      // passes to move the remaining limits.
      pos = stages.length - 1;
    }
  };
  /**
   * Attempts to coalesce every pipeline stage into the stage kept before it, starting
   * with the second stage. When `lastKept.coalesce(stage)` returns truthy the stage is
   * absorbed and dropped; otherwise it is kept and becomes the new coalesce target.
   * @static
   * @method coalesceAdjacent
   * @param pipelineInst An instance of a Pipeline; its `sources` Array is rebuilt in place
   * @throws {Error} if a stage after the first is missing (falsy), or the kept target is
   */
  klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
    var sources = pipelineInst.sources;
    if (sources.length === 0) return;
    // Move every stage into a temporary list in one O(n) step. The previous pop/unshift
    // loop was O(n^2) and stopped at the first falsy entry, silently dropping it (or
    // leaving it out of place) instead of letting the verify check below report it.
    var tempSources = sources.splice(0, sources.length);
    // The first stage is always kept.
    sources.push(tempSources[0]);
    for (var tempi = 1, tempn = tempSources.length; tempi < tempn; ++tempi) {
      // If the stage can't be coalesced into the last kept one, keep it and make it
      // the new last; if it was coalesced, it is simply not carried over.
      var lastSource = sources[sources.length - 1],
        tempSrc = tempSources[tempi];
      if (!(lastSource && tempSrc)) {
        throw new Error("Must have a last and current source"); // verify(lastSource && tempSrc);
      }
      if (!lastSource.coalesce(tempSrc)) sources.push(tempSrc);
    }
  };
  /**
   * Calls `optimize()` on every stage of the pipeline, first to last. The number of
   * stages is read once, before the first call.
   * @static
   * @method optimizeEachDocumentSource
   * @param pipelineInst An instance of a Pipeline
   */
  klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
    var stages = pipelineInst.sources,
      remaining = stages.length,
      pos = 0;
    while (remaining-- > 0) {
      stages[pos++].optimize();
    }
  };
  /**
   * When the pipeline starts with a $redact immediately followed by a $match, prepends a
   * new $match built from that match's `redactSafePortion()` (presumably the part of the
   * query that is safe to evaluate before redaction). The existing $redact and $match are
   * kept. Nothing is added when the redact-safe portion has no keys.
   * @static
   * @method duplicateMatchBeforeInitalRedact
   * @param pipelineInst An instance of a Pipeline; its `sources` Array may gain a new first stage
   */
  klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
    var sources = pipelineInst.sources;
    if (sources.length < 2 || sources[0].constructor !== RedactDocumentSource) return;
    if (sources[1].constructor !== MatchDocumentSource) return;
    var redactSafePortion = sources[1].redactSafePortion();
    if (Object.keys(redactSafePortion).length === 0) return;
    // unshift, not shift: `shift` ignores its argument and would instead have removed
    // the leading $redact stage without inserting anything.
    sources.unshift(MatchDocumentSource.createFromJson(redactSafePortion, pipelineInst.ctx));
  };
  //SKIPPED: addRequiredPrivileges
  /**
   * Splits this pipeline for sharded execution. A new shard-side Pipeline (created with an
   * empty context and this pipeline's `explain` flag) receives the leading stages; the
   * stages left in `this.sources` form the merge-side pipeline.
   * @method splitForSharded
   * @return {Pipeline} the shard-side pipeline; `this` is mutated into the merger pipeline
   */
  proto.splitForSharded = function splitForSharded() {
    var shardPipeline = new Pipeline({});
    shardPipeline.explain = this.explain;
    var sharded = klass.optimizations.sharded;
    sharded.findSplitPoint(shardPipeline, this);
    sharded.moveFinalUnwindFromShardsToMerger(shardPipeline, this);
    //sharded.limitFieldsSentFromShardsToMerger(shardPipeline, this);
    return shardPipeline;
  };
  /**
   * Moves stages from the front of `mergePipe` to the end of `shardPipe` up to the first
   * splittable stage (one whose `isSplittable()` returns truthy). That stage is replaced by
   * its halves: `getShardSource()` is appended to the shard pipeline and `getMergeSource()`
   * becomes the first merge stage; falsy halves are dropped. If no stage is splittable,
   * every stage ends up in the shard pipeline.
   * @static
   * @method findSplitPoint
   * @param shardPipe {Pipeline} receives the shard-side stages; a null `sources` is initialized to []
   * @param mergePipe {Pipeline} the pipeline being split; its `sources` Array is mutated
   */
  klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe, mergePipe) {
    // A freshly constructed Pipeline has `sources === null`. Previously only the
    // non-splittable branch initialized it, so a pipeline whose FIRST stage was
    // splittable crashed on `shardPipe.sources.push`.
    if (!shardPipe.sources) shardPipe.sources = [];
    var mergeSources = mergePipe.sources;
    while (mergeSources.length > 0) {
      var current = mergeSources.shift(); // pop_front
      if (current.isSplittable && current.isSplittable()) {
        var shardSource = current.getShardSource(),
          mergeSource = current.getMergeSource();
        if (shardSource) shardPipe.sources.push(shardSource); // push_back
        if (mergeSource) mergeSources.unshift(mergeSource); // push_front
        break;
      }
      // Not splittable: this stage runs entirely on the shards.
      shardPipe.sources.push(current);
    }
  };
  /**
   * Moves every trailing $unwind stage off the end of the shard pipeline onto the front of
   * the merge pipeline, keeping their relative order. Does nothing when the shard
   * pipeline's `sources` is null or empty.
   * @static
   * @method moveFinalUnwindFromShardsToMerger
   * @param shardPipe shard-side pipeline; trailing $unwind stages are removed from it
   * @param mergePipe merge-side pipeline; receives those stages at its front
   */
  klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
    var hasTrailingUnwind = function () {
      var stages = shardPipe.sources;
      return stages !== null &&
        stages.length > 0 &&
        stages[stages.length - 1] instanceof UnwindDocumentSource;
    };
    while (hasTrailingUnwind()) {
      mergePipe.sources.unshift(shardPipe.sources.pop());
    }
  };
  //SKIPPED: optimizations.sharded.limitFieldsSentFromShardsToMerger. Somehow what this produces is not handled by Expression.js (err 16404)
  /**
   * Create an `Array` of `DocumentSource`s from the given JSON pipeline.
   * Each element must be an object with exactly one key, the stage name; its value is the
   * stage spec handed to that stage's factory in `klass.stageDesc`.
   * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
   * @static
   * @method parseDocumentSources
   * @param pipeline {Array} The JSON pipeline
   * @param ctx {Object} passed unchanged to every stage factory
   * @returns {Array} The parsed `DocumentSource`s
   * @throws {Error} if `pipeline` is not an Array, an element is not a single-key object,
   *   a stage name is not registered, a factory returns nothing, or $out is not last
   */
  klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx) {
    // Previously a missing/non-array pipeline failed with an opaque TypeError on `.length`.
    if (!Array.isArray(pipeline)) throw new Error("pipeline must be an Array");
    var sources = [];
    for (var iStep = 0, nSteps = pipeline.length; iStep < nSteps; ++iStep) {
      // pull out the pipeline element as an object
      var obj = pipeline[iStep];
      if (!(obj instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
      // Parse a pipeline stage from 'obj'.
      var keys = Object.keys(obj);
      if (keys.length !== 1)
        throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
      var stageName = keys[0],
        stageSpec = obj[stageName];
      // Own-property lookup only: a plain `stageDesc[stageName]` resolves names such as
      // "constructor" or "toString" through Object.prototype and would call those as factories.
      var desc = Object.prototype.hasOwnProperty.call(klass.stageDesc, stageName) ? klass.stageDesc[stageName] : undefined;
      if (typeof desc !== "function") throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; uassert code 16436");
      // Parse the stage
      var stage = desc(stageSpec, ctx);
      if (!stage) throw new Error("Stage must not be undefined!"); // verify(stage)
      sources.push(stage);
      if (stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
        throw new Error("$out can only be the final stage in the pipeline; code 16991");
      }
    }
    return sources;
  };
  /**
   * Create a pipeline from the command.
   * Recognized fields: `pipeline`, `explain`, `fromRouter`, and `allowDiskUse` (also
   * accepted under the legacy spelling `allowDiskUsage`). `aggregate`, `batchSize`,
   * `cursor` and any `$`-prefixed field are ignored. Any other field is rejected.
   * After parsing, the local optimizations are applied in a fixed order.
   * @static
   * @method parseCommand
   * @param cmdObj {Object} The command object sent from the client
   * @param cmdObj.aggregate {Array} the thing to aggregate against // NOTE: DEVIATION FROM MONGO: not a collection name
   * @param cmdObj.pipeline {Array} the JSON pipeline of `DocumentSource` specs
   * @param cmdObj.explain {Boolean} should explain?
   * @param cmdObj.fromRouter {Boolean} is from router? (written to `ctx.inShard`, so `ctx` must then be an object)
   * @param cmdObj.allowDiskUse {Boolean} only validated to be a boolean; otherwise unused here
   * @param ctx {Object} context object; stored on the pipeline and passed to every stage factory
   * @returns {Pipeline} the parsed and locally optimized pipeline
   * @throws {Error} on an unrecognized field, a non-boolean allowDiskUse, or a stage parse error
   */
  klass.parseCommand = function parseCommand(cmdObj, ctx) {
    var pipelineNamespace = require("./"),
      Pipeline = pipelineNamespace.Pipeline, // using require in case Pipeline gets replaced with an extension
      pipelineInst = new Pipeline(ctx);
    //gather the specification for the aggregation
    var pipeline;
    for (var fieldName in cmdObj) { //jshint ignore:line
      var cmdElement = cmdObj[fieldName];
      if (fieldName[0] === "$")
        continue;
      else if (fieldName === "cursor")
        continue;
      else if (fieldName === klass.COMMAND_NAME)
        continue; //look for the aggregation command
      else if (fieldName === klass.BATCH_SIZE_NAME)
        continue;
      else if (fieldName === klass.PIPELINE_NAME)
        pipeline = cmdElement; //check for the pipeline of JSON doc srcs
      else if (fieldName === klass.EXPLAIN_NAME)
        pipelineInst.explain = cmdElement; //check for explain option
      else if (fieldName === klass.FROM_ROUTER_NAME)
        ctx.inShard = cmdElement; //if the request came from the router, we're in a shard
      else if (fieldName === "allowDiskUse" || fieldName === "allowDiskUsage") {
        // MongoDB names this option "allowDiskUse"; "allowDiskUsage" is kept for existing
        // callers. Report the type actually supplied (the old message used `typeof` on an
        // undeclared identifier and therefore always said "undefined").
        if (typeof cmdElement !== "boolean")
          throw new Error(fieldName + " must be a bool, not a " + typeof cmdElement + "; uassert code 16949");
      } else
        throw new Error("unrecognized field " + JSON.stringify(fieldName));
    }
    /**
     * If we get here, we've harvested the fields we expect for a pipeline
     * Set up the specified document source pipeline.
     */
    // NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify facilitate extensions (now in parseDocumentSources)
    pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
    // The order in which optimizations are applied can have significant impact on the
    // efficiency of the final pipeline. Be Careful!
    klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
    klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
    klass.optimizations.local.coalesceAdjacent(pipelineInst);
    klass.optimizations.local.optimizeEachDocumentSource(pipelineInst);
    klass.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);
    return pipelineInst;
  };
  /**
   * Returns the query of the pipeline's first stage when that stage is a $match.
   * @method getInitialQuery
   * @return {Object} the leading $match's `getQuery()` result, or a new empty object when
   *   the pipeline has no stages or does not start with a $match
   */
  proto.getInitialQuery = function getInitialQuery() {
    var stages = this.sources,
      first = stages.length > 0 ? stages[0] : undefined;
    return first instanceof MatchDocumentSource ? first.getQuery() : {};
  };
  /**
   * Builds the command-object form of this pipeline.
   * @method serialize
   * @return {Object} `{aggregate, pipeline, explain?}` where `aggregate` is `ctx.ns.coll`
   *   (or "" when unavailable), `pipeline` holds what each stage appends through
   *   `serializeToArray`, and `explain` is present only when `this.explain` is truthy
   */
  proto.serialize = function serialize() {
    var stageSpecs = [],
      stages = this.sources || [];
    for (var pos = 0; pos < stages.length; pos++) {
      stages[pos].serializeToArray(stageSpecs);
    }
    var collName = (this.ctx && this.ctx.ns && this.ctx.ns.coll) || "",
      out = {};
    out[klass.COMMAND_NAME] = collName;
    out[klass.PIPELINE_NAME] = stageSpecs;
    if (this.explain) out[klass.EXPLAIN_NAME] = this.explain;
    return out;
  };
  /**
   * Chains the stages together: each stage after the first is given the stage before it
   * as its input via `setSource`.
   * @method stitch
   * @throws {Error} when the pipeline has no stages (massert code 16600)
   */
  proto.stitch = function stitch() {
    var stages = this.sources;
    if (stages.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
    for (var pos = 1, total = stages.length; pos < total; pos++) {
      stages[pos].setSource(stages[pos - 1]);
    }
  };
  /**
   * Run the pipeline by repeatedly pulling documents from its last stage.
   * `callback(err, doc)` is invoked once per pull, including the final pull whose `doc` is
   * `null`; no further pulls are made after that. Any error is passed to `callback` and
   * then handed to `async.doWhilst` via `next(err)`.
   * @method run
   * @param [isAsync] {Boolean} whether or not to use setImmediate to force async calls (to avoid
   *   stack overflows); defaults to true when the callback is passed as the first argument
   * @param callback {Function} gets called once for each document result from the pipeline
   */
  proto.run = function run(isAsync, callback) {
    if (typeof isAsync === "function") {
      callback = isAsync;
      isAsync = true;
    }
    // should not get here in the explain case
    if (this.explain) return callback(new Error("Assertion error: don't run pipeline in explain mode"));
    var lastDoc = null,
      tail = this.sources[this.sources.length - 1];
    var pull = function (next) {
      return tail.getNext(function (err, doc) {
        var emit = function () {
          lastDoc = doc;
          callback(err, lastDoc);
          next(err);
        };
        if (isAsync) {
          async.setImmediate(emit);
        } else {
          emit(); // sync mode; only for small sets, stack overflow on large sets
        }
      });
    };
    var keepGoing = function () {
      return lastDoc !== null;
    };
    async.doWhilst(pull, keepGoing, function done(err) {
      //nothing to do here; errors were already delivered to `callback`
    });
  };
  /**
   * Collects each stage's explain output, in pipeline order.
   * @method writeExplainOps
   * @return {Array} everything the stages append via `serializeToArray(array, true)`
   */
  proto.writeExplainOps = function writeExplainOps() {
    var explained = [],
      stages = this.sources;
    for (var pos = 0; pos < stages.length; pos++) {
      stages[pos].serializeToArray(explained, /*explain=*/true);
    }
    return explained;
  };
  /**
   * Set the source of documents for the pipeline by making `source` its first stage.
   * @method addInitialSource
   * @param source {DocumentSource} inserted at index 0; existing stages move back by one
   */
  proto.addInitialSource = function addInitialSource(source) {
    this.sources.splice(0, 0, source);
  };
  //SKIPPED: canRunInMongos
  //Note: Deviation from Mongo: Mongo 2.6.5 passes a param to getDependencies
  // to calculate TextScore. mungedb-aggregate doesn't do this, so no param is needed.
  /**
   * Works out which document fields the pipeline needs.
   * Stages are visited in order; each stage's `fields` and `needWholeDocument` are merged
   * into the result. Visiting stops after a stage whose status has the EXHAUSTIVE_FIELDS
   * bit set (that stage is merged first), or at a stage whose status is NOT_SUPPORTED
   * (that stage is not merged).
   * NOTE: Deviation from Mongo -- We aren't using Meta and textscore
   * @method getDependencies
   * @return {DepsTracker} the merged dependencies; `needWholeDocument` is forced to true
   *   unless a visited stage reported EXHAUSTIVE_FIELDS
   */
  proto.getDependencies = function getDependencies() {
    var GetDepsReturn = DocumentSource.GetDepsReturn,
      deps = new DepsTracker(),
      knowAllFields = false;
    for (var idx = 0; idx < this.sources.length; idx++) {
      var stageDeps = new DepsTracker(),
        status = this.sources[idx].getDependencies(stageDeps);
      if (status === GetDepsReturn.NOT_SUPPORTED) {
        // Assume this stage needs everything. We may still know something about our
        // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
        // EXHAUSTIVE_META.
        break;
      }
      for (var field in stageDeps.fields) { //jshint ignore:line
        deps.fields[field] = stageDeps.fields[field];
      }
      if (stageDeps.needWholeDocument) deps.needWholeDocument = true;
      if (status & GetDepsReturn.EXHAUSTIVE_FIELDS) {
        knowAllFields = true;
        break;
      }
    }
    if (!knowAllFields) deps.needWholeDocument = true; // don't know all fields we need
    return deps;
  };