|
|
@@ -14,7 +14,9 @@ var PipelineD = module.exports = function PipelineD(){
|
|
|
// DEPENDENCIES
|
|
|
var DocumentSource = require('./documentSources/DocumentSource'),
|
|
|
CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
|
|
|
- Cursor = require('../Cursor');
|
|
|
+ SortDocumentSource = require('./documentSources/SortDocumentSource'),
|
|
|
+ MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
|
|
|
+ Cursor = require('../query/ArrayRunner');
|
|
|
|
|
|
/**
|
|
|
* Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
|
|
|
@@ -35,6 +37,107 @@ var DocumentSource = require('./documentSources/DocumentSource'),
|
|
|
**/
|
|
|
klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
|
|
|
|
|
|
+ // get the full "namespace" name
|
|
|
+ var data = expCtx.ns; //NOTE: ns will likely be either an array of documents or a document source in munge
|
|
|
+
|
|
|
+ // We will be modifying the source vector as we go
|
|
|
+ var sources = pipeline.sources;
|
|
|
+
|
|
|
+ // Inject a MongodImplementation to sources that need them.
|
|
|
+ // NOTE: SKIPPED
|
|
|
+
|
|
|
+ // Don't modify the pipeline if we got a DocumentSourceMergeCursor
|
|
|
+ // NOTE: SKIPPED
|
|
|
+
|
|
|
+
|
|
|
+ // Look for an initial match. This works whether we got an initial query or not.
|
|
|
+ // If not, it results in a "{}" query, which will be what we want in that case.
|
|
|
+ var queryObj = pipeline.getInitialQuery();
|
|
|
+ if (queryObj && queryObj instanceof Object && Object.keys(queryObj).length) {
|
|
|
+ // This will get built in to the Cursor we'll create, so
|
|
|
+ // remove the match from the pipeline
|
|
|
+ // NOTE: SKIPPED
|
|
|
+ }
|
|
|
+
|
|
|
+ // Find the set of fields in the source documents depended on by this pipeline.
|
|
|
+ var deps = pipeline.getDependencies(queryObj);
|
|
|
+
|
|
|
+ // Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
|
|
|
+ // This will need to change to support covering indexes (SERVER-12015). There is an
|
|
|
+ // exception for textScore since that can only be retrieved by a query projection.
|
|
|
+ var projectionForQuery = deps.needTextScore ? deps.toProjection() : {};
|
|
|
+
|
|
|
+ /*
|
|
|
+ Look for an initial sort; we'll try to add this to the
|
|
|
+ Cursor we create. If we're successful in doing that (further down),
|
|
|
+ we'll remove the $sort from the pipeline, because the documents
|
|
|
+ will already come sorted in the specified order as a result of the
|
|
|
+ index scan.
|
|
|
+ */
|
|
|
+ var sortStorage,
|
|
|
+ sortObj,
|
|
|
+ sortInRunner = false;
|
|
|
+ if (sources.length) {
|
|
|
+ sortStage = sources[0] instanceof SortDocumentSource ? sources[0] : undefined;
|
|
|
+
|
|
|
+ //need to check the next source since we are not deleting the initial match in munge
|
|
|
+ if (!sortStorage && sources[0] instanceof MatchDocumentSource){
|
|
|
+ sortStage = sources[1] instanceof SortDocumentSource ? sources[1] : undefined;
|
|
|
+ }
|
|
|
+ if (sortStage) {
|
|
|
+ // build the sort key
|
|
|
+ sortObj = sortStage.serializeSortKey(/*explain*/false);
|
|
|
+ sortInRunner = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create the Runner.
|
|
|
+ // NOTE: the logic here is munge specific
|
|
|
+ var runner;
|
|
|
+ if (data.constructor === Array) {
|
|
|
+ runner = new ArrayRunner(data);
|
|
|
+ } else if (data instanceof DocumentSource) {
|
|
|
+ //do something else here. TODO: make a new Runner Type?
|
|
|
+ } else {
|
|
|
+ throw new Error('unrecognized data source');
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // DocumentSourceCursor expects a yielding Runner that has had its state saved.
|
|
|
+ //runner->setYieldPolicy(Runner::YIELD_AUTO);
|
|
|
+ runner.saveState();
|
|
|
+
|
|
|
+ // Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
|
|
|
+ var source = new DocumentSourceCursor("", runner, pExpCtx);
|
|
|
+
|
|
|
+ // Note the query, sort, and projection for explain.
|
|
|
+ source.setQuery(queryObj);
|
|
|
+ if (sortInRunner)
|
|
|
+ source.setSort(sortObj);
|
|
|
+
|
|
|
+ source.setProjection(deps.toProjection(), deps.toParsedDeps());
|
|
|
+
|
|
|
+ while (sources.length && source.coalesce(sources[0])) {
|
|
|
+ sources.shift();
|
|
|
+ }
|
|
|
+
|
|
|
+ pipeline.addInitialSource(source);
|
|
|
+
|
|
|
+ return runner;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
var sources = pipeline.sources;
|
|
|
|
|
|
// NOTE: SKIPPED: look for initial match
|