Browse Source

Added CursorDocumentSource, Cursor, PipelineCommand, Pipeline, and PipelineD classes. Also removed some old code and fixed the top level munge to use PipelineCommand. refs #998, #1615, #1616

http://source.rd.rcg.local/trac/eagle6/changeset/1330/Eagle6_SVN
Philip Murray 12 years ago
parent
commit
9f58d6c842

+ 2 - 1
README.md

@@ -23,7 +23,8 @@ Here is a list of the major items where I have deviated from the MongoDB code an
     * DESIGN: The `{FOO}Expression` classes do not provide `create` statics since calling new is easy enough
       * DESIGN: To further this, the `CompareExpression` class doesn't provide any of its additional `create{FOO}` helpers so instead I'm binding the appropriate args to the ctor
     * TESTING: Most of the expression tests have been written without the expression test base classes
-
+  * Document sources
+  	* we have implemented a 'reset' method for all document sources so that we can reuse them against different streams of data
 
 
 TODO

+ 70 - 0
lib/Cursor.js

@@ -0,0 +1,70 @@
+var Cursor = module.exports = (function(){
+	// CONSTRUCTOR
+	/**
+	 * This class is the munge equivalent version of the mongo cursor.  Not everything is implemented
+	 * since we only need bits and pieces of their functionality, but the methods that exist
+	 * should be have the same as they do in mongo.  
+	 * 
+	 * stream-utils.throughStream should eventually be supported, but for now it will probably break things (so dont use it)
+	 * 
+	 * @param	{Array}	throughStreamOrArray	The array source of the data
+	**/
+	var klass = function Cursor(throughStreamOrArray){
+		var self = this;
+		
+		if (!throughStreamOrArray){
+			throw new Error("Cursor requires a stream-utils.ThroughStream or Array object.");
+		}
+		
+		if (throughStreamOrArray.constructor === su.ThroughStream){
+			this.throughStream = throughStreamOrArray;
+			this.cachedData = [];
+			
+			throughStreamOrArray.on('data', function(data){
+				self.cachedData.push(data);
+			});
+		} else if (throughStreamOrArray.constructor === Array){
+			this.cachedData = throughStreamOrArray.splice(0);
+		} else {
+			throw new Error("Cursor requires a stream-utils.ThroughStream or Array object.");
+		}
+		
+	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+	
+	var su = require("stream-utils");
+
+
+	proto.ok = function ok(){
+		if (this.throughStream && this.throughStream.readable){
+			return true;
+		}
+		return this.cachedData.length > 0 || this.hasOwnProperty("curr");
+	};
+
+
+	proto.advance = function advance(){
+		if (this.cachedData.length === 0){
+			delete this.curr;
+			return false;
+		}
+		this.curr = this.cachedData.splice(0,1)[0];
+		
+		//TODO: CHANGE ME!!!!!
+		//Note: !!BLOCKING CODE!!  need to coerce our async stream of objects into a synchronous cursor to mesh with mongos c++ish code
+		while (!this.curr && this.throughStream && this.throughStream.readable){
+			this.curr = this.cachedData.splice(0,1)[0];
+		}
+		
+		return this.curr;
+	};
+	
+	proto.current = function current(){
+		if (!this.hasOwnProperty("curr")){
+			this.advance();
+		}
+		return this.curr;
+	};
+	
+	
+	return klass;
+})();

+ 0 - 20
lib/Op.js

@@ -1,20 +0,0 @@
-var su = require("stream-utils");
-
-/** A base class for all pipeline operators; Handles top-level pipeline operator definitions to provide a Stream that transforms Objects **/
-var Op = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = su.ThroughStream, proto, klass = function Op(opts){
-		this.opts = opts;
-		base.call(this, {write:this.write, end:this.end, reset:this.reset});
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// PROTOTYPE MEMBERS
-	//NOTE: see the stream-utils's through() docs for more info
-	//proto.write = function(obj){ this.queue(obj); }
-	//proto.end = function(){ this.queue("LAST"); }
-	//proto.reset = function(){ this.queue("LAST"); }
-
-	return klass;
-})();
-

+ 41 - 0
lib/commands/PipelineCommand.js

@@ -0,0 +1,41 @@
+var PipelineCommand = module.exports = (function(){
+	// CONSTRUCTOR
+	var klass = function PipelineCommand(cmdObj){
+        /* try to parse the command; if this fails, then we didn't run */
+        
+        //NOTE: this is different from the mongo implementation.  It used to be in the 'run' method that we would parse the pipeline, 
+        //but we decided it would be better to be able to save the parsed command
+        this.pPipeline = Pipeline.parseCommand(cmdObj);
+	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+	var Pipeline = require('../pipeline/Pipeline'),
+		PipelineD = require('../pipeline/PipelineD');
+
+	/**
+	 * Execute the pipeline.
+	**/
+	proto.executePipeline = function executePipeline(result, pPipeline, pSource){
+		return pPipeline.run(result, pSource);
+	};
+	
+	/**
+	 * Documents are retrieved until the
+	 * cursor is exhausted (or another termination condition occurs).
+	**/
+	proto.runExecute = function runExecute(result, db){
+		var pSource = PipelineD.prepareCursorSource(this.pPipeline, db);
+        return this.executePipeline(result, this.pPipeline, pSource);
+	};
+	
+	/**
+	 * run the command
+	**/
+	proto.run = function run(db, result){
+		if (!this.pPipeline){
+			return false;
+		}
+        return this.runExecute(result, db);
+	};
+
+	return klass;
+})();

+ 12 - 40
lib/munge.js

@@ -1,58 +1,30 @@
-var su = require("stream-utils");
+var PipelineCommand = require('./commands/PipelineCommand');
 
 var Munger = (function(){
 	// CONSTRUCTOR
 	var base = Object, proto, klass = function Munger(ops){
-		this.ops = typeof(ops) == "object" && typeof(ops.length) === "number" ? ops : Array.prototype.slice.call(arguments, 0);
-		this.opStreams = this.ops.map(function opCompiler(op, i){	//TODO: demote to local only?
-			if(typeof(op) !== "object")
-				throw new Error("pipeline element " + i + " is not an object");
-			for(var opName in op) break;	// get first key
-			if(typeof(op) === "function")
-				return su.through(op);
-			if(!(opName in klass.ops))
-				throw new Error("Unrecognized pipeline op: " + JSON.stringify({opName:opName}));
-			var IOp = klass.ops[opName];
-			return new IOp(op[opName], i);
-		});
-console.log("OPS:", this.ops);
-		this.pipeline = new su.PipelineStream(this.opStreams);
+		if (!ops){
+			throw new Error("munge requires a pipeline!");
+		}
+		if (typeof ops.length !== "number"){
+			ops = [ops]; // allow a single stage object to be passed without wrapping it in an array
+		}
+		this.pipeline = new PipelineCommand(ops); // the PipelineCommand constructor parses the pipeline, so it is parsed once per Munger
 	};
 	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
-	// STATIC MEMBERS
-//	klass.ops = {
-//		$skip: SkipOp,
-//		$limit: LimitOp,
-//		$match: MatchOp,
-//		$project: ProjectOp,
-//		$unwind: UnwindOp,
-//		$group: GroupOp,
-//		$sort: SortOp
-//	};
-
 	// PROTOTYPE MEMBERS
 	proto.execute = function execute(inputs){
-console.debug("\n#execute called with:", inputs);
-		var outputs = [];
-//TODO: why does this break things??
-this.pipeline.reset();
-		this.pipeline.on("data", function(data){
-console.debug("PIPELINE WRITE TO OUTPUTS:", data);
-			outputs.push(data);
-		});
-		inputs.forEach(this.pipeline.write);
-console.debug("PIPELINE ENDING...");
-		this.pipeline.end();
-		this.pipeline.reset();
-		return outputs;
+		var result = {};
+		result.ok = this.pipeline.run(inputs, result); // run() returns the success flag; Pipeline.run stores the output documents on result.result
+		return result;
 	};
 
 	return klass;
 })();
 
 
-module.exports = function mung(ops, inputs) {
+module.exports = function munge(ops, inputs) {
 	var munger = new Munger(ops);
 	if(inputs)
 		return munger.execute(inputs);

+ 0 - 16
lib/ops/GroupOp.js

@@ -1,16 +0,0 @@
-var Op = require("../Op");
-
-//TODO: ...write this...
-var GroupOp = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = Op, proto, klass = function GroupOp(opts){
-		base.call(this, opts);
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// PROTOTYPE MEMBERS
-	proto.write = function writeProjected(obj){
-	};
-
-	return klass;
-})();

+ 0 - 23
lib/ops/LimitOp.js

@@ -1,23 +0,0 @@
-var Op = require("../Op");
-
-/** The $limit operator; opts is the number of Objects to allow before preventing further data to pass through. **/
-var LimitOp = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = Op, proto, klass = function LimitOp(opts){
-		this.n = 0;
-		base.call(this, opts);
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// PROTOTYPE MEMBERS
-	proto.write = function writeUnlessLimitted(obj){
-		if(this.n++ < this.opts)
-			this.queue(obj);
-		//else this.end();	//TODO: this should work but we need to hook up the end event to preceeding things in the pipeline for it to function
-	};
-	proto.reset = function resetLimitter(){
-		this.n = 0;
-	};
-
-	return klass;
-})();

+ 0 - 20
lib/ops/MatchOp.js

@@ -1,20 +0,0 @@
-var Op = require("../Op"),
-	sift = require("sift");
-
-/** The $match operator; opts is the expression to be used when matching Objects. **/
-var MatchOp = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = Op, proto, klass = function MatchOp(opts){
-		this.sifter = sift(opts);
-		base.call(this, opts);
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// PROTOTYPE MEMBERS
-	proto.write = function writeIfMatches(obj){
-		if(this.sifter.test(obj))
-			this.queue(obj);
-	};
-
-	return klass;
-})();

+ 0 - 19
lib/ops/ProjectOp/ProjectOp.js

@@ -1,19 +0,0 @@
-var Op = require("../../Op");
-
-//TODO: ...write this...
-var ProjectOp = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = Op, proto, klass = function ProjectOp(opts){
-		base.call(this, opts);
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// STATIC MEMBERS
-	klass.expressions = undefined; //TODO: ...
-
-	// PROTOTYPE MEMBERS
-	proto.write = function writeProjected(obj){
-	};
-
-	return klass;
-})();

+ 0 - 1
lib/ops/ProjectOp/index.js

@@ -1 +0,0 @@
-module.exports = require("./ProjectOp.js");

+ 0 - 25
lib/ops/SkipOp.js

@@ -1,25 +0,0 @@
-var Op = require("../Op");
-
-/** The $skip operator; opts is the number of Objects to skip. **/
-var SkipOp = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = Op, proto, klass = function SkipOp(opts){
-		this.n = 0;
-		base.call(this, opts);
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// PROTOTYPE MEMBERS
-	proto.write = function writeUnlessSkipped(obj){
-//console.debug("$skip write:", {opIndex:this.idx, skip:this.opts, n:this.n, isSkip:(this.n < this.opts), obj:obj});
-		if(this.n++ >= this.opts)
-			this.queue(obj);
-	};
-
-	proto.reset = function resetSkipper(){
-		this.n = 0;
-	};
-
-	return klass;
-})();
-

+ 0 - 72
lib/ops/SortOp.js

@@ -1,72 +0,0 @@
-var Op = require("../Op"),
-	traverse = require("traverse");
-
-//TODO: ...write this...
-var SortOp = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = Op, proto, klass = function SortOp(opts){
-			// Parse sorts from options object
-			if(typeof(opts) !== "object") throw new Error("the $sort key specification must be an object");
-			this.sorts = [];
-			for(var p in opts){
-				if(p[0] === "$") throw new Error("$sort: FieldPath field names may not start with '$'.; code 16410");
-				if(p === "") throw new Error("$sort: FieldPath field names may not be empty strings.; code 15998");
-				this.sorts.push({path:p.split("."), direction:opts[p]});
-			}
-console.log("SORTS FOR $sort OP:", this.sorts);
-			this.objs = [];
-			base.call(this, opts);
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// PRIVATE STUFF
-	// Helpers for sorting
-	var types = ["undefined", "null", "NaN", "number", "string", "object", "boolean", "Date"];
-	function getTypeOf(o){
-		if(o === undefined) return "undefined";
-		if(o === null) return "null";
-		if(isNaN(o)) return "NaN";
-		if(o.constructor === Date) return "Date";
-		return typeof(o);
-	}
-
-	// PROTOTYPE MEMBERS
-	proto.write = function writeDeferredForSorting(obj){
-console.log("$sort deferring:", obj);
-		this.objs.push(obj);
-	};
-
-	proto.end = function endSort(obj){
-console.log("$sort end event");
-		if(this.objs.length){
-			console.log("OBJS TO BE SORTED:", this.objs);
-			this.objs.sort(function(a, b){
-				for(var i = 0, l = this.sorts.length; i < l; i++){
-					//TODO: this probably needs to compareDeep using traverse(a).forEach(...check b...) or similar
-					var sort = this.sorts[i],
-						aVal = traverse(a).get(sort.path), aType = getTypeOf(aVal),
-						bVal = traverse(b).get(sort.path), bType = getTypeOf(bVal);
-					// null and undefined go first
-					if(aType !== bType){
-						return (types.indexOf(aType) - types.indexOf(bType)) * sort.direction;
-					}else{
-						// don't trust type cohersion
-						if(aType == "number") bVal = parseFloat(bVal);
-						if(isNaN(bVal)) return 1;
-						if(aType == "string") bVal = bVal.toString();
-						// return sort value only if it can be determined at this level
-						if(aVal < bVal) return -1 * sort.direction;
-						if(aVal > bVal) return 1 * sort.direction;
-					}
-				}
-				return 0;
-			});
-console.log("$sort has sorted");
-			for(var i = 0, l = this.objs.length; i < l; i++)
-				this.queue(this.objs[i]);
-		}
-		this.end();
-	};
-
-	return klass;
-})();

+ 0 - 34
lib/ops/UnwindOp.js

@@ -1,34 +0,0 @@
-var Op = require("../Op");
-
-/** The $unwind operator; opts is the $-prefixed path to the Array to be unwound. **/
-var UnwindOp = module.exports = (function(){
-	// CONSTRUCTOR
-	var base = Op, proto, klass = function UnwindOp(opts){
-		if(!opts || opts[0] != "$")
-			throw new Error("$unwind: field path references must be prefixed with a '$' (" + JSON.stringify(opts) + "); code 15982");
-		this.path = opts.substr(1).split(".");
-		base.call(this, opts);
-	};
-	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-	// PROTOTYPE MEMBERS
-	proto.write = function writeUnwound(obj){
-		var t = traverse(obj),
-			val = t.get(this.path);
-		if(val !== undefined){
-			if(val.constructor.name !== "Array")
-				throw new Error("$unwind: value at end of field path must be an array; code 15978");
-			else{
-				t.set(this.path, null);	// temporarily set this to null to avoid needlessly cloning it below
-				for(var i = 0, l = val.length; i < l; i++){
-					var o = t.clone();
-					traverse(o).set(this.path, val[i]);
-					this.queue(o);
-				}
-				t.set(this.path, val);	// be nice and put this back on the original just in case somebody cares
-			}
-		}
-	};
-
-	return klass;
-})();

+ 192 - 0
lib/pipeline/Pipeline.js

@@ -0,0 +1,192 @@
+var Pipeline = module.exports = (function(){
+	// CONSTRUCTOR
+	var klass = function Pipeline(){
+		
+		this.sourceVector = [];//ordered list of the pipeline's document source stages; should be private?
+		
+	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+	
+//	var GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
+//		LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
+//		MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
+//		ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
+//		SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
+//		SortDocumentSource = require('./documentSources/SortDocumentSource'),
+//		UnwindDocumentSource = require('./documentSources/UnwindDocumentSource');
+	
+	klass.StageDesc = {};//attaching this to the class for test cases
+//	StageDesc[GroupDocumentSource.groupName] = GroupDocumentSource;
+//	StageDesc[LimitDocumentSource.limitName] = LimitDocumentSource;
+//	StageDesc[MatchDocumentSource.matchName] = MatchDocumentSource;
+//	StageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource;
+//	StageDesc[SkipDocumentSource.skipName] = SkipDocumentSource;
+//	StageDesc[SortDocumentSource.sortName] = SortDocumentSource;
+//	StageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource;
+	
+    /**
+     * Create a pipeline from the command.
+	 *
+     * @param	{Object} cmdObj the command object sent from the client
+     * @returns	{Array}	the pipeline, if created, otherwise a NULL reference
+     **/
+	klass.parseCommand = function parseCommand(cmdObj){
+		var pipelineInstance = new Pipeline(),
+			pipeline = cmdObj;//munge: skipping the command parsing since all we care about is the pipeline
+		
+		var sourceVector = pipelineInstance.sourceVector,
+			nSteps = pipeline.length;
+		for( var iStep = 0; iStep<nSteps; ++iStep){
+            /* pull out the pipeline element as an object */
+			var pipeElement = pipeline[iStep];
+			if (!(pipeElement instanceof Object)){
+				throw new Error("pipeline element " + iStep + " is not an object; code 15942" );
+			}
+			
+            // Parse a pipeline stage from 'obj'.
+			var obj = pipeElement;
+			if (Object.keys(obj).length !== 1){
+				throw new Error("A pipeline stage specification object must contain exactly one field; code 16435" );
+			}
+            // Create a DocumentSource pipeline stage from 'stageSpec'.
+            var stageName = Object.keys(obj)[0],
+				stageSpec = obj[stageName],
+				Desc = klass.StageDesc[stageName];
+				
+			if (!Desc){
+				throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435" );
+			}
+			
+            var stage = new Desc(stageSpec);
+            //verify(stage);
+            stage.setPipelineStep(iStep);
+            sourceVector.push(stage);
+		}
+		
+        /* if there aren't any pipeline stages, there's nothing more to do */
+		if (!sourceVector.length){
+			return pipelineInstance;
+		}
+		
+		/*
+          Move filters up where possible.
+
+          CW TODO -- move filter past projections where possible, and noting
+          corresponding field renaming.
+        */
+
+        /*
+          Wherever there is a match immediately following a sort, swap them.
+          This means we sort fewer items.  Neither changes the documents in
+          the stream, so this transformation shouldn't affect the result.
+
+          We do this first, because then when we coalesce operators below,
+          any adjacent matches will be combined.
+         */
+        for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
+            var source = sourceVector[srci];
+            if (source.constructor === klass.MatchDocumentSource) { //TODO: remove 'klass.' once match is implemented!!!
+                var previous = sourceVector[srci - 1];
+                if (previous.constructor === klass.SortDocumentSource) { //TODO: remove 'sort.' once match is implemented!!!
+                    /* swap this item with the previous */
+                    sourceVector[srci - 1] = source;
+                    sourceVector[srci] = previous;
+                }
+            }
+        }
+        
+		/*
+          Coalesce adjacent filters where possible.  Two adjacent filters
+          are equivalent to one filter whose predicate is the conjunction of
+          the two original filters' predicates.  For now, capture this by
+          giving any DocumentSource the option to absorb it's successor; this
+          will also allow adjacent projections to coalesce when possible.
+
+          Run through the DocumentSources, and give each one the opportunity
+          to coalesce with its successor.  If successful, remove the
+          successor.
+
+          Move all document sources to a temporary list.
+        */
+        var tempVector = sourceVector.slice(0);
+        sourceVector.length = 0;
+
+        /* move the first one to the final list */
+        sourceVector.push(tempVector[0]);
+
+        /* run through the sources, coalescing them or keeping them */
+        for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
+            /*
+              If we can't coalesce the source with the last, then move it
+              to the final list, and make it the new last.  (If we succeeded,
+              then we're still on the same last, and there's no need to move
+              or do anything with the source -- the destruction of tempVector
+              will take care of the rest.)
+            */
+            var lastSource = sourceVector[sourceVector.length - 1];
+            var temp = tempVector[tempi];
+            if (!temp || !lastSource){
+				throw new Error("null document sources found");
+            }
+            if (!lastSource.coalesce(temp)){
+                sourceVector.push(temp);
+            }
+        }
+
+        /* optimize the elements in the pipeline */
+        for(var i = 0, l = sourceVector.length; i<l; i++) {
+			var iter = sourceVector[i];
+            if (!iter) {
+                throw new Error("Pipeline received empty document as argument");
+            }
+
+            iter.optimize();
+        }
+
+        return pipelineInstance;
+	};
+	/**
+	 * Run the pipeline
+	 * 
+	 * @param	{Object}	result	the results of running the pipeline will be stored on this object
+	 * @param	{CursorDocumentSource}	source	the primary document source of the data
+	**/
+	proto.run = function run(result, source){
+		
+        for(var i = 0, l = this.sourceVector.length; i<l; i++) {
+			var temp = this.sourceVector[i];
+            temp.setSource(source);
+            source = temp;
+        }
+        /* source is left pointing at the last source in the chain */
+
+        /*
+          Iterate through the resulting documents, and add them to the result.
+          We do this even if we're doing an explain, in order to capture
+          the document counts and other stats.  However, we don't capture
+          the result documents for explain.
+          */
+        // the array in which the aggregation results reside
+        // cant use subArrayStart() due to error handling
+        var resultArray = [];
+        for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
+            var document = source.getCurrent();
+
+            /* add the document to the result set */
+            resultArray.push(document);
+            
+            //Commenting out this assertion for munge.  MUHAHAHA!!!
+            
+            // object will be too large, assert. the extra 1KB is for headers
+//            uassert(16389,
+//                    str::stream() << "aggregation result exceeds maximum document size ("
+//                                  << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
+//                    resultArray.len() < BSONObjMaxUserSize - 1024);
+        }
+
+        result.result = resultArray;
+        
+		return true;
+	};
+	
+	return klass;
+})();

+ 61 - 0
lib/pipeline/PipelineD.js

@@ -0,0 +1,61 @@
+var PipelineD = module.exports = (function(){
+	// CONSTRUCTOR
+	var klass = function PipelineD(){
+		if(this.constructor == PipelineD) throw new Error("Never create instances of this! Use the static helpers only.");
+	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+	var DocumentSource = require('./documentSources/DocumentSource'),
+		CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
+		Cursor = require('../Cursor');
+		
+	/**
+	 * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable
+	 * to be the first source for a pipeline to begin with.  This source
+	 * will feed the execution of the pipeline.
+	 * 
+	 * <<Note: Below does not happen in munge>>
+	 * This method looks for early pipeline stages that can be folded into
+	 * the underlying cursor, and when a cursor can absorb those, they
+	 * are removed from the head of the pipeline.  For example, an
+	 * early match can be removed and replaced with a Cursor that will
+	 * do an index scan.
+	 * 
+	 * @param {Pipeline}	pPipeline the logical "this" for this operation
+	 * @param {Array}	db the data we are going to be munging
+	 * @returns {CursorDocumentSource} the cursor that was created
+	**/
+	klass.prepareCursorSource = function prepareCursorSource(pPipeline, db){
+	
+        var sources = pPipeline.sourceVector;
+
+		//note that this is a deviation from the mongo implementation to facilitate pipeline reuse
+		sources.forEach(function(source){
+			source.reset();
+		});
+
+		//TODO: should this go earlier in the execution so that we dont need to do it every time?
+        var projection = {};
+        var deps = {};
+        var status = DocumentSource.GetDepsReturn.SEE_NEXT;
+        for (var i=0; i < sources.length && status == DocumentSource.GetDepsReturn.SEE_NEXT; i++) {
+            status = sources[i].getDependencies(deps);
+        }
+        if (status == DocumentSource.GetDepsReturn.EXHAUSTIVE) {
+            projection = DocumentSource.depsToProjection(deps);
+        }
+
+        var cursorWithContext = new CursorDocumentSource.CursorWithContext( );
+
+        cursorWithContext._cursor = new Cursor( db );
+
+        /* wrap the cursor with a DocumentSource and return that */
+        var source = new CursorDocumentSource( cursorWithContext );
+        
+        if (projection && Object.keys(projection).length)
+            source.setProjection(projection);
+
+        return source;
+	};
+
+	return klass;
+})();

+ 251 - 0
lib/pipeline/documentSources/CursorDocumentSource.js

@@ -0,0 +1,251 @@
+var CursorDocumentSource = module.exports = (function(){
+	// CONSTRUCTOR
+    /**
+     * Constructs and returns Documents from the objects produced by a supplied Cursor.
+     * An object of this type may only be used by one thread, see SERVER-6123.
+	 * 
+     * This is usually put at the beginning of a chain of document sources
+     * in order to fetch data from the database.
+     * 
+     * @param	{CursorDocumentSource.CursorWithContext}	cursorWithContext the cursor to use to fetch data
+     * @throws	{Error}	if cursorWithContext is missing or has no _cursor set
+    **/
+	var klass = module.exports = CursorDocumentSource = function CursorDocumentSource(cursorWithContext/*, pExpCtx*/){
+		base.call(this/*, pExpCtx*/);
+		
+		this.current = null; // document most recently fetched by findNext(); null before the first fetch and once exhausted
+		
+//		this.ns = null;
+//		/*
+//		The bson dependencies must outlive the Cursor wrapped by this
+//		source.  Therefore, bson dependencies must appear before pCursor
+//		in order cause its destructor to be called *after* pCursor's.
+//		*/
+//		this.pQuery = null;
+//		this.pSort = null;
+		
+		this._projection = null; // may be set once through setProjection()
+		
+		this._cursorWithContext = cursorWithContext;
+		if (!this._cursorWithContext || !this._cursorWithContext._cursor){
+			throw new Error("CursorDocumentSource requires a valid cursor");
+		}
+		
+	}, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+	
+	
+	// DEPENDENCIES
+	//var Document = require("../Document");
+	
+	klass.CursorWithContext = (function (){
+		/**
+         * Holds a Cursor and all associated state required to access the cursor.  An object of this
+         * type may only be used by one thread.
+        **/
+		var klass = function CursorWithContext(){
+			this._cursor = null;
+		};
+		
+		return klass;
+	})();
+
+	/**
+     * Release the Cursor and the read lock it requires, but without changing the other data.
+     * Releasing the lock is required for proper concurrency, see SERVER-6123.  This
+     * functionality is also used by the explain version of pipeline execution.
+	 * 
+	 * @method	dispose
+	**/
+	proto.dispose = function dispose() {
+        this._cursorWithContext = null;
+	};
+		
+//	/**
+//     * Record the namespace.  Required for explain.
+//	 * 
+//	 * @method	setNamespace
+//	 * @param	{String}	ns	the namespace
+//	**/
+//	proto.setNamespace = function setNamespace(ns) {}
+//
+//	/**
+//	 * Record the query that was specified for the cursor this wraps, if any.
+//	 * This should be captured after any optimizations are applied to
+//	 * the pipeline so that it reflects what is really used.
+//	 * This gets used for explain output.
+//	 * 
+//	 * @method	setQuery
+//	 * @param	{Object}	pBsonObj	the query to record
+//	**/
+//	proto.setQuery = function setQuery(pBsonObj) {};
+//
+//
+//	/**
+//	 * Record the sort that was specified for the cursor this wraps, if any.
+//	 * This should be captured after any optimizations are applied to
+//	 * the pipeline so that it reflects what is really used.
+//	 * This gets used for explain output.
+//	 * 
+//	 * @method	setSort
+//	 * @param	{Object}	pBsonObj	the query to record
+//	**/
+//	proto.setSort = function setSort(pBsonObj) {};
+	
+	/**
+	 * setProjection method
+	 * 
+	 * @method	setProjection
+	 * @param	{Object}	projection
+	**/
+	proto.setProjection = function setProjection(projection) {
+		
+		if (this._projection){
+			throw new Error("projection is already set");
+		}
+		
+		
+		//dont think we need this yet
+		
+//		this._projection = new Projection();
+//		this._projection.init(projection);
+//		
+//        this.cursor().fields = this._projection;
+
+		this._projection = projection;  //just for testing
+	};
+	
+	//----------------virtuals from DocumentSource--------------
+    /**
+     * Is the source at EOF?
+     * 
+     * @method	eof
+    **/
+    proto.eof = function eof() {
+        /* if we haven't gotten the first one yet, do so now */
+        if (!this.current){
+            this.findNext();
+        }
+
+        return (this.current === null);
+    };
+    /**
+     * Advance the state of the DocumentSource so that it will return the next Document.  
+     * The default implementation returns false, after checking for interrupts. 
+     * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
+     * 
+     * @method	advance
+     * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed.  This default implementation always returns false.
+    **/
+    proto.advance = function advance() {
+        base.prototype.advance.call(this); // check for interrupts
+        
+        /* if we haven't gotten the first one yet, do so now */
+        if (!this.current){
+            this.findNext();
+        }
+
+        this.findNext();
+        return (this.current !== null);
+    };
+    /**
+     * some implementations do the equivalent of verify(!eof()) so check eof() first
+     * 
+     * @method	getCurrent
+     * @returns	{Document}	the current Document without advancing
+    **/
+    proto.getCurrent = function getCurrent() {
+        /* if we haven't gotten the first one yet, do so now */
+        if (!this.current){
+            this.findNext();
+        }
+
+        return this.current;
+    };
+    /**
+     * Set the underlying source this source should use to get Documents
+     * from.
+     * It is an error to set the source more than once.  This is to
+     * prevent changing sources once the original source has been started;
+     * this could break the state maintained by the DocumentSource.
+     * This pointer is not reference counted because that has led to
+     * some circular references.  As a result, this doesn't keep
+     * sources alive, and is only intended to be used temporarily for
+     * the lifetime of a Pipeline::run().
+     * 
+     * @method	setSource
+     * @param	{DocumentSource}	pSource	the underlying source to use
+    **/
+    proto.setSource = function setSource(pTheSource) {
+		throw new Error("CursorDocumentSource doesn't take a source");
+    };
+    /**
+     * Create an object that represents the document source.  The object
+     * will have a single field whose name is the source's name.  This
+     * will be used by the default implementation of addToBsonArray()
+     * to add this object to a pipeline being represented in BSON.
+     * 
+     * @method	sourceToJson
+     * @param	{Object} pBuilder	BSONObjBuilder: a blank object builder to write to
+     * @param	{Boolean}	explain	create explain output
+    **/
+    proto.sourceToJson = function sourceToJson(pBuilder, explain) {
+
+        /* this has no analog in the BSON world, so only allow it for explain */
+        if (explain)
+        {
+			//we are not currently supporting explain in munge
+        }
+    };
+
+	//----------------private--------------
+	
+	proto.findNext = function findNext(){
+
+        if ( !this._cursorWithContext ) {
+            this.current = null;
+            return;
+        }
+
+        for( ; this.cursor().ok(); this.cursor().advance() ) {
+
+            //yieldSometimes();
+//            if ( !this.cursor().ok() ) {
+//                // The cursor was exhausted during the yield.
+//                break;
+//            }
+
+//            if ( !this.cursor().currentMatches() || this.cursor().currentIsDup() )
+//                continue;
+
+			
+            // grab the matching document
+            var documentObj;
+//          if (this.canUseCoveredIndex()) { ...  Dont need any of this, I think
+
+			documentObj = this.cursor().current();
+			this.current = documentObj;
+            this.cursor().advance();
+            return;
+        }
+
+        // If we got here, there aren't any more documents.
+        // The CursorWithContext (and its read lock) must be released, see SERVER-6123.
+        this.dispose();
+        this.current = null;
+	};
+
+	proto.cursor = function cursor(){
+        if( this._cursorWithContext && this._cursorWithContext._cursor){
+			return this._cursorWithContext._cursor;
+        }
+        throw new Error("cursor not defined");
+	};
+	
+//	proto.chunkMgr = function chunkMgr(){};
+
+//	proto.canUseCoveredIndex = function canUseCoveredIndex(){};
+	
+//	proto.yieldSometimes = function yieldSometimes(){};
+	
+	
+	return klass;
+})();

+ 92 - 0
test/lib/Cursor.js

@@ -0,0 +1,92 @@
+var assert = require("assert"),
+	Cursor = require("../../lib/Cursor");
+
+module.exports = {
+
+	"Cursor": {
+
+		"constructor(data)": {
+			"should throw an exception if it does not get a valid array or stream": function(){
+				assert.throws(function(){
+					var c = new Cursor();
+				});
+				assert.throws(function(){
+					var c = new Cursor(5);
+				});
+			}
+		},
+
+		"#ok": {
+			"should return true if there is still data in the array": function(){
+				var c = new Cursor([1,2,3,4,5]);
+				assert.equal(c.ok(), true);
+			},
+			"should return false if there is no data left in the array": function(){
+				var c = new Cursor([]);
+				assert.equal(c.ok(), false);
+			},
+			"should return true if there is no data left in the array, but there is still a current value": function(){
+				var c = new Cursor([1,2]);
+				c.advance();
+				c.advance();
+				assert.equal(c.ok(), true);
+				c.advance();
+				assert.equal(c.ok(), false);
+			}
+//			,
+//			"should return true if there is still data in the stream": function(){
+//				
+//			},
+//			"should return false if there is no data left in the stream": function(){
+//				
+//			}
+
+		},
+		
+		"#advance": {
+			"should return true if there is still data in the array": function(){
+				var c = new Cursor([1,2,3,4,5]);
+				assert.equal(c.advance(), true);
+			},
+			"should return false if there is no data left in the array": function(){
+				var c = new Cursor([1]);
+				c.advance();
+				assert.equal(c.advance(), false);
+			},
+			"should update the current object to the next item in the array": function(){
+				var c = new Cursor([1,"2"]);
+				c.advance();
+				assert.strictEqual(c.current(), 1);
+				c.advance();
+				assert.strictEqual(c.current(), "2");
+				c.advance();
+				assert.strictEqual(c.current(), undefined);
+			}
+//,			"should return true if there is still data in the stream": function(){
+//				
+//			},
+//			"should return false if there is no data left in the stream": function(){
+//				
+//			},
+//			"should update the current object to the next item in the stream": function(){
+//				
+//			}
+		},
+		
+		"#current": {
+			"should return the first value if the cursor has not been advanced yet": function(){
+				var c = new Cursor([1,2,3,4,5]);
+				assert.equal(c.current(), 1);
+			},
+			"should return the first value if the cursor has been advanced once": function(){
+				var c = new Cursor([1,2,3,4,5]);
+				c.advance();
+				assert.equal(c.current(), 1);
+			}
+		}
+
+	}
+
+};
+
+// When this file is executed directly (it has no parent module), run it through mocha with the "exports" UI and the "spec" reporter.
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();

+ 125 - 0
test/lib/pipeline/Pipeline.js

@@ -0,0 +1,125 @@
+var assert = require("assert"),
+	Pipeline = require("../../../lib/pipeline/Pipeline");
+
+
+
+module.exports = {
+
+	"Pipeline": {
+		before: function(){
+			Pipeline.StageDesc.$test = (function(){
+			var klass = function TestDocumentSource(options){
+				base.call(this);
+				
+				this.shouldCoalesce = options.coalesce;
+				this.coalesceWasCalled = false;
+				this.optimizeWasCalled = false;
+				
+				this.current = 5;
+				
+			}, base = require('../../../lib/pipeline/documentSources/DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+			
+			
+			proto.coalesce = function(){
+				this.coalesceWasCalled = true;
+				
+				var c = this.shouldCoalesce;//only coalesce with the first thing we find
+				this.shouldCoalesce = false;
+				return c;
+			};
+			proto.optimize = function(){
+				this.optimizeWasCalled = true;
+			};
+			
+			proto.eof = function(){
+				return this.current < 0;
+			};
+			proto.advance = function(){
+				this.current = this.current - 1;
+				return !this.eof();
+			};
+			proto.getCurrent = function(){
+				return this.current;
+			};
+			
+			return klass;
+		})();
+		
+		//TODO:remove this once Match is implemented!!!
+		Pipeline.MatchDocumentSource = (function(){
+			var klass = function MatchDocumentSource(){
+				
+			}, base = require('../../../lib/pipeline/documentSources/DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+			klass.matchName = "$match";
+			return klass;
+		})();
+		Pipeline.StageDesc.$match = Pipeline.MatchDocumentSource;
+		
+		//TODO:remove this once Sort is implemented!!!
+		Pipeline.SortDocumentSource = (function(){
+			var klass = function SortDocumentSource(){
+				
+			}, base = require('../../../lib/pipeline/documentSources/DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+			klass.sortName = "$sort";
+			return klass;
+		})();
+		Pipeline.StageDesc.$sort = Pipeline.SortDocumentSource;
+			
+		},
+		"parseCommand": {
+			"should fail if given non-objects in the array":function(){
+				assert.throws(function(){
+					Pipeline.parseCommand([5]);
+				});
+			},
+			"should fail if given objects with more/less than one field":function(){
+				assert.throws(function(){
+					Pipeline.parseCommand([{}]);
+				});
+			},
+			"should fail if given objects that dont match any known document sources":function(){
+				assert.throws(function(){
+					Pipeline.parseCommand([{$foo:"$sdfdf"}]);
+				});
+			},
+			"should swap $match and $sort if the $match immediately follows the $sort":function(){
+				var p = Pipeline.parseCommand([{$sort:{"xyz":1}}, {$match:{}}]);
+				assert.equal(p.sourceVector[0].constructor.matchName, "$match");
+				assert.equal(p.sourceVector[1].constructor.sortName, "$sort");
+			},
+			"should attempt to coalesce all sources":function(){
+				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:true}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]);
+				assert.equal(p.sourceVector.length, 3);
+				p.sourceVector.slice(0,-1).forEach(function(source){
+					assert.equal(source.coalesceWasCalled, true);
+				});
+				assert.equal(p.sourceVector[p.sourceVector.length -1].coalesceWasCalled, false);
+			},
+			"should optimize all sources":function(){
+				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}]);
+				p.sourceVector.forEach(function(source){
+					assert.equal(source.optimizeWasCalled, true);
+				});
+			}
+		},
+
+		"#run": {
+			"should set the parent source for all sources in the pipeline except the first one":function(){
+				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]);
+				p.run({}, []);
+				assert.equal(p.sourceVector[1].pSource, p.sourceVector[0]);
+				assert.equal(p.sourceVector[2].pSource, p.sourceVector[1]);
+			},
+			"should iterate through sources and return resultant array":function(){
+				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]),
+					result = {};
+				p.run(result, []);
+				assert.deepEqual(result.result, [5,4,3,2,1,0]);//see the test source for why this should be so
+			}
+		}
+
+	}
+
+};
+
+// When this file is executed directly (it has no parent module), run it through mocha with the "exports" UI and the "spec" reporter.
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();

+ 112 - 0
test/lib/pipeline/PipelineD.js

@@ -0,0 +1,112 @@
+var assert = require("assert"),
+	Pipeline = require("../../../lib/pipeline/Pipeline"),
+	PipelineD = require("../../../lib/pipeline/PipelineD"),
+	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
+	CursorDocumentSource = require('../../../lib/pipeline/documentSources/CursorDocumentSource');
+
+
+
+module.exports = {
+
+	"Pipeline": {
+		before: function(){
+			Pipeline.StageDesc.$test = (function(){
+			var klass = function TestDocumentSource(options){
+				base.call(this);
+				
+				this.shouldCoalesce = options.coalesce;
+				this.coalesceWasCalled = false;
+				this.optimizeWasCalled = false;
+				this.resetWasCalled = false;
+				
+				this.current = 5;
+				
+			}, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+			
+			
+			proto.coalesce = function(){
+				this.coalesceWasCalled = true;
+				
+				var c = this.shouldCoalesce;//only coalesce with the first thing we find
+				this.shouldCoalesce = false;
+				return c;
+			};
+			proto.optimize = function(){
+				this.optimizeWasCalled = true;
+			};
+			proto.eof = function(){
+				return this.current < 0;
+			};
+			proto.advance = function(){
+				this.current = this.current - 1;
+				return !this.eof();
+			};
+			proto.getCurrent = function(){
+				return this.current;
+			};
+			
+			proto.reset = function(){	
+				this.resetWasCalled = true; 
+			};
+			proto.getDependencies = function(deps){
+				if (!deps.testDep){
+					deps.testDep = 1;
+					return DocumentSource.GetDepsReturn.SEE_NEXT;
+				}
+				return DocumentSource.GetDepsReturn.EXHAUSTIVE;
+			};
+			
+			return klass;
+		})();
+		
+		//TODO:remove this once Match is implemented!!!
+		Pipeline.MatchDocumentSource = (function(){
+			var klass = function MatchDocumentSource(){
+				
+			}, base = require('../../../lib/pipeline/documentSources/DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+			klass.matchName = "$match";
+			return klass;
+		})();
+		Pipeline.StageDesc.$match = Pipeline.MatchDocumentSource;
+		
+		//TODO:remove this once Sort is implemented!!!
+		Pipeline.SortDocumentSource = (function(){
+			var klass = function SortDocumentSource(){
+				
+			}, base = require('../../../lib/pipeline/documentSources/DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+			klass.sortName = "$sort";
+			return klass;
+		})();
+		Pipeline.StageDesc.$sort = Pipeline.SortDocumentSource;
+			
+		},
+		"prepareCursorSource": {
+			"should call reset on all sources":function(){
+				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}]);
+				
+				PipelineD.prepareCursorSource(p, [1,2,3,4,5]);
+				
+				p.sourceVector.forEach(function(source){
+					assert.equal(source.resetWasCalled, true);
+				});
+			},
+			
+			"should return a CursorDocumentSource":function(){
+				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}]),
+					cs = PipelineD.prepareCursorSource(p, [1,2,3,4,5]);
+				
+				assert.equal(cs.constructor, CursorDocumentSource);
+			},
+			
+			"should get projection from all sources":function(){
+				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}]),
+					cs = PipelineD.prepareCursorSource(p, [1,2,3,4,5]);
+				
+				assert.deepEqual(cs._projection, {"_id":0,"testDep":1});
+			}
+		}
+	}
+
+};
+
+// When this file is executed directly (it has no parent module), run it through mocha with the "exports" UI and the "spec" reporter.
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();

+ 98 - 0
test/lib/pipeline/documentSources/CursorDocumentSource.js

@@ -0,0 +1,98 @@
+var assert = require("assert"),
+	CursorDocumentSource = require("../../../../lib/pipeline/documentsources/CursorDocumentSource"),
+	Cursor = require("../../../../lib/Cursor");
+
+module.exports = {
+
+	"CursorDocumentSource": {
+
+		"constructor(data)": {
+			"should fail if CursorWithContext is not provided": function(){
+				assert.throws(function(){
+					var cds = new CursorDocumentSource();
+				});
+			},
+			"should get a accept a CursorWithContext and set it internally": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				
+				assert.ok(cds._cursorWithContext);
+			}
+		},
+
+		"#eof": {
+			"should return true if the cursor is empty": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				
+				assert.equal(cds.eof(), true);
+			},
+			"should return false if the cursor is non-empty": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				
+				assert.equal(cds.eof(), false);
+			}
+		},
+		"#advance": {
+			"should return true if the cursor was advanced": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				
+				assert.equal(cds.advance(), true);
+			},
+			"should return false if the cursor is empty": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				cds.advance();cds.advance();cds.advance();
+				assert.equal(cds.advance(), false);
+			}
+		},
+		"#getCurrent": {
+			"should return the current cursor value": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3,4] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				assert.equal(cds.getCurrent(), 1);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 2);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 3);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 4);
+				cds.advance();
+				assert.equal(cds.getCurrent(), undefined);
+			}
+		},
+		"#dispose": {
+			"should empty the current cursor": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				assert.equal(cds.getCurrent(), 1);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 2);
+				
+				cds.dispose();
+				assert.equal(cds.advance(), false);
+				assert.equal(cds.eof(), true);
+			}
+		}
+
+	}
+
+};
+
+// When this file is executed directly (it has no parent module), run it through mocha with the "exports" UI and the "spec" reporter.
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();