| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- "use strict";
- var CursorDocumentSource = require("./documentSources/CursorDocumentSource"),
- SortDocumentSource = require("./documentSources/SortDocumentSource"),
- getRunner = require("../query").getRunner;
- /**
- * Pipeline helper for reading data
- * @class PipelineD
- * @namespace mungedb-aggregate.pipeline
- * @module mungedb-aggregate
- * @constructor
- **/
- var PipelineD = module.exports = function PipelineD(){
- throw new Error("Never create instances of this! Use the static helpers only.");
- }, klass = PipelineD;
- /**
- * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
- * This source will feed the execution of the pipeline.
- *
- * //NOTE: Not doing anything here, as we don't use any of these cursor source features
- * //NOTE: DEVIATION FROM THE MONGO: we don't have special optimized cursors but could do it if needed
- *
- * This method looks for early pipeline stages that can be folded into
- * the underlying cursor, and when a cursor can absorb those, they
- * are removed from the head of the pipeline. For example, an
- * early match can be removed and replaced with a Cursor that will
- * do an index scan.
- *
- * @param pipeline {Pipeline} the logical "this" for this operation
- * @param ctx {Object} Context for expressions
- * @returns {CursorDocumentSource} the cursor that was created
- **/
- klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
- // We will be modifying the source vector as we go
- var sources = pipeline.sources;
- // Inject a MongodImplementation to sources that need them.
- // NOTE: SKIPPED
- // Don't modify the pipeline if we got a DocumentSourceMergeCursor
- // NOTE: SKIPPED
- // Look for an initial match. This works whether we got an initial query or not.
- // If not, it results in a "{}" query, which will be what we want in that case.
- var queryObj = pipeline.getInitialQuery(),
- match;
- if (queryObj !== undefined && queryObj !== null && queryObj.constructor === Object && Object.keys(queryObj).length > 0) {
- // This will get built in to the Cursor we'll create, so
- // remove the match from the pipeline
- match = sources.shift();
- }
- // Find the set of fields in the source documents depended on by this pipeline.
- var deps = pipeline.getDependencies(queryObj);
- // Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
- // This will need to change to support covering indexes (SERVER-12015). There is an
- // exception for textScore since that can only be retrieved by a query projection.
- var projectionForQuery = deps.needTextScore ? deps.toProjection() : {};
- /*
- Look for an initial sort; we'll try to add this to the
- Cursor we create. If we're successful in doing that (further down),
- we'll remove the $sort from the pipeline, because the documents
- will already come sorted in the specified order as a result of the
- index scan.
- */
- var sortStage,
- sortObj,
- sortInRunner = false;
- if (sources.length) {
- sortStage = sources[0] instanceof SortDocumentSource ? sources[0] : undefined;
- if (sortStage) {
- // build the sort key
- sortObj = sortStage.serializeSortKey(/*explain*/false);
- sortInRunner = true;
- }
- }
- //munge deviation: the runner is (usually) not actually handling the initial query, so we need to add it back to the pipeline
- if (match){
- sources.unshift(match);
- }
- // Create the Runner.
- // NOTE: the logic here is simplified for munge
- var runner = expCtx.ns instanceof CursorDocumentSource ?
- expCtx.ns._runner : getRunner(expCtx.ns, queryObj, sortObj, projectionForQuery, sources);
- // DocumentSourceCursor expects a yielding Runner that has had its state saved.
- //runner.setYieldPolicy(Runner.RunnerState.YIELD_AUTO); //Skipped as we don't really support yielding yet
- runner.saveState();
- // Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
- var source = expCtx.ns instanceof CursorDocumentSource ? expCtx.ns : new CursorDocumentSource("", runner, expCtx);
- // Note the query, sort, and projection for explain.
- source.setQuery(queryObj);
- if (sortInRunner)
- source.setSort(sortObj);
- source.setProjection(deps.toProjection(), deps.toParsedDeps());
- while (sources.length && source.coalesce(sources[0])) {
- sources.shift();
- }
- pipeline.addInitialSource(source);
- return runner;
- };
|