Browse Source

Merge branch 'feature/mongo_2.6.5_documentSource_PipelineD' into feature/mongo_2.6.5_documentSource_Pipeline

Jason Walton 11 years ago
parent
commit
91d638eba3
2 changed files with 196 additions and 1 deletions
  1. 104 1
      lib/pipeline/PipelineD.js
  2. 92 0
      lib/query/DocumentSourceRunner.js

+ 104 - 1
lib/pipeline/PipelineD.js

@@ -14,7 +14,9 @@ var PipelineD = module.exports = function PipelineD(){
 // DEPENDENCIES
 var DocumentSource = require('./documentSources/DocumentSource'),
 	CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
-	Cursor = require('../Cursor');
+	SortDocumentSource = require('./documentSources/SortDocumentSource'),
+	MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
+	Cursor = require('../query/ArrayRunner');
 
 /**
  * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
@@ -35,6 +37,107 @@ var DocumentSource = require('./documentSources/DocumentSource'),
 **/
 klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
 
+	// get the full "namespace" name
+	var data = expCtx.ns; //NOTE: ns will likely be either an array of documents or a document source in munge
+
+	// We will be modifying the source vector as we go
+	var sources = pipeline.sources;
+
+	// Inject a MongodImplementation to sources that need them.
+	// NOTE: SKIPPED
+
+	// Don't modify the pipeline if we got a DocumentSourceMergeCursor
+	// NOTE: SKIPPED
+
+
+	// Look for an initial match. This works whether we got an initial query or not.
+	// If not, it results in a "{}" query, which will be what we want in that case.
+	var queryObj = pipeline.getInitialQuery();
+	if (queryObj && queryObj instanceof Object && Object.keys(queryObj).length) {
+		// This will get built in to the Cursor we'll create, so
+		// remove the match from the pipeline
+		// NOTE: SKIPPED
+	}
+
+	// Find the set of fields in the source documents depended on by this pipeline.
+	var deps = pipeline.getDependencies(queryObj);
+
+	// Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
+	// This will need to change to support covering indexes (SERVER-12015). There is an
+	// exception for textScore since that can only be retrieved by a query projection.
+	var projectionForQuery = deps.needTextScore ? deps.toProjection() : {};
+
+	/*
+	  Look for an initial sort; we'll try to add this to the
+	  Cursor we create.  If we're successful in doing that (further down),
+	  we'll remove the $sort from the pipeline, because the documents
+	  will already come sorted in the specified order as a result of the
+	  index scan.
+	*/
+	var sortStorage,
+		sortObj,
+		sortInRunner = false;
+	if (sources.length) {
+		sortStage = sources[0] instanceof SortDocumentSource ? sources[0] : undefined;
+		
+		//need to check the next source since we are not deleting the initial match in munge
+		if (!sortStorage && sources[0] instanceof MatchDocumentSource){
+			sortStage = sources[1] instanceof SortDocumentSource ? sources[1] : undefined;
+		}
+		if (sortStage) {
+			// build the sort key
+			sortObj = sortStage.serializeSortKey(/*explain*/false);
+			sortInRunner = true;
+		}
+	}
+
+	// Create the Runner.
+	// NOTE: the logic here is munge specific
+	var runner;
+	if (data.constructor === Array) {
+		runner = new ArrayRunner(data);
+	} else if (data instanceof DocumentSource) {
+		//do something else here.  TODO: make a new Runner Type?
+	} else {
+		throw new Error('unrecognized data source');
+	}
+
+
+	// DocumentSourceCursor expects a yielding Runner that has had its state saved.
+	//runner->setYieldPolicy(Runner::YIELD_AUTO);
+	runner.saveState();
+
+	// Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
+	var source = new DocumentSourceCursor("", runner, pExpCtx);
+
+	// Note the query, sort, and projection for explain.
+	source.setQuery(queryObj);
+	if (sortInRunner)
+		source.setSort(sortObj);
+
+	source.setProjection(deps.toProjection(), deps.toParsedDeps());
+
+	while (sources.length && source.coalesce(sources[0])) {
+		sources.shift();
+	}
+
+	pipeline.addInitialSource(source);
+
+	return runner;
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 	var sources = pipeline.sources;
 
 	// NOTE: SKIPPED: look for initial match

+ 92 - 0
lib/query/DocumentSourceRunner.js

@@ -0,0 +1,92 @@
+"use strict";
+
+var Runner = require('./Runner'),
+	DocumentSource = require('../pipeline/documentSources/DocumentSource');
+
+/**
+ * This class is a runner used to wrap a document source
+ * @param	{Array}	items	The array source of the data
+ **/
+var klass = module.exports = function DocumentSourceRunner(docSrc, pipeline){
+	base.call(this);
+
+	if (!docSrc || !(docSrc instanceof DocumentSource) ) throw new Error('DocumentSource runner requires a DocumentSource');
+	if (pipeline && pipeline.constructor != Array ) throw new Error('DocumentSource runner requires pipeline to be an Array');
+	
+	this._docSrc = docSrc;
+	this._pipeline = pipeline || [];
+	
+	while (this._pipeline.length && this._docSrc.coalesce(this._pipeline[0])) {
+		this._pipeline.shift();
+	}
+	
+	this._state = Runner.RunnerState.RUNNER_ADVANCED;
+}, base = Runner, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+/**
+ * Get the next result from the array.
+ * 
+ * @method getNext
+ * @param [callback] {Function}
+ */
+proto.getNext = function getNext(callback) {
+	if (this._state === Runner.RunnerState.RUNNER_ADVANCED) {
+		return this._docSrc.getNext(function (err, obj){
+			if (err){
+				this._state = Runner.RunnerState.RUNNER_ERROR;
+			}
+			if (obj === null){
+				this._state = Runner.RunnerState.RUNNER_EOF;
+			}
+			return callback(err, obj, this._state);
+		});
+	}
+	return callback(null, null, this._state);
+};
+
+/**
+ * Save any state required to yield.
+ * 
+ * @method saveState
+ */
+proto.saveState = function saveState() {
+	//nothing to do here
+};
+
+/**
+ * Restore saved state, possibly after a yield.  Return true if the runner is OK, false if
+ * it was killed.
+ * 
+ * @method restoreState
+ */
+proto.restoreState = function restoreState() {
+	//nothing to do here
+};
+
+/**
+ * Returns a description of the Runner
+ * 
+ * @method getInfo
+ * @param [explain]
+ * @param [planInfo]
+ */
+proto.getInfo = function getInfo(explain) {
+	if (explain){
+		return {
+			type: this.constructor.name,
+			docSrc: this._docSrc.serialize(explain),
+			state: this._state
+		};
+	}
+	return undefined;
+};
+
+/**
+ * dispose of the Runner.
+ * 
+ * @method reset
+ */
+proto.reset = function reset(){
+	this._docSrc.dispose();
+	this._state = Runner.RunnerState.RUNNER_DEAD;
+};