Просмотр исходного кода

Merge pull request #128 from RiveraGroup/feature/mongo_2.6.5_documentSource_PipelineD

Feature/mongo 2.6.5 document source pipeline d
Kyle P Davis 11 лет назад
Родитель
Commit
5a3fb235a8

+ 1 - 0
lib/index.js

@@ -81,6 +81,7 @@ exports.aggregate = exports;
 //Expose these so that mungedb-aggregate can be extended.
 exports.Cursor = require("./Cursor");
 exports.pipeline = require("./pipeline/");
+exports.query = require("./query/");
 
 // version info
 exports.version = "r2.5.4";

+ 186 - 83
lib/pipeline/Pipeline.js

@@ -1,5 +1,5 @@
 "use strict";
-
+var async = require('async');
 /**
  * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command.  define a singleton object for it.
  * @class Pipeline
@@ -13,7 +13,6 @@ var Pipeline = module.exports = function Pipeline(theCtx){
 	this.explain = false;
 	this.splitMongodPipeline = false;
 	this.ctx = theCtx;
-	this.SYNC_MODE = false;
 }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
 var DocumentSource = require("./documentSources/DocumentSource"),
@@ -26,7 +25,8 @@ var DocumentSource = require("./documentSources/DocumentSource"),
 	OutDocumentSource = require('./documentSources/OutDocumentSource'),
 	GeoNearDocumentSource = require('./documentSources/GeoNearDocumentSource'),
 	RedactDocumentSource = require('./documentSources/RedactDocumentSource'),
-	SortDocumentSource = require('./documentSources/SortDocumentSource');
+	SortDocumentSource = require('./documentSources/SortDocumentSource'),
+	DepsTracker = require('./DepsTracker');
 
 klass.COMMAND_NAME = "aggregate";
 klass.PIPELINE_NAME = "pipeline";
@@ -50,13 +50,14 @@ klass.nStageDesc = Object.keys(klass.stageDesc).length;
 
 klass.optimizations = {};
 klass.optimizations.local = {};
+klass.optimizations.sharded = {};
 
 /**
  * Moves $match before $sort when they are placed next to one another
  * @static
  * @method moveMatchBeforeSort
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
 	var sources = pipelineInst.sources;
 	for(var srcn = sources.length, srci = 1; srci < srcn; ++srci) {
@@ -77,7 +78,7 @@ klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pip
  * @static
  * @method moveLimitBeforeSkip
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
 	var sources = pipelineInst.sources;
 	if(sources.length === 0) return;
@@ -110,7 +111,7 @@ klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pip
  * @static
  * @method coalesceAdjacent
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
 	var sources = pipelineInst.sources;
 	if(sources.length === 0) return;
@@ -147,7 +148,7 @@ klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineI
  * @static
  * @method optimizeEachDocumentSource
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
 	var sources = pipelineInst.sources;
 	for(var srci = 0, srcn = sources.length; srci < srcn; ++srci) {
@@ -160,7 +161,7 @@ klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocu
  * @static
  * @method duplicateMatchBeforeInitalRedact
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
 	var sources = pipelineInst.sources;
 	if(sources.length >= 2 && sources[0].constructor === RedactDocumentSource) {
@@ -174,6 +175,106 @@ klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateM
 	}
 };
 
+//SKIPPED: addRequiredPrivileges
+
/**
 * Split this pipeline into a shard half and a merger half for sharded execution.
 * `this` is mutated into the merger pipeline; the returned pipeline holds the
 * stages to run on each shard.
 * @method splitForSharded
 * @returns {Pipeline} the pipeline of stages to run on the shards
 */
proto.splitForSharded = function splitForSharded() {
	var shardPipeline = new Pipeline({});
	shardPipeline.explain = this.explain;
	// The Pipeline constructor does not initialize `sources`; set it up front so
	// the sharded optimizations below can safely push onto / inspect the shard half.
	if (!shardPipeline.sources) shardPipeline.sources = [];

	klass.optimizations.sharded.findSplitPoint(shardPipeline, this);
	klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger(shardPipeline, this);
	//klass.optimizations.sharded.limitFieldsSentFromShardsToMerger(shardPipeline, this);
	return shardPipeline;
};
+
/**
 * Split the sources between the shard pipeline and the merge pipeline.
 * Stages are moved off the front of `mergePipe` onto the back of `shardPipe`
 * until the first splittable stage is reached; that stage contributes its shard
 * part to `shardPipe` and its merge part back onto the front of `mergePipe`.
 * @static
 * @method findSplitPoint
 * @param shardPipe {Pipeline} receives the stages to run on the shards
 * @param mergePipe {Pipeline} retains the stages to run on the merger
 */
klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe, mergePipe) {
	// The Pipeline constructor may not have initialized `sources`; guard once here
	// (covers both branches below) instead of inside the loop.
	if (!shardPipe.sources) shardPipe.sources = [];

	while (mergePipe.sources.length > 0) {
		var current = mergePipe.sources.shift();

		if (current.isSplittable && current.isSplittable()) {
			// NOTE: use a `!= null` check (not `typeof != "undefined"`) so sources
			// whose getShardSource()/getMergeSource() return null (e.g. $out's
			// getShardSource) are skipped rather than pushed into a pipeline.
			var shardSource = current.getShardSource(),
				mergeSource = current.getMergeSource();
			if (shardSource != null) shardPipe.sources.push(shardSource);		//push_back
			if (mergeSource != null) mergePipe.sources.unshift(mergeSource);	//push_front
			break;
		} else {
			// Non-splittable stages before the split point run on the shards.
			shardPipe.sources.push(current);
		}
	}
};
+
/**
 * Move any trailing $unwind stages off the back of the shard pipeline onto the
 * front of the merge pipeline.  $unwind expands documents, so performing it on
 * the merger reduces the amount of data the shards send over the network.
 * @static
 * @method moveFinalUnwindFromShardsToMerger
 * @param shardPipe {Pipeline} shard half of the pipeline
 * @param mergePipe {Pipeline} merger half of the pipeline
 */
klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
	// FIXED: the original condition read `!shardPipe.sources.length > 0`, where `!`
	// binds tighter than `>`, so it was false for every non-empty pipeline (the loop
	// never moved anything) -- and it indexed `sources[length-1]` with a bare,
	// undeclared `length`, a ReferenceError under "use strict".
	while (shardPipe.sources != null && shardPipe.sources.length > 0 &&
			shardPipe.sources[shardPipe.sources.length - 1].constructor === UnwindDocumentSource) {
		mergePipe.sources.unshift(shardPipe.sources.pop());
	}
};
+
+//SKIPPED: optimizations.sharded.limitFieldsSentFromShardsToMerger.  Somehow what this produces is not handled by Expression.js (err 16404)
+/**
+ * Optimize pipeline by adding $project stage if shard fields are not exhaustive
+ * @static
+ * @method limitFieldsSentFromShardsToMerger
+ * @param shardPipe shard sources
+ * @param mergePipe merge sources
+ */
+// klass.optimizations.sharded.limitFieldsSentFromShardsToMerger = function limitFieldsSentFromShardsToMerger(shardPipe, mergePipe) {
+// 	var mergeDeps = mergePipe.getDependencies(shardPipe.getInitialQuery());
+// 	if (mergeDeps.needWholeDocument) {
+// 		return;
+// 	}
+// 	if (mergeDeps.fields == null) {
+// 		mergeDeps.fields = {};
+// 	}
+// 	if (mergeDeps.fields.length == 0) {
+// 		mergeDeps.fields["_id"] = 0;
+// 	}
+// 	if (shardPipe.sources == null) {
+// 		shardPipe.sources = {};
+// 	}
+// 	//NOTE: Deviation from Mongo: not setting mergeDeps.needTextScore because we aren't handling that (Document meta stuff)
+
+//     // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
+//     // field dependencies. While this may not be 100% ideal in all cases, it is simple and
+//     // avoids the worst cases by ensuring that:
+//     // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
+//     //    dependencies. This situation can happen when a $sort is before the first $project or
+//     //    $group. Without the optimization, the shards would have to reify and transmit full
+//     //    objects even though only a subset of fields are needed.
+//     // 2) Optimization IS NOT applied immediately following a $project or $group since it would
+//     //    add an unnecessary project (and therefore a deep-copy).
+//     for (var i = 0; i < shardPipe.sources.length; i++) {
+//         if (shardPipe.sources.getDependencies() & DocumentSource.GetDepsReturn.EXHAUSTIVE_FIELDS)
+//             return;
+//     }
+
+//     // if we get here, add the project.
+//     shardPipe.sources.push(ProjectDocumentSource.createFromJson({$project: mergeDeps.toProjection()[0]}, shardPipe.ctx));
+// };
+
 /**
  * Create an `Array` of `DocumentSource`s from the given JSON pipeline
  * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
@@ -181,7 +282,7 @@ klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateM
  * @method parseDocumentSources
  * @param pipeline  {Array}  The JSON pipeline
  * @returns {Array}  The parsed `DocumentSource`s
- **/
+ */
 klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
 	var sources = [];
 	for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
@@ -197,7 +298,7 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
 
 		// Create a DocumentSource pipeline stage from 'stageSpec'.
 		var desc = klass.stageDesc[stageName];
-		if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435");
+		if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; uassert code 16436");
 
 		// Parse the stage
 		var stage = desc(stageSpec, ctx);
@@ -205,7 +306,7 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
 		sources.push(stage);
 
 		if(stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
-			throw new Error("$out can only be the final stage in the pipeline; code 16435");
+			throw new Error("$out can only be the final stage in the pipeline; code 16991");
 		}
 	}
 	return sources;
@@ -223,7 +324,7 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
  * @param   cmdObj.splitMongodPipeline	{Boolean}  should split?
  * @param ctx     {Object}  Not used yet in mungedb-aggregate
  * @returns	{Array}	the pipeline, if created, otherwise a NULL reference
- **/
+ */
 klass.parseCommand = function parseCommand(cmdObj, ctx){
 	var pipelineNamespace = require("./"),
 		Pipeline = pipelineNamespace.Pipeline,	// using require in case Pipeline gets replaced with an extension
@@ -248,10 +349,9 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	/**
 	 * If we get here, we've harvested the fields we expect for a pipeline
 	 * Set up the specified document source pipeline.
-	 **/
+	 */
 	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
-	var sources = pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
-
+	pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
 	klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
 	klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
 	klass.optimizations.local.coalesceAdjacent(pipelineInst);
@@ -261,12 +361,6 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	return pipelineInst;
 };
 
-// sync callback for Pipeline#run if omitted
-klass.SYNC_CALLBACK = function(err, results){
-	if (err) throw err;
-	return results.result;
-};
-
 function ifError(err) {
 	if (err) throw err;
 }
@@ -277,7 +371,7 @@ function ifError(err) {
  * @param	inputSource		{DocumentSource}	The input document source for the pipeline
  * @param	[callback]		{Function}			Optional callback function if using async extensions
  * @return {Object}	An empty object or the match spec
-**/
+ */
 proto.getInitialQuery = function getInitialQuery() {
 	var sources = this.sources;
 	if(sources.length === 0) {
@@ -297,15 +391,18 @@ proto.getInitialQuery = function getInitialQuery() {
  * @param	inputSource		{DocumentSource}	The input document source for the pipeline
  * @param	[callback]		{Function}			Optional callback function if using async extensions
  * @return {Object}	An empty object or the match spec
-**/
+ */
 proto.serialize = function serialize() {
 	var serialized = {},
 		array = [];
 
 	// create an array out of the pipeline operations
-	this.sources.forEach(function(source) {
-		source.serializeToArray(array);
-	});
+	if (this.sources) {
+		for (var i = 0; i < this.sources.length; i++) {
+		//this.sources.forEach(function(source) {
+			this.sources[i].serializeToArray(array);
+		}
+	}
 
 	serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
 	serialized[klass.PIPELINE_NAME] = array;
@@ -318,7 +415,7 @@ proto.serialize = function serialize() {
 /**
  * Points each source at its previous source
  * @method stitch
-**/
+ */
 proto.stitch = function stitch() {
 	if(this.sources.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
 
@@ -334,20 +431,31 @@ proto.stitch = function stitch() {
 /**
  * Run the pipeline
  * @method run
- * @param callback {Function} Optional. Run the pipeline in async mode; callback(err, result)
- * @return result {Object} The result of executing the pipeline
-**/
+ * @param callback {Function} gets called once for each document result from the pipeline
+ */
 proto.run = function run(callback) {
 	// should not get here in the explain case
 	if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
-
-	/* NOTE: DEVIATION FROM MONGO SOURCE. WE'RE SUPPORTING SYNC AND ASYNC */
-	if(this.SYNC_MODE) {
-		callback();
-		return this._runSync();
-	} else {
-		return this._runAsync(callback);
-	}
+	
+	var doc = null,
+		error = null,
+		finalSource = this._getFinalSource();
+	
+	async.doWhilst(
+		function iterator(next){
+			return finalSource.getNext(function (err, obj){
+				callback(err, obj);
+				doc = obj;
+				error = err;
+				next();
+			});
+		},
+		function test(){
+			return doc !== null && !error;
+		},
+		function done(err){
+			//nothing to do here
+		});
 };
 
 /**
@@ -355,58 +463,16 @@ proto.run = function run(callback) {
  * @method _getFinalSource
  * @return {Object}		The DocumentSource at the end of the pipeline
  * @private
-**/
+ */
 proto._getFinalSource = function _getFinalSource() {
 	return this.sources[this.sources.length - 1];
 };
 
-/**
- * Run the pipeline synchronously
- * @method _runSync
- * @return {Object}		The results object {result:resultArray}
- * @private
-**/
-proto._runSync = function _runSync(callback) {
-	var resultArray = [],
-		finalSource = this._getFinalSource(),
-		handleErr = function(err) {
-			if(err) throw err;
-		},
-		next;
-	while((next = finalSource.getNext(handleErr)) !== DocumentSource.EOF) {
-		resultArray.push(next);
-	}
-	return {result:resultArray};
-};
-
-/**
- * Run the pipeline asynchronously
- * @method _runAsync
- * @param callback {Function} callback(err, resultObject)
- * @private
-**/
-proto._runAsync = function _runAsync(callback) {
-	var resultArray = [],
-		finalSource = this._getFinalSource(),
-		gotNext = function(err, doc) {
-			if(err) return callback(err);
-			if(doc !== DocumentSource.EOF) {
-				resultArray.push(doc);
-				return setImmediate(function() { //setImmediate to avoid callstack size issues
-					finalSource.getNext(gotNext);
-				});
-			} else {
-				return callback(null, {result:resultArray});
-			}
-		};
-	finalSource.getNext(gotNext);
-};
-
 /**
  * Get the pipeline explanation
  * @method writeExplainOps
  * @return {Array}	An array of source explanations
-**/
+ */
 proto.writeExplainOps = function writeExplainOps() {
 	var array = [];
 	this.sources.forEach(function(source) {
@@ -419,7 +485,44 @@ proto.writeExplainOps = function writeExplainOps() {
  * Set the source of documents for the pipeline
  * @method addInitialSource
  * @param source {DocumentSource}
-**/
+ */
 proto.addInitialSource = function addInitialSource(source) {
 	this.sources.unshift(source);
 };
+
+//SKIPPED: canRunInMongos
+
//Note: Deviation from Mongo: Mongo 2.6.5 passes a param to getDependencies
//	to calculate TextScore.  mungedb-aggregate doesn't do this, so no param is needed.
/**
 * Compute the set of input-document fields this pipeline depends on.
 * Walks the stages in order, unioning each stage's declared dependencies, and
 * stops early once a stage reports an exhaustive field list.  A stage that
 * cannot report dependencies (NOT_SUPPORTED) forces needWholeDocument.
 * @method getDependencies
 * @returns {DepsTracker} the accumulated dependencies
 */
proto.getDependencies = function getDependencies() {
	var deps = new DepsTracker(),
		knowAllFields = false;

	//NOTE: Deviation from Mongo -- We aren't using Meta and textscore
	for (var i = 0; i < this.sources.length && !knowAllFields; i++) {
		var localDeps = new DepsTracker(),
			status = this.sources[i].getDependencies(localDeps);

		if (status === DocumentSource.GetDepsReturn.NOT_SUPPORTED) {
			// Assume this stage needs everything. We may still know something about our
			// dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
			// EXHAUSTIVE_META.
			break;
		}

		// NOTE: the loop condition guarantees !knowAllFields here, so no inner
		// re-check is needed (the original had a redundant `if (!knowAllFields)`).
		for (var key in localDeps.fields) {
			deps.fields[key] = localDeps.fields[key];
		}
		if (localDeps.needWholeDocument) {
			deps.needWholeDocument = true;
		}
		// GetDepsReturn values are bit flags; coerce the mask test to a real boolean.
		knowAllFields = !!(status & DocumentSource.GetDepsReturn.EXHAUSTIVE_FIELDS);
	}

	if (!knowAllFields) {
		deps.needWholeDocument = true; // don't know all fields we need
	}

	return deps;
};

+ 65 - 42
lib/pipeline/PipelineD.js

@@ -14,7 +14,9 @@ var PipelineD = module.exports = function PipelineD(){
 // DEPENDENCIES
 var DocumentSource = require('./documentSources/DocumentSource'),
 	CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
-	Cursor = require('../Cursor');
+	SortDocumentSource = require('./documentSources/SortDocumentSource'),
+	MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
+	getRunner = require('../query').getRunner;
 
 /**
  * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
@@ -35,60 +37,81 @@ var DocumentSource = require('./documentSources/DocumentSource'),
 **/
 klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
 
+	// We will be modifying the source vector as we go
 	var sources = pipeline.sources;
 
-	// NOTE: SKIPPED: look for initial match
-	// NOTE: SKIPPED: create a query object
-
-	// Look for an initial simple project; we'll avoid constructing Values for fields that won't make it through the projection
-	var projection = {};
-	var dependencies;
-	var deps = {};
-	var status = DocumentSource.GetDepsReturn.SEE_NEXT;
-	for (var i=0; i < sources.length && status !== DocumentSource.GetDepsReturn.EXHAUSTIVE; i++) {
-		status = sources[i].getDependencies(deps);
-		if(Object.keys(deps).length === 0) {
-			status = DocumentSource.GetDepsReturn.NOT_SUPPORTED;
-		}
-	}
-	if (status === DocumentSource.GetDepsReturn.EXHAUSTIVE) {
-		projection = DocumentSource.depsToProjection(deps);
-		dependencies = DocumentSource.parseDeps(deps);
-	}
+	// Inject a MongodImplementation to sources that need them.
+	// NOTE: SKIPPED
 
-	// NOTE: SKIPPED: Look for an initial sort
-	// NOTE: SKIPPED: Create the sort object
+	// Don't modify the pipeline if we got a DocumentSourceMergeCursor
+	// NOTE: SKIPPED
 
-	//get the full "namespace" name
-	// var fullName = dbName + "." + pipeline.collectionName;
 
-	// NOTE: SKIPPED: if(DEV) log messages
+	// Look for an initial match. This works whether we got an initial query or not.
+	// If not, it results in a "{}" query, which will be what we want in that case.
+	var queryObj = pipeline.getInitialQuery(),
+		match;
+	if (queryObj && queryObj instanceof Object && Object.keys(queryObj).length) {
+		// This will get built in to the Cursor we'll create, so
+		// remove the match from the pipeline
+		match = sources.shift();
+	}
 
-	// Create the necessary context to use a Cursor
-	// NOTE: SKIPPED: pSortedCursor bit
-	// NOTE: SKIPPED: pUnsortedCursor bit
+	// Find the set of fields in the source documents depended on by this pipeline.
+	var deps = pipeline.getDependencies(queryObj);
+
+	// Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
+	// This will need to change to support covering indexes (SERVER-12015). There is an
+	// exception for textScore since that can only be retrieved by a query projection.
+	var projectionForQuery = deps.needTextScore ? deps.toProjection() : {};
+
+	/*
+	Look for an initial sort; we'll try to add this to the
+	Cursor we create.  If we're successful in doing that (further down),
+	we'll remove the $sort from the pipeline, because the documents
+	will already come sorted in the specified order as a result of the
+	index scan.
+	*/
+	var sortStage,
+		sortObj,
+		sortInRunner = false;
+	if (sources.length) {
+		sortStage = sources[0] instanceof SortDocumentSource ? sources[0] : undefined;
+		if (sortStage) {
+			// build the sort key
+			sortObj = sortStage.serializeSortKey(/*explain*/false);
+			sortInRunner = true;
+		}
+	}
+	
+	//munge deviation: the runner is (usually) not actually handling the initial query, so we need to add it back to the pipeline
+	if (match){
+		sources.unshift(match);
+	}
 
-	// NOTE: Deviating from mongo here. We're passing in a source or set of documents instead of collection name in the ctx.ns field
-	var source;
-	if(expCtx.ns instanceof DocumentSource){
-		source = expCtx.ns;
-	} else {
-		var cursorWithContext = new CursorDocumentSource.CursorWithContext(/*fullName*/);
+	// Create the Runner.
+	// NOTE: the logic here is simplified for munge
+	var runner = getRunner(expCtx.ns, queryObj, sortObj, projectionForQuery, sources);
 
-		// Now add the Cursor to cursorWithContext
-		cursorWithContext._cursor = new Cursor( expCtx.ns );	//NOTE: collectionName will likely be an array of documents in munge
+	// DocumentSourceCursor expects a yielding Runner that has had its state saved.
+	//runner.setYieldPolicy(Runner.RunnerState.YIELD_AUTO); //Skipped as we don't really support yielding yet
+	runner.saveState();
 
-		// wrap the cursor with a DocumentSource and return that
-		source = new CursorDocumentSource( cursorWithContext, expCtx );
+	// Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
+	var source = new CursorDocumentSource("", runner, expCtx);
 
-		// NOTE: SKIPPED: Note the query and sort
+	// Note the query, sort, and projection for explain.
+	source.setQuery(queryObj);
+	if (sortInRunner)
+		source.setSort(sortObj);
 
-		if (Object.keys(projection).length) source.setProjection(projection, dependencies);
+	source.setProjection(deps.toProjection(), deps.toParsedDeps());
 
-		while(sources.length > 0 && source.coalesce(sources[0])) { //Note: Attempting to coalesce into the cursor source
-			sources.shift();
-		}
+	while (sources.length && source.coalesce(sources[0])) {
+		sources.shift();
 	}
 
 	pipeline.addInitialSource(source);
+
+	return runner;
 };

+ 5 - 5
lib/pipeline/documentSources/DocumentSource.js

@@ -152,11 +152,11 @@ proto.optimize = function optimize() {
 };
 
 klass.GetDepsReturn = {
-	NOT_SUPPORTED: "NOT_SUPPORTED", // This means the set should be ignored
-	SEE_NEXT: "SEE_NEXT", // Add the next Source's deps to the set
-	EXHAUSTIVE_FIELDS:"EXHAUSTIVE_FIELDS", // Later stages won"t need more fields from input
-	EXHAUSTIVE_META: "EXHAUSTIVE_META", // Later stages won"t need more metadata from input
-	EXHAUSTIVE_ALL: "EXHAUSTIVE_ALL" // Later stages won"t need either NOTE: This is an | of FIELDS and META in mongo C++
+	NOT_SUPPORTED: 0x0, // The full object and all metadata may be required
+	SEE_NEXT: 0x1, // Later stages could need either fields or metadata
+	EXHAUSTIVE_FIELDS: 0x2, // Later stages won't need more fields from input
+	EXHAUSTIVE_META: 0x4, // Later stages won't need more metadata from input
+	EXHAUSTIVE_ALL: 0x6 // Later stages won't need either
 };
 
 /**

+ 1 - 1
lib/pipeline/documentSources/GroupDocumentSource.js

@@ -370,7 +370,7 @@ proto.getDependencies = function getDependencies(deps) {
 		self.expressions[i].addDependencies(deps);
 	});
 
-	return DocumentSource.GetDepsReturn.EXHAUSTIVE;
+	return DocumentSource.GetDepsReturn.EXHAUSTIVE_ALL;
 };
 
 /**

+ 1 - 1
lib/pipeline/documentSources/LimitDocumentSource.js

@@ -78,7 +78,7 @@ element named $limit.
 klass.createFromJson = function createFromJson(jsonElement, ctx) {
 	if (typeof jsonElement !== "number") throw new Error("code 15957; the limit must be specified as a number");
 	var limit = jsonElement;
-	return this.create(ctx, limit);
+	return klass.create(ctx, limit);
 };
 
 klass.create = function create(ctx, limit){

+ 11 - 1
lib/pipeline/documentSources/OutDocumentSource.js

@@ -48,12 +48,22 @@ klass.createFromJson = function(jsonElement, ctx) {
 };
 
 // SplittableDocumentSource implementation.
+// Mongo doesn't fully implement SplittableDocumentSource on DocumentSourceOut.
+//	It doesn't implement getShardSource or getMergeSource
 klass.isSplittableDocumentSource = true;
 
+proto.getShardSource = function getShardSource() {
+	return null;
+};
+
+proto.getMergeSource = function getMergeSource() {
+	return this;
+};
+
 //NeedsMongodDocumentSource implementation
 klass.needsMongodDocumentSource = true;
 
 proto.getDependencies = function(deps) {
 	deps.needWholeDocument = true;
-	return DocumentSource.GetDepsReturn.EXHAUSTIVE;
+	return DocumentSource.GetDepsReturn.EXHAUSTIVE_ALL;
 };

+ 1 - 1
lib/pipeline/documentSources/ProjectDocumentSource.js

@@ -124,7 +124,7 @@ klass.createFromJson = function(elem, expCtx) {
 proto.getDependencies = function getDependencies(deps) {
 	var path = [];
 	this.OE.addDependencies(deps, path);
-	return base.GetDepsReturn.EXHAUSTIVE;
+	return base.GetDepsReturn.EXHAUSTIVE_FIELDS;
 };
 
 /**

+ 93 - 0
lib/query/DocumentSourceRunner.js

@@ -0,0 +1,93 @@
+"use strict";
+
+var Runner = require('./Runner'),
+	DocumentSource = require('../pipeline/documentSources/DocumentSource');
+
/**
 * A Runner that draws its results from a wrapped DocumentSource.
 * @class DocumentSourceRunner
 * @param	{DocumentSource}	docSrc		the source to pull documents from
 * @param	{Array}				[pipeline]	remaining pipeline stages; leading stages are
 *		coalesced into `docSrc` where possible (NOTE: the caller's array is mutated)
 * @throws {Error} when docSrc is missing/not a DocumentSource, or pipeline is not an Array
 **/
var klass = module.exports = function DocumentSourceRunner(docSrc, pipeline){
	base.call(this);

	if (!docSrc || !(docSrc instanceof DocumentSource)) throw new Error('DocumentSource runner requires a DocumentSource');
	// Array.isArray is safer than `constructor != Array` (handles cross-realm arrays;
	// avoids loose inequality)
	if (pipeline && !Array.isArray(pipeline)) throw new Error('DocumentSource runner requires pipeline to be an Array');

	this._docSrc = docSrc;
	this._pipeline = pipeline || [];

	// Fold as many leading pipeline stages as possible into the source itself.
	while (this._pipeline.length && this._docSrc.coalesce(this._pipeline[0])) {
		this._pipeline.shift();
	}

	this._state = Runner.RunnerState.RUNNER_ADVANCED;
}, base = Runner, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
/**
 * Get the next result from the wrapped DocumentSource.
 *
 * @method getNext
 * @param callback {Function} called as callback(err, doc, state); doc is null at EOF
 */
proto.getNext = function getNext(callback) {
	var self = this;

	// Once the runner has left the ADVANCED state (EOF, error, or dead), keep
	// reporting that terminal state without touching the source again.
	if (self._state !== Runner.RunnerState.RUNNER_ADVANCED) {
		return callback(null, null, self._state);
	}

	return self._docSrc.getNext(function (err, obj) {
		if (err) self._state = Runner.RunnerState.RUNNER_ERROR;
		if (obj === null) self._state = Runner.RunnerState.RUNNER_EOF;
		return callback(err, obj, self._state);
	});
};
+
/**
 * Save any state required to yield.
 * DocumentSourceRunner keeps no resumable cursor state, so this is a no-op; it
 * exists to satisfy the Runner interface (PipelineD calls saveState() before
 * handing the runner to a CursorDocumentSource).
 * @method saveState
 */
proto.saveState = function saveState() {
	//nothing to do here
};
+
/**
 * Restore saved state, possibly after a yield.
 * No-op here (nothing was saved).  NOTE(review): the Runner contract describes a
 * boolean "OK vs killed" return, but this implementation returns undefined --
 * callers should not rely on the return value.
 * @method restoreState
 */
proto.restoreState = function restoreState() {
	//nothing to do here
};
+
/**
 * Describe this Runner (used for explain output).
 *
 * @method getInfo
 * @param [explain] {Boolean} when falsy, no description is produced
 * @returns {Object|undefined} an explain descriptor, or undefined when !explain
 */
proto.getInfo = function getInfo(explain) {
	if (!explain) return undefined;
	return {
		type: this.constructor.name,
		docSrc: this._docSrc.serialize(explain),
		state: this._state
	};
};
+
/**
 * Dispose of the Runner: releases the wrapped DocumentSource and marks the
 * runner dead so subsequent getNext() calls report RUNNER_DEAD.
 *
 * @method reset
 */
proto.reset = function reset(){
	this._docSrc.dispose();
	this._state = Runner.RunnerState.RUNNER_DEAD;
};

+ 18 - 2
lib/query/index.js

@@ -1,5 +1,21 @@
 "use strict";
+
+var DocumentSource = require('../pipeline/documentSources/DocumentSource'),
+	Runner = require("./Runner.js"),
+	ArrayRunner = require("./ArrayRunner.js"),
+	DocumentSourceRunner = require("./DocumentSourceRunner.js");
+
 module.exports = {
-	Runner: require("./Runner.js"),
-	ArrayRunner: require("./ArrayRunner.js")
+	Runner: Runner,
+	ArrayRunner: ArrayRunner,
+	DocumentSourceRunner: DocumentSourceRunner,
+	getRunner: function(data, queryObj, sortObj, projectionForQuery, sources){
+		if (data && data.constructor === Array){
+			return new ArrayRunner(data);
+		} else if (data && data instanceof DocumentSource){
+			return new DocumentSourceRunner(data, sources);
+		} else {
+			throw new Error('could not construct Runner from given data');
+		}
+	}
 };

+ 92 - 41
test/lib/pipeline/Pipeline.js

@@ -2,7 +2,35 @@
 var assert = require("assert"),
 	Pipeline = require("../../../lib/pipeline/Pipeline"),
 	FieldPath = require("../../../lib/pipeline/FieldPath"),
-	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource');
+	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
+	CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	ProjectDocumentSource = require("../../../lib/pipeline/documentSources/ProjectDocumentSource"),
+	ArrayRunner = require("../../../lib/query/ArrayRunner");
+
// Test helper: wires `match` up to read from an in-memory cursor over `data`
// (a CursorDocumentSource backed by an ArrayRunner).
var addSource = function addSource(match, data) {
	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
	match.setSource(cds);
};
+
// Test helper: parse `inputPipeString`, split it for sharded execution, and
// assert that the serialized shard/merge halves match the expected pipelines.
var shardedTest = function shardedTest(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
	var wrap = function (pipeStr) {
		return JSON.parse('{"pipeline": ' + pipeStr + '}');
	};
	var expectedMergePipe = wrap(expectedMergePipeString),
		expectedShardPipe = wrap(expectedShardPipeString),
		inputPipe = wrap(inputPipeString);

	var mergePipe = Pipeline.parseCommand(inputPipe, {});
	assert.notEqual(mergePipe, null);

	var shardPipe = mergePipe.splitForSharded();
	assert.notEqual(shardPipe, null);

	assert.deepEqual(shardPipe.serialize().pipeline, expectedShardPipe.pipeline);
	assert.deepEqual(mergePipe.serialize().pipeline, expectedMergePipe.pipeline);
};
 
 module.exports = {
 
@@ -36,7 +64,7 @@ module.exports = {
 				};
 
 				proto.getNext = function(callback){
-					var answer = this.current > 0 ? {val:this.current--} : DocumentSource.EOF,
+					var answer = this.current > 0 ? {val:this.current--} : null,
 						err = null;
 
 					if (!this.works)
@@ -90,8 +118,8 @@ module.exports = {
 					{$sort: {"xyz": 1}},
 					{$match: {}}
 				]});
-				assert.equal(p.sourceVector[0].constructor.matchName, "$match");
-				assert.equal(p.sourceVector[1].constructor.sortName, "$sort");
+				assert.equal(p.sources[0].constructor.matchName, "$match");
+				assert.equal(p.sources[1].constructor.sortName, "$sort");
 			},
 
 			"should attempt to coalesce all sources": function () {
@@ -101,8 +129,8 @@ module.exports = {
 					{$test: {coalesce: false}},
 					{$test: {coalesce: false}}
 				]});
-				assert.equal(p.sourceVector.length, 3);
-				p.sourceVector.slice(0, -1).forEach(function (source) {
+				assert.equal(p.sources.length, 3);
+				p.sources.slice(0, -1).forEach(function (source) {
 					assert.equal(source.coalesceWasCalled, true);
 				});
 				assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
@@ -113,10 +141,35 @@ module.exports = {
 					{$test: {coalesce: false}},
 					{$test: {coalesce: false}}
 				]});
-				p.sourceVector.forEach(function (source) {
+				p.sources.forEach(function (source) {
 					assert.equal(source.optimizeWasCalled, true);
 				});
 			}
+		},
+
+		"sharded": {
+
+			"should handle empty pipeline for sharded": function () {
+				var inputPipe = "[]",
+					expectedMergePipe = "[]",
+					expectedShardPipe = "[]";
+				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+			},
+
+			"should handle one unwind": function () {
+				var inputPipe = '[{"$unwind":"$a"}]',
+					expectedMergePipe = "[]",
+					expectedShardPipe = '[{"$unwind":"$a"}]';
+				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+			},
+
+			"should handle two unwinds": function () {
+				var inputPipe = '[{"$unwind":"$a"}, {"$unwind":"$b"}]',
+					expectedMergePipe = "[]",
+					expectedShardPipe = '[{"$unwind": "$a"}, {"$unwind": "$b"}]';
+				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+
+			}
 
 		},
 
@@ -126,45 +179,31 @@ module.exports = {
 				p.stitch();
 				assert.equal(p.sources[1].source, p.sources[0]);
 			}
-			},
+		},
 
-		"#_runSync": {
+		"#run": {
 
-			"should iterate through sources and return resultant array": function () {
+			"should iterate through sources and return resultant array": function (done) {
 				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
-					results = p.run(function(err, results) {
-						assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
-				});
-			},
-
-			"should catch parse errors": function () {
-				// The $foo part is invalid and causes a throw.
-				assert.throws(function () {
-					Pipeline.parseCommand({pipeline: [
-						{$match: {$foo: {bar: "baz"}}}
-					]});
+					results = [];
+				p.run(function(err, doc) {
+					if (err) throw err;
+					if (!doc){
+						assert.deepEqual(results, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
+						done();
+					} else {
+						results.push(doc);
+					}
 				});
 			},
-
-			"should call callback with errors from pipeline components": function (next) {
-				var p = Pipeline.parseCommand({pipeline: [
-					{$match: {foo: {bar: "baz"}}}
-				]});
-				p.run(new DocumentSource({}), function (err, results) {
-					assert(err instanceof Error);
-					return next();
+			"should handle sources that return errors": function (done) {
+				var p = Pipeline.parseCommand({pipeline:[{$test:{works:false}}]}),
+					results = [];
+				p.run(function(err, doc) {
+					assert(err);
+					done();
 				});
 			}
-
-		},
-
-		"#_runAsync": {
-			"should iterate through sources and return resultant array asynchronously": function () {
-				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
-					results = p.run(function(err, results) {
-						assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
-					});
-		}
 		},
 
 		"#addInitialSource": {
@@ -181,9 +220,21 @@ module.exports = {
 				p.addInitialSource(initialSource);
 				p.stitch();
 				assert.equal(p.sources[1].source, p.sources[0]);
-	}
-		}
+			}
+		},
+
+		"#getDependencies()": {
 
+			"should properly detect dependencies": function testGetDependencies() {
+				var p = Pipeline.parseCommand({pipeline: [
+					{$sort: {"xyz": 1}},
+					{$project: {"a":"$xyz"}}
+				]});
+				var depsTracker = p.getDependencies();
+				assert.equal(Object.keys(depsTracker.fields).length, 2);
+			}
+
+		}
 	}
 
 };

+ 67 - 67
test/lib/pipeline/PipelineD.js

@@ -10,78 +10,19 @@ module.exports = {
 
 	"PipelineD": {
 
-		before: function(){
-
-			Pipeline.stageDesc.$test = (function(){
-
-				var klass = function TestDocumentSource(options, ctx){
-					base.call(this, ctx);
-
-					this.shouldCoalesce = options.coalesce;
-					this.coalesceWasCalled = false;
-					this.optimizeWasCalled = false;
-					this.resetWasCalled = false;
-
-					this.current = 5;
-				}, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-				proto.coalesce = function(){
-					this.coalesceWasCalled = true;
-					var c = this.shouldCoalesce;//only coalesce with the first thing we find
-					this.shouldCoalesce = false;
-					return c;
-				};
-
-				proto.optimize = function(){
-					this.optimizeWasCalled = true;
-				};
-
-				proto.eof = function(){
-					return this.current < 0;
-				};
-
-				proto.advance = function(){
-					this.current = this.current - 1;
-					return !this.eof();
-				};
-
-				proto.getCurrent = function(){
-					return this.current;
-				};
-
-				proto.reset = function(){
-					this.resetWasCalled = true;
-				};
-
-				proto.getDependencies = function(deps){
-					if (!deps.testDep){
-						deps.testDep = 1;
-						return DocumentSource.GetDepsReturn.EXHAUSTIVE;
-					}
-					return DocumentSource.GetDepsReturn.SEE_NEXT;
-				};
-
-				klass.createFromJson = function(options, ctx){
-					return new TestDocumentSource(options, ctx);
-				};
-
-				return klass;
-			})().createFromJson;
-
-		},
-
 		"prepareCursorSource": {
 
 			"should place a CursorDocumentSource in pipeline": function () {
-				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}], aggregate:[]}),
+				var p = Pipeline.parseCommand({pipeline:[{$match:{a:true}}], aggregate:[]}),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
 				assert.equal(p.sources[0].constructor, CursorDocumentSource);
 			},
 
 			"should get projection from all sources": function () {
-				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}], aggregate:[]}),
+				var p = Pipeline.parseCommand({pipeline:[{$project:{a:"$x"}}], aggregate:[]}),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
-				assert.deepEqual(p.sources[0]._projection, {"_id":0,"testDep":1});
+				assert.deepEqual(p.sources[0]._projection, {x:1, _id:1});
+				assert.deepEqual(p.sources[0]._dependencies, {_fields:{_id:true, x:true}});
 			},
 
 			"should get projection's deps": function () {
@@ -105,9 +46,9 @@ module.exports = {
 				};
 				var p = Pipeline.parseCommand(cmdObj),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
-				assert.equal(JSON.stringify(p.sources[0]._projection), JSON.stringify({'a.b.c': 1, d: 1, 'e.f.g': 1, _id: 1}));
+				assert.deepEqual(p.sources[0]._projection, {'a.b.c': 1, d: 1, 'e.f.g': 1, _id: 1});
+				assert.deepEqual(p.sources[0]._dependencies, {"_fields":{"_id":true,"a":{"b":{"c":true}},"d":true,"e":{"f":{"g":true}}}});
 			},
-
 			"should get group's deps": function(){
 				var cmdObj = {
 					aggregate: [],
@@ -131,8 +72,67 @@ module.exports = {
 				};
 				var p = Pipeline.parseCommand(cmdObj),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
-				assert.equal(JSON.stringify(p.sources[0]._projection), JSON.stringify({ _id: 0, a: 1, b: 1, 'x.y.z': 1 }));
-			}
+				assert.equal(JSON.stringify(p.sources[0]._projection), JSON.stringify({ a: 1, b: 1, 'x.y.z': 1, _id: 0 }));
+				assert.deepEqual(p.sources[0]._dependencies, {"_fields":{"a":true,"b":true,"x":{"y":{"z":true}}}});
+			},
+			"should set the queryObj on the Cursor": function(){
+				var cmdObj = {
+					aggregate: [],
+					pipeline: [
+						{$match:{
+							x:{$exists:true},
+							y:{$exists:false}
+						}}
+					]
+				};
+				var p = Pipeline.parseCommand(cmdObj),
+					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
+				assert.deepEqual(p.sources[0]._query, {x:{$exists: true}, y:{$exists:false}});
+			},
+			"should set the sort on the Cursor": function(){
+				var cmdObj = {
+					aggregate: [],
+					pipeline: [
+						{$sort:{
+							x:1,
+							y:-1
+						}}
+					]
+				};
+				var p = Pipeline.parseCommand(cmdObj),
+					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
+				assert.deepEqual(p.sources[0]._sort, {x:1, y:-1});
+			},
+			"should set the sort on the Cursor if there is a match first": function(){
+				var cmdObj = {
+					aggregate: [],
+					pipeline: [
+						{$match:{
+							x:{$exists:true},
+							y:{$exists:false}
+						}},
+						{$sort:{
+							x:1,
+							y:-1
+						}}
+					]
+				};
+				var p = Pipeline.parseCommand(cmdObj),
+					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
+				assert.deepEqual(p.sources[0]._sort, {x:1, y:-1});
+			},
+			"should coalesce the Cursor with the rest of the pipeline": function(){
+				var cmdObj = {
+					aggregate: [],
+					pipeline: [
+						{$limit:1}
+					]
+				};
+				var p = Pipeline.parseCommand(cmdObj),
+					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
+				assert.equal(p.sources[0].getLimit(), 1);
+				assert.equal(p.sources.length, 1);
+			},
 		}
 	}
 

+ 19 - 5
test/lib/pipeline/documentSources/CursorDocumentSource.js

@@ -5,6 +5,8 @@ var assert = require("assert"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	LimitDocumentSource = require("../../../../lib/pipeline/documentSources/LimitDocumentSource"),
 	SkipDocumentSource = require("../../../../lib/pipeline/documentSources/SkipDocumentSource"),
+	ProjectDocumentSource = require("../../../../lib/pipeline/documentSources/ProjectDocumentSource"),
+	DepsTracker = require("../../../../lib/pipeline/DepsTracker"),
 	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 var getCursorDocumentSource = function(values) {
@@ -136,11 +138,23 @@ module.exports = {
 
 		"#setProjection": {
 
-			"should set a projection": function() {
-				var cds = getCursorDocumentSource();
-				cds.setProjection({a:1}, {a:true});
-				assert.deepEqual(cds._projection, {a:1});
-				assert.deepEqual(cds._dependencies, {a:true});
+			"should set a projection": function(next) {
+				var cds = getCursorDocumentSource([{a:1, b:2},{a:2, b:3}]),
+					deps = new DepsTracker(),
+					project = ProjectDocumentSource.createFromJson({"a":1});
+				project.getDependencies(deps);
+				cds.setProjection(deps.toProjection(), deps.toParsedDeps());
+				
+				async.series([
+						cds.getNext.bind(cds),
+						cds.getNext.bind(cds),
+						cds.getNext.bind(cds)
+					],
+					function(err,res) {
+						assert.deepEqual([{a:1},{a:2},null], res);
+						next();
+					}
+				);
 			}
 
 		}

+ 1 - 1
test/lib/pipeline/documentSources/ProjectDocumentSource.js

@@ -261,7 +261,7 @@ module.exports = {
 			};
 			var pds = createProject(input);
 			var dependencies = new DepsTracker();
-			assert.equal(DocumentSource.GetDepsReturn.EXHAUSTIVE, pds.getDependencies(dependencies));
+			assert.equal(DocumentSource.GetDepsReturn.EXHAUSTIVE_FIELDS, pds.getDependencies(dependencies));
 			assert.equal(5, Object.keys(dependencies.fields).length);
 			assert.ok(dependencies.fields._id);
 			assert.ok(dependencies.fields.a);

+ 148 - 0
test/lib/query/DocumentSourceRunner.js

@@ -0,0 +1,148 @@
+"use strict";
+var assert = require("assert"),
+	Runner = require("../../../lib/query/Runner"),
+	CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	LimitDocumentSource = require("../../../lib/pipeline/documentSources/LimitDocumentSource"),
+	MatchDocumentSource = require("../../../lib/pipeline/documentSources/MatchDocumentSource"),
+	ArrayRunner = require("../../../lib/query/ArrayRunner"),
+	DocumentSourceRunner = require("../../../lib/query/DocumentSourceRunner");
+
+
+module.exports = {
+
+	"DocumentSourceRunner": {
+		"#constructor": {
+			"should accept a document source and pipeline": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([]), null),
+					pipeline = [];
+				assert.doesNotThrow(function(){
+					var ar = new DocumentSourceRunner(cds, pipeline);
+				});
+			},
+			"should fail if not given a document source or pipeline": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([]), null);
+				
+				assert.throws(function(){
+					var ar = new DocumentSourceRunner();
+				});
+				assert.throws(function(){
+					var ar = new DocumentSourceRunner(123);
+				});
+				assert.throws(function(){
+					var ar = new DocumentSourceRunner(cds, 123);
+				});
+			},
+			"should coalesce the pipeline into the given documentsource": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([]), null),
+					pipeline = [new LimitDocumentSource(3), new MatchDocumentSource({"a":true})],
+					expected = [{$match:{a:true}}];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				var actual = pipeline.map(function(d){return d.serialize();});
+				
+				assert.deepEqual(expected, actual);
+			}
+		},
+		"#getNext": {
+			"should return the next item in the given documentsource": function(done){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource(3)];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				ds.getNext(function(err, out, state){
+					assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+					assert.strictEqual(out, 1);
+					ds.getNext(function(err, out, state){
+						assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+						assert.strictEqual(out, 2);
+						ds.getNext(function(err, out, state){
+							assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+							assert.strictEqual(out, 3);
+							done();
+						});
+					});
+				});
+			},
+			"should return EOF if there is nothing left in the given documentsource": function(done){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				ds.getNext(function(err, out, state){
+					assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+					assert.strictEqual(out, 1);
+					ds.getNext(function(err, out, state){
+						assert.strictEqual(state, Runner.RunnerState.RUNNER_EOF);
+						assert.strictEqual(out, null);
+						done();
+					});
+				});
+			}
+		},
+		"#getInfo": {
+			"should return nothing if explain flag is not set": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				assert.strictEqual(ds.getInfo(), undefined);
+			},
+			"should return information about the runner if explain flag is set": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				assert.deepEqual(ds.getInfo(true), {
+					"type": "DocumentSourceRunner",
+					"docSrc": {
+						"$cursor": {
+							"query": undefined,
+							"sort": null,
+							"limit": 1,
+							"fields": null,
+							"plan": {
+								"type": "ArrayRunner",
+								"nDocs": 3,
+								"position": 0,
+								"state": "RUNNER_ADVANCED"
+							}
+						}
+					},
+					"state": "RUNNER_ADVANCED"
+				});
+			}
+		},
+		"#reset": {
+			"should dispose of the documentSource": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				ds.reset();
+				assert.deepEqual(ds.getInfo(true), {
+					"type": "DocumentSourceRunner",
+					"docSrc": {
+						"$cursor": {
+							"query": undefined,
+							"sort": null,
+							"limit": 1,
+							"fields": null,
+							"plan": {
+								"type": "ArrayRunner",
+								"nDocs": 0,
+								"position": 0,
+								"state": "RUNNER_DEAD"
+							}
+						}
+					},
+					"state": "RUNNER_DEAD"
+				});
+			}
+		}
+	}
+
+};
+
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();