PipelineD.js 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. "use strict";
  2. /**
  3. * Pipeline helper for reading data
  4. * @class PipelineD
  5. * @namespace mungedb-aggregate.pipeline
  6. * @module mungedb-aggregate
  7. * @constructor
  8. **/
  9. var PipelineD = module.exports = function PipelineD(){
  10. if(this.constructor == PipelineD) throw new Error("Never create instances of this! Use the static helpers only.");
  11. }, klass = PipelineD, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  12. // DEPENDENCIES
  13. var DocumentSource = require('./documentSources/DocumentSource'),
  14. CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
  15. SortDocumentSource = require('./documentSources/SortDocumentSource'),
  16. MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
  17. getRunner = require('../query').getRunner;
  18. /**
  19. * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
  20. * This source will feed the execution of the pipeline.
  21. *
  22. * //NOTE: Not doing anything here, as we don't use any of these cursor source features
  23. * //NOTE: DEVIATION FROM THE MONGO: We don't have special optimized cursors; You could support something similar by overriding `Pipeline#run` to call `DocumentSource#coalesce` on the `inputSource` if you really need it.
  24. *
  25. * This method looks for early pipeline stages that can be folded into
  26. * the underlying cursor, and when a cursor can absorb those, they
  27. * are removed from the head of the pipeline. For example, an
  28. * early match can be removed and replaced with a Cursor that will
  29. * do an index scan.
  30. *
  31. * @param pipeline {Pipeline} the logical "this" for this operation
  32. * @param ctx {Object} Context for expressions
  33. * @returns {CursorDocumentSource} the cursor that was created
  34. **/
  35. klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
  36. // We will be modifying the source vector as we go
  37. var sources = pipeline.sources;
  38. // Inject a MongodImplementation to sources that need them.
  39. // NOTE: SKIPPED
  40. // Don't modify the pipeline if we got a DocumentSourceMergeCursor
  41. // NOTE: SKIPPED
  42. // Look for an initial match. This works whether we got an initial query or not.
  43. // If not, it results in a "{}" query, which will be what we want in that case.
  44. var queryObj = pipeline.getInitialQuery(),
  45. match;
  46. if (queryObj && queryObj instanceof Object && Object.keys(queryObj).length) {
  47. // This will get built in to the Cursor we'll create, so
  48. // remove the match from the pipeline
  49. match = sources.shift();
  50. }
  51. // Find the set of fields in the source documents depended on by this pipeline.
  52. var deps = pipeline.getDependencies(queryObj);
  53. // Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
  54. // This will need to change to support covering indexes (SERVER-12015). There is an
  55. // exception for textScore since that can only be retrieved by a query projection.
  56. var projectionForQuery = deps.needTextScore ? deps.toProjection() : {};
  57. /*
  58. Look for an initial sort; we'll try to add this to the
  59. Cursor we create. If we're successful in doing that (further down),
  60. we'll remove the $sort from the pipeline, because the documents
  61. will already come sorted in the specified order as a result of the
  62. index scan.
  63. */
  64. var sortStage,
  65. sortObj,
  66. sortInRunner = false;
  67. if (sources.length) {
  68. sortStage = sources[0] instanceof SortDocumentSource ? sources[0] : undefined;
  69. if (sortStage) {
  70. // build the sort key
  71. sortObj = sortStage.serializeSortKey(/*explain*/false);
  72. sortInRunner = true;
  73. }
  74. }
  75. //munge deviation: the runner is (usually) not actually handling the initial query, so we need to add it back to the pipeline
  76. if (match){
  77. sources.unshift(match);
  78. }
  79. // Create the Runner.
  80. // NOTE: the logic here is simplified for munge
  81. var runner = getRunner(expCtx.ns, queryObj, sortObj, projectionForQuery, sources);
  82. // DocumentSourceCursor expects a yielding Runner that has had its state saved.
  83. //runner.setYieldPolicy(Runner.RunnerState.YIELD_AUTO); //Skipped as we don't really support yielding yet
  84. runner.saveState();
  85. // Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
  86. var source = new CursorDocumentSource("", runner, expCtx);
  87. // Note the query, sort, and projection for explain.
  88. source.setQuery(queryObj);
  89. if (sortInRunner)
  90. source.setSort(sortObj);
  91. source.setProjection(deps.toProjection(), deps.toParsedDeps());
  92. while (sources.length && source.coalesce(sources[0])) {
  93. sources.shift();
  94. }
  95. pipeline.addInitialSource(source);
  96. return runner;
  97. };