PipelineD.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. "use strict";
  2. /**
  3. * Pipeline helper for reading data
  4. * @class PipelineD
  5. * @namespace mungedb-aggregate.pipeline
  6. * @module mungedb-aggregate
  7. * @constructor
  8. **/
  9. var PipelineD = module.exports = function PipelineD(){
  10. if(this.constructor == PipelineD) throw new Error("Never create instances of this! Use the static helpers only.");
  11. }, klass = PipelineD, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  12. // DEPENDENCIES
  13. var DocumentSource = require('./documentSources/DocumentSource'),
  14. CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
  15. SortDocumentSource = require('./documentSources/SortDocumentSource'),
  16. MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
  17. Cursor = require('../query/ArrayRunner');
  18. /**
  19. * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
  20. * This source will feed the execution of the pipeline.
  21. *
  22. * //NOTE: Not doing anything here, as we don't use any of these cursor source features
  23. * //NOTE: DEVIATION FROM THE MONGO: We don't have special optimized cursors; You could support something similar by overriding `Pipeline#run` to call `DocumentSource#coalesce` on the `inputSource` if you really need it.
  24. *
  25. * This method looks for early pipeline stages that can be folded into
  26. * the underlying cursor, and when a cursor can absorb those, they
  27. * are removed from the head of the pipeline. For example, an
  28. * early match can be removed and replaced with a Cursor that will
  29. * do an index scan.
  30. *
  31. * @param pipeline {Pipeline} the logical "this" for this operation
  32. * @param ctx {Object} Context for expressions
  33. * @returns {CursorDocumentSource} the cursor that was created
  34. **/
  35. klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
  36. // get the full "namespace" name
  37. var data = expCtx.ns; //NOTE: ns will likely be either an array of documents or a document source in munge
  38. // We will be modifying the source vector as we go
  39. var sources = pipeline.sources;
  40. // Inject a MongodImplementation to sources that need them.
  41. // NOTE: SKIPPED
  42. // Don't modify the pipeline if we got a DocumentSourceMergeCursor
  43. // NOTE: SKIPPED
  44. // Look for an initial match. This works whether we got an initial query or not.
  45. // If not, it results in a "{}" query, which will be what we want in that case.
  46. var queryObj = pipeline.getInitialQuery();
  47. if (queryObj && queryObj instanceof Object && Object.keys(queryObj).length) {
  48. // This will get built in to the Cursor we'll create, so
  49. // remove the match from the pipeline
  50. // NOTE: SKIPPED
  51. }
  52. // Find the set of fields in the source documents depended on by this pipeline.
  53. var deps = pipeline.getDependencies(queryObj);
  54. // Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
  55. // This will need to change to support covering indexes (SERVER-12015). There is an
  56. // exception for textScore since that can only be retrieved by a query projection.
  57. var projectionForQuery = deps.needTextScore ? deps.toProjection() : {};
  58. /*
  59. Look for an initial sort; we'll try to add this to the
  60. Cursor we create. If we're successful in doing that (further down),
  61. we'll remove the $sort from the pipeline, because the documents
  62. will already come sorted in the specified order as a result of the
  63. index scan.
  64. */
  65. var sortStorage,
  66. sortObj,
  67. sortInRunner = false;
  68. if (sources.length) {
  69. sortStage = sources[0] instanceof SortDocumentSource ? sources[0] : undefined;
  70. //need to check the next source since we are not deleting the initial match in munge
  71. if (!sortStorage && sources[0] instanceof MatchDocumentSource){
  72. sortStage = sources[1] instanceof SortDocumentSource ? sources[1] : undefined;
  73. }
  74. if (sortStage) {
  75. // build the sort key
  76. sortObj = sortStage.serializeSortKey(/*explain*/false);
  77. sortInRunner = true;
  78. }
  79. }
  80. // Create the Runner.
  81. // NOTE: the logic here is munge specific
  82. var runner;
  83. if (data.constructor === Array) {
  84. runner = new ArrayRunner(data);
  85. } else if (data instanceof DocumentSource) {
  86. //do something else here. TODO: make a new Runner Type?
  87. } else {
  88. throw new Error('unrecognized data source');
  89. }
  90. // DocumentSourceCursor expects a yielding Runner that has had its state saved.
  91. //runner->setYieldPolicy(Runner::YIELD_AUTO);
  92. runner.saveState();
  93. // Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
  94. var source = new DocumentSourceCursor("", runner, pExpCtx);
  95. // Note the query, sort, and projection for explain.
  96. source.setQuery(queryObj);
  97. if (sortInRunner)
  98. source.setSort(sortObj);
  99. source.setProjection(deps.toProjection(), deps.toParsedDeps());
  100. while (sources.length && source.coalesce(sources[0])) {
  101. sources.shift();
  102. }
  103. pipeline.addInitialSource(source);
  104. return runner;
  105. var sources = pipeline.sources;
  106. // NOTE: SKIPPED: look for initial match
  107. // NOTE: SKIPPED: create a query object
  108. // Look for an initial simple project; we'll avoid constructing Values for fields that won't make it through the projection
  109. var projection = {};
  110. var dependencies;
  111. var deps = {};
  112. var status = DocumentSource.GetDepsReturn.SEE_NEXT;
  113. for (var i=0; i < sources.length && status !== DocumentSource.GetDepsReturn.EXHAUSTIVE; i++) {
  114. status = sources[i].getDependencies(deps);
  115. if(Object.keys(deps).length === 0) {
  116. status = DocumentSource.GetDepsReturn.NOT_SUPPORTED;
  117. }
  118. }
  119. if (status === DocumentSource.GetDepsReturn.EXHAUSTIVE) {
  120. projection = DocumentSource.depsToProjection(deps);
  121. dependencies = DocumentSource.parseDeps(deps);
  122. }
  123. // NOTE: SKIPPED: Look for an initial sort
  124. // NOTE: SKIPPED: Create the sort object
  125. //get the full "namespace" name
  126. // var fullName = dbName + "." + pipeline.collectionName;
  127. // NOTE: SKIPPED: if(DEV) log messages
  128. // Create the necessary context to use a Cursor
  129. // NOTE: SKIPPED: pSortedCursor bit
  130. // NOTE: SKIPPED: pUnsortedCursor bit
  131. // NOTE: Deviating from mongo here. We're passing in a source or set of documents instead of collection name in the ctx.ns field
  132. var source;
  133. if(expCtx.ns instanceof DocumentSource){
  134. source = expCtx.ns;
  135. } else {
  136. var cursorWithContext = new CursorDocumentSource.CursorWithContext(/*fullName*/);
  137. // Now add the Cursor to cursorWithContext
  138. cursorWithContext._cursor = new Cursor( expCtx.ns ); //NOTE: collectionName will likely be an array of documents in munge
  139. // wrap the cursor with a DocumentSource and return that
  140. source = new CursorDocumentSource( cursorWithContext, expCtx );
  141. // NOTE: SKIPPED: Note the query and sort
  142. if (Object.keys(projection).length) source.setProjection(projection, dependencies);
  143. while(sources.length > 0 && source.coalesce(sources[0])) { //Note: Attempting to coalesce into the cursor source
  144. sources.shift();
  145. }
  146. }
  147. pipeline.addInitialSource(source);
  148. };