Selaa lähdekoodia

refs #5123: Updated port of Pipeline js

Jared Hall 11 vuotta sitten
vanhempi
commit
64aba4647a

+ 300 - 148
lib/pipeline/Pipeline.js

@@ -1,5 +1,4 @@
 "use strict";
-var async = require("async");
 
 /**
  * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command.  define a singleton object for it.
@@ -10,8 +9,7 @@ var async = require("async");
  **/
 // CONSTRUCTOR
 var Pipeline = module.exports = function Pipeline(theCtx){
-	this.collectionName = null;
-	this.sourceVector = null;
+	this.sources = null;
 	this.explain = false;
 	this.splitMongodPipeline = false;
 	this.ctx = theCtx;
@@ -24,24 +22,156 @@ var DocumentSource = require("./documentSources/DocumentSource"),
 	SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
 	UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
 	GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
+	OutDocumentSource = require('./documentSources/OutDocumentSource'),
+	GeoNearDocumentSource = require('./documentSources/GeoNearDocumentSource'),
+	RedactDocumentSource = require('./documentSources/RedactDocumentSource'),
 	SortDocumentSource = require('./documentSources/SortDocumentSource');
 
 klass.COMMAND_NAME = "aggregate";
 klass.PIPELINE_NAME = "pipeline";
 klass.EXPLAIN_NAME = "explain";
 klass.FROM_ROUTER_NAME = "fromRouter";
-klass.SPLIT_MONGOD_PIPELINE_NAME = "splitMongodPipeline";
 klass.SERVER_PIPELINE_NAME = "serverPipeline";
 klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
 
 klass.stageDesc = {};//attaching this to the class for test cases
+klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
+klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
 klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
 klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
+klass.stageDesc[OutDocumentSource.outName] = OutDocumentSource.createFromJson;
 klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
+klass.stageDesc[RedactDocumentSource.redactName] = RedactDocumentSource.createFromJson; // FIX: was wired to ProjectDocumentSource.createFromJson (copy-paste error) — a $redact stage would have been parsed as a $project
 klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
-klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
-klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
 klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
+klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
+klass.nStageDesc = Object.keys(klass.stageDesc).length;
+
+// namespace for the pipeline optimization passes applied by parseCommand
+klass.optimizations = {};
+klass.optimizations.local = {};
+
+/**
+ * Moves $match before $sort when they are placed next to one another.
+ * Filtering first means fewer documents to sort; neither stage changes the
+ * documents themselves, so the transformation does not affect the result.
+ * Run before coalescing so adjacent $match stages can then be combined.
+ * @static
+ * @method moveMatchBeforeSort
+ * @param pipelineInst An instance of a Pipeline
+ **/
+klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
+	var sources = pipelineInst.sources;
+	for(var srcn = sources.length, srci = 1; srci < srcn; ++srci) {
+		var source = sources[srci];
+		if(source.constructor === MatchDocumentSource) {
+			var previous = sources[srci - 1];
+			if(previous && previous.constructor === SortDocumentSource) { //Added check that previous exists
+				/* swap this item with the previous */
+				sources[srci] = previous;
+				sources[srci-1] = source;
+			}
+		}
+	}
+};
+
+/**
+ * Moves $limit before $skip when they are placed next to one another,
+ * preserving semantics: skip(s) then limit(n) === limit(s + n) then skip(s).
+ * @static
+ * @method moveLimitBeforeSkip
+ * @param pipelineInst An instance of a Pipeline
+ **/
+klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
+	var sources = pipelineInst.sources;
+	if(sources.length === 0) return;
+	for(var i = sources.length - 1; i >= 1 /* not looking at 0 */; i--) {
+		var limit = sources[i].constructor === LimitDocumentSource ? sources[i] : undefined,
+			skip = sources[i-1].constructor === SkipDocumentSource ? sources[i-1] : undefined;
+		if(limit && skip) {
+			// FIX: was `limit.getLimit + skip.getSkip()` — missing call parentheses added a
+			// function reference to a number, corrupting the combined limit value.
+			limit.setLimit(limit.getLimit() + skip.getSkip());
+			sources[i-1] = limit;
+			sources[i] = skip;
+
+			// Start at back again. This is needed to handle cases with more than 1 $limit
+			// (S means skip, L means limit)
+			//
+			// These two would work without second pass (assuming back to front ordering)
+			// SL   -> LS
+			// SSL  -> LSS
+			//
+			// The following cases need a second pass to handle the second limit
+			// SLL  -> LLS
+			// SSLL -> LLSS
+			// SLSL -> LLSS
+			i = sources.length; // decremented before next pass
+		}
+	}
+};
+
+/**
+ * Attempts to coalesce every pipeline stage into the previous pipeline stage, starting after the first.
+ * Each DocumentSource is given the chance to absorb its successor via coalesce(); when absorption
+ * succeeds the successor is dropped from the pipeline (e.g. two adjacent $match stages become one
+ * whose predicate is the conjunction of both).
+ * @static
+ * @method coalesceAdjacent
+ * @param pipelineInst An instance of a Pipeline
+ **/
+klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
+	var sources = pipelineInst.sources;
+	if(sources.length === 0) return;
+	
+	// move all sources to a temporary list (drains `sources` in place, preserving order)
+	var moveSrc = sources.pop(),
+		tempSources = [];
+	while(moveSrc) {
+		tempSources.unshift(moveSrc);
+		moveSrc = sources.pop();
+	}	
+
+	// move the first one to the final list
+	sources.push(tempSources[0]);
+
+	// run through the sources, coalescing them or keeping them
+	for(var tempn = tempSources.length, tempi = 1; tempi < tempn; ++tempi) {
+		// If we can't coalesce the source with the last, then move it
+		// to the final list, and make it the new last.  (If we succeeded,
+		// then we're still on the same last, and there's no need to move
+		// or do anything with the source -- the destruction of tempSources
+		// will take care of the rest.)
+		var lastSource = sources[sources.length-1],
+			tempSrc = tempSources[tempi];
+		if(!(lastSource && tempSrc)) {
+			throw new Error('Must have a last and current source'); // verify(lastSource && tempSrc);
+		}
+		if(!lastSource.coalesce(tempSrc)) sources.push(tempSrc);
+	}
+};
+
+/**
+ * Iterates over sources in the pipelineInst, giving each stage the chance to
+ * optimize its own internal representation (delegates to DocumentSource#optimize).
+ * @static
+ * @method optimizeEachDocumentSource
+ * @param pipelineInst An instance of a Pipeline
+ **/
+klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
+	var sources = pipelineInst.sources;
+	for(var srci = 0, srcn = sources.length; srci < srcn; ++srci) {
+		sources[srci].optimize();	
+	}
+};
+
+/**
+ * Auto-places a $match before a $redact when the $redact is the first item in a pipeline,
+ * duplicating the redact-safe portion of the following $match so it can filter earlier.
+ * NOTE(review): "Inital" typo kept in the name — renaming would break the call site in parseCommand.
+ * @static
+ * @method duplicateMatchBeforeInitalRedact
+ * @param pipelineInst An instance of a Pipeline
+ **/
+klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
+	var sources = pipelineInst.sources;
+	if(sources.length >= 2 && sources[0].constructor === RedactDocumentSource) {
+		if(sources[1].constructor === MatchDocumentSource) {
+			var match = sources[1],
+				redactSafePortion = match.redactSafePortion();	
+			if(Object.keys(redactSafePortion).length > 0) {
+				// FIX: was `sources.shift(...)` — Array#shift ignores its argument and REMOVES
+				// the leading $redact stage; unshift prepends the duplicated $match as intended.
+				sources.unshift(MatchDocumentSource.createFromJson(redactSafePortion, pipelineInst.ctx));
+			}
+		}
+	}
+};
 
 /**
  * Create an `Array` of `DocumentSource`s from the given JSON pipeline
@@ -52,7 +182,7 @@ klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson
  * @returns {Array}  The parsed `DocumentSource`s
  **/
 klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
-	var sourceVector = [];
+	var sources = [];
 	for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
 		// pull out the pipeline element as an object
 		var pipeElement = pipeline[iStep];
@@ -70,11 +200,14 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
 
 		// Parse the stage
 		var stage = desc(stageSpec, ctx);
-		if (!stage) throw new Error("Stage must not be undefined!");
-		stage.setPipelineStep(iStep);
-		sourceVector.push(stage);
+		if (!stage) throw new Error("Stage must not be undefined!"); // verify(stage)
+		sources.push(stage);
+
+		if(stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
+			throw new Error("$out can only be the final stage in the pipeline; code 16435");
+		}
 	}
-	return sourceVector;
+	return sources;
 };
 
 /**
@@ -99,12 +232,15 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	var pipeline;
 	for(var fieldName in cmdObj){
 		var cmdElement = cmdObj[fieldName];
-		if(fieldName == klass.COMMAND_NAME)						pipelineInst.collectionName = cmdElement;		//look for the aggregation command
+		if(fieldName[0] == "$")									continue;
+		else if(fieldName == "cursor")							continue;
+		else if(fieldName == klass.COMMAND_NAME)				continue;										//look for the aggregation command
 		else if(fieldName == klass.PIPELINE_NAME)				pipeline = cmdElement;							//check for the pipeline of JSON doc srcs
 		else if(fieldName == klass.EXPLAIN_NAME)				pipelineInst.explain = cmdElement;				//check for explain option
-		else if(fieldName == klass.FROM_ROUTER_NAME)			pipelineInst.fromRouter = cmdElement;			//if the request came from the router, we're in a shard
-		else if(fieldName == klass.SPLIT_MONGOD_PIPELINE_NAME)	pipelineInst.splitMongodPipeline = cmdElement;	//check for debug options
-		// NOTE: DEVIATION FROM MONGO: Not implementing: "Ignore $auth information sent along with the command. The authentication system will use it, it's not a part of the pipeline."
+		else if(fieldName == klass.FROM_ROUTER_NAME)			ctx.inShard = cmdElement;						//if the request came from the router, we're in a shard
+		else if(fieldName == "allowDiskUsage") {
+			// FIX: error message read `typeof allowDiskUsage` — an undeclared identifier, so the
+			// throw itself raised a ReferenceError; the option value lives in `cmdElement`.
+			if(typeof cmdElement !== 'boolean') throw new Error("allowDiskUsage must be a bool, not a " + typeof cmdElement + "; uassert code 16949");
+		}
 		else throw new Error("unrecognized field " + JSON.stringify(fieldName));
 	}
 
@@ -113,64 +249,13 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	 * Set up the specified document source pipeline.
 	 **/
 	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
-	var sourceVector = pipelineInst.sourceVector = Pipeline.parseDocumentSources(pipeline, ctx);
-
-	/* if there aren't any pipeline stages, there's nothing more to do */
-	if (!sourceVector.length) return pipelineInst;
-
-	/* Move filters up where possible.
-	CW TODO -- move filter past projections where possible, and noting corresponding field renaming.
-	*/
-
-	/*
-	Wherever there is a match immediately following a sort, swap them.
-	This means we sort fewer items.  Neither changes the documents in the stream, so this transformation shouldn't affect the result.
-	We do this first, because then when we coalesce operators below, any adjacent matches will be combined.
-	*/
-	for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
-		var source = sourceVector[srci];
-		if (source instanceof MatchDocumentSource) {
-			var previous = sourceVector[srci - 1];
-			if (previous instanceof SortDocumentSource) {
-				/* swap this item with the previous */
-				sourceVector[srci - 1] = source;
-				sourceVector[srci] = previous;
-			}
-		}
-	}
+	var sources = pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
 
-	/*
-	Coalesce adjacent filters where possible.  Two adjacent filters are equivalent to one filter whose predicate is the conjunction of the two original filters' predicates.
-	For now, capture this by giving any DocumentSource the option to absorb it's successor; this will also allow adjacent projections to coalesce when possible.
-	Run through the DocumentSources, and give each one the opportunity to coalesce with its successor.  If successful, remove the successor.
-	Move all document sources to a temporary list.
-	*/
-	var tempVector = sourceVector.slice(0);
-	sourceVector.length = 0;
-
-	// move the first one to the final list
-	sourceVector.push(tempVector[0]);
-
-	// run through the sources, coalescing them or keeping them
-	for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
-		/*
-		If we can't coalesce the source with the last, then move it to the final list, and make it the new last.
-		(If we succeeded, then we're still on the same last, and there's no need to move or do anything with the source -- the destruction of tempVector will take care of the rest.)
-		*/
-		var lastSource = sourceVector[sourceVector.length - 1],
-			temp = tempVector[tempi];
-		if (!temp || !lastSource) throw new Error("null document sources found");
-		if (!lastSource.coalesce(temp)){
-			sourceVector.push(temp);
-		}
-	}
-
-	// optimize the elements in the pipeline
-	for(var i = 0, l = sourceVector.length; i<l; i++) {
-		var iter = sourceVector[i];
-		if (!iter) throw new Error("Pipeline received empty document as argument");
-		iter.optimize();
-	}
+	klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
+	klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
+	klass.optimizations.local.coalesceAdjacent(pipelineInst);
+	klass.optimizations.local.optimizeEachDocumentSource(pipelineInst);
+	klass.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);
 
 	return pipelineInst;
 };
@@ -186,86 +271,153 @@ function ifError(err) {
 }
 
 /**
- * Run the pipeline
+ * Gets the initial $match query when $match is the first pipeline stage
  * @method run
  * @param	inputSource		{DocumentSource}	The input document source for the pipeline
  * @param	[callback]		{Function}			Optional callback function if using async extensions
+ * @return {Object}	An empty object or the match spec
 **/
-proto.run = function run(inputSource, callback){
-	if (inputSource && !(inputSource instanceof DocumentSource)) throw new Error("arg `inputSource` must be an instance of DocumentSource");
-	if (!callback) callback = klass.SYNC_CALLBACK;
-	var self = this;
-	if (callback === klass.SYNC_CALLBACK) { // SYNCHRONOUS MODE
-		inputSource.setSource(undefined, ifError);	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
-		var source = inputSource;
-		for(var i = 0, l = self.sourceVector.length; i < l; i++){
-			var temp = self.sourceVector[i];
-			temp.setSource(source, ifError);
-			source = temp;
-		}
-		/*
-		Iterate through the resulting documents, and add them to the result.
-		We do this even if we're doing an explain, in order to capture the document counts and other stats.
-		However, we don't capture the result documents for explain.
-		*/
-		var resultArray = [];
-		try{
-			for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
-				var document = source.getCurrent();
-				resultArray.push(document);	// add the document to the result set
-				//Commenting out this assertion for munge.  MUHAHAHA!!!
-				// object will be too large, assert. the extra 1KB is for headers
-				//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
+proto.getInitialQuery = function getInitialQuery() {
+	// NOTE(review): the JSDoc above this function still reads "@method run" with
+	// inputSource/callback params — stale copy-paste; this method takes no arguments.
+	var sources = this.sources;
+	if(sources.length === 0) {
+		return {};
+	}
+
+	/* look for an initial $match */
+	var match = sources[0].constructor === MatchDocumentSource ? sources[0] : undefined;
+	if(!match) return {};
+
+	return match.getQuery();
+};
+
+/**
+ * Creates the JSON representation of the pipeline
+ * @method serialize
+ * @return {Object}	The serialized aggregate command: collection name under the
+ *					command key, the stage array under the pipeline key, and the
+ *					explain flag when set
+**/
+proto.serialize = function serialize() {
+	var serialized = {},
+		array = [];
+
+	// create an array out of the pipeline operations
+	this.sources.forEach(function(source) {
+		source.serializeToArray(array);
+	});
+
+	// falls back to '' when the context carries no namespace/collection info
+	serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
+	serialized[klass.PIPELINE_NAME] = array;
+
+	if(this.explain) serialized[klass.EXPLAIN_NAME] = this.explain;
+
+	return serialized;
+};
+
+/**
+ * Points each source at its previous source, chaining the pipeline together.
+ * The first source is left with whatever input it already has (see addInitialSource).
+ * @method stitch
+ * @throws {Error} when the pipeline has no sources (massert code 16600)
+**/
+proto.stitch = function stitch() {
+	if(this.sources.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
+
+	/* chain together the sources we found */
+	var prevSource = this.sources[0];
+	for(var srci = 1, srcn = this.sources.length; srci < srcn; srci++) {
+		var tempSource = this.sources[srci];
+		tempSource.setSource(prevSource);
+		prevSource = tempSource;
+	}
+};
+
+/**
+ * Run the pipeline.
+ * NOTE(review): assumes stitch() has already chained the sources — TODO confirm caller contract.
+ * @method run
+ * @param callback {Function} Optional. Run the pipeline in async mode; callback(err, result)
+ * @return result {Object} The result of executing the pipeline ({result: Array}) in sync
+ *                         mode; undefined in async mode (result is delivered via callback)
+**/
+proto.run = function run(callback) {
+	// should not get here in the explain case
+	if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
+
+	/* NOTE: DEVIATION FROM MONGO SOURCE. WE'RE SUPPORTING SYNC AND ASYNC */
+	if(callback) {
+		return this._runAsync(callback);
+	} else {
+		return this._runSync();
+	}
+
+};
+
+/**
+ * Get the last document source in the pipeline
+ * @method _getFinalSource
+ * @return {Object}		The DocumentSource at the end of the pipeline, or undefined
+ *						when the pipeline is empty
+ * @private
+**/
+proto._getFinalSource = function _getFinalSource() {
+	return this.sources[this.sources.length - 1];
+};
+
+/**
+ * Run the pipeline synchronously by pulling every document from the final source.
+ * The loop ends on the first falsy value from getNext() — presumably null/undefined
+ * signals end-of-stream; TODO confirm the DocumentSource#getNext contract.
+ * @method _runSync
+ * @return {Object}		The results object {result:resultArray}
+ * @private
+**/
+proto._runSync = function _runSync() {
+	var resultArray = [],
+		finalSource = this._getFinalSource(),
+		next = finalSource.getNext();
+	while(next) {
+		// add the document to the result set
+		resultArray.push(next);
+		next = finalSource.getNext();
+	}
+	return {result:resultArray};
+};
+
+/**
+ * Run the pipeline asynchronously
+ * @method _runAsync
+ * @param callback {Function} callback(err, resultObject)
+ * @private
+**/
+proto._runAsync = function _runAsync(callback) {
+	var resultArray = [],
+		finalSource = this._getFinalSource(),
+		gotNext = function(err, doc) {
+			if(err) return callback(err);
+			if(doc) {
+				resultArray.push(doc);
+				return setImmediate(function() { //setImmediate to avoid callstack size issues
+					finalSource.getNext(gotNext);
+				});
+			} else {
+				return callback(null, {result:resultArray});	
 			}
-		} catch (err) {
-			return callback(err);
-		}
-		var result = {
-			result: resultArray
-//			,ok: true;	//not actually in here... where does this come from?
 		};
-		return callback(null, result);
-	} else {	// ASYNCHRONOUS MODE	//TODO: move this up to a higher level package?
-		return inputSource.setSource(undefined, function(err){	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
-			if (err) return callback(err);
-			// chain together the sources we found
-			var source = inputSource;
-			async.eachSeries(
-				self.sourceVector,
-				function eachSrc(temp, next){
-					temp.setSource(source, function(err){
-						if (err) return next(err);
-						source = temp;
-						return next();
-					});
-				},
-				function doneSrcs(err){ //source is left pointing at the last source in the chain
-					if (err) return callback(err);
-					/*
-					Iterate through the resulting documents, and add them to the result.
-					We do this even if we're doing an explain, in order to capture the document counts and other stats.
-					However, we don't capture the result documents for explain.
-					*/
-					// the array in which the aggregation results reside
-					var resultArray = [];
-					try{
-						for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
-							var document = source.getCurrent();
-							resultArray.push(document);	// add the document to the result set
-							//Commenting out this assertion for munge.  MUHAHAHA!!!
-							// object will be too large, assert. the extra 1KB is for headers
-							//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
-						}
-					} catch (err) {
-						return callback(err);
-					}
-					var result = {
-						result: resultArray
-	//					,ok: true;	//not actually in here... where does this come from?
-					};
-					return callback(null, result);
-				}
-			);
-		});
-	}
+	finalSource.getNext(gotNext);	
+};
+
+/**
+ * Get the pipeline explanation by serializing every source with explain output enabled.
+ * @method writeExplainOps
+ * @return {Array}	An array of source explanations
+**/
+proto.writeExplainOps = function writeExplainOps() {
+	var array = [];
+	this.sources.forEach(function(source) {
+		source.serializeToArray(array, /*explain=*/true);
+	});
+	return array;
+};
+
+/**
+ * Prepend the given source as the pipeline's initial document source
+ * (placed at the front of `sources`, feeding every later stage).
+ * @method addInitialSource
+ * @param source {DocumentSource}
+**/
+proto.addInitialSource = function addInitialSource(source) {
+	this.sources.unshift(source);
+};

+ 117 - 0
lib/pipeline/documentSources/GeoNearDocumentSource.js

@@ -0,0 +1,117 @@
+"use strict";
+
+/**
+ * The $geoNear pipeline stage.
+ *
+ * NOTE(review): this file appears to be a verbatim copy of SkipDocumentSource —
+ * every method below implements $skip semantics (a numeric `skip` counter,
+ * `builder.$skip` serialization, "number to skip" error messages). Presumably a
+ * placeholder; TODO: implement real $geoNear behavior before relying on this stage.
+ * @class GeoNearDocumentSource
+ * @namespace mungedb-aggregate.pipeline.documentSources
+ * @module mungedb-aggregate
+ * @constructor
+ * @param [ctx] {ExpressionContext}
+ **/
+var GeoNearDocumentSource = module.exports = function GeoNearDocumentSource(ctx){
+}, klass = GeoNearDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+klass.geoNearName = "$geoNear";
+proto.getSourceName = function getSourceName(){
+	return klass.geoNearName;
+};
+
+/**
+ * Coalesce skips together
+ * NOTE(review): merges two adjacent stages of this type by summing `skip` — $skip logic
+ * left over from the copied file; verify whether $geoNear should coalesce at all.
+ * @param {Object} nextSource the next source
+ * @return {bool} return whether we can coalese together
+ **/
+proto.coalesce = function coalesce(nextSource) {
+	var nextSkip =	nextSource.constructor === GeoNearDocumentSource?nextSource:null;
+
+	// if it's not another $skip, we can't coalesce
+	if (!nextSkip) return false;
+
+	// we need to skip over the sum of the two consecutive $skips
+	this.skip += nextSkip.skip;
+	return true;
+};
+
+// advances past the first `skip` documents once, then mirrors the source's current doc
+proto.skipper = function skipper() {
+	if (this.count === 0) {
+		while (!this.source.eof() && this.count++ < this.skip) {
+			this.source.advance();
+		}
+	}
+
+	if (this.source.eof()) {
+		this.current = null;
+		return;
+	}
+
+	this.current = this.source.getCurrent();
+};
+
+
+/**
+ * Is the source at EOF?
+ * @method	eof
+ **/
+proto.eof = function eof() {
+	this.skipper();
+	return this.source.eof();
+};
+
+/**
+ * some implementations do the equivalent of verify(!eof()) so check eof() first
+ * @method	getCurrent
+ * @returns	{Document}	the current Document without advancing
+ **/
+proto.getCurrent = function getCurrent() {
+	this.skipper();
+	return this.source.getCurrent();
+};
+
+/**
+ * Advance the state of the DocumentSource so that it will return the next Document.
+ * The default implementation returns false, after checking for interrupts.
+ * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
+ *
+ * @method	advance
+ * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed.  This default implementation always returns false.
+ **/
+proto.advance = function advance() {
+	base.prototype.advance.call(this); // check for interrupts
+	if (this.eof()) {
+		this.current = null;
+		return false;
+	}
+
+	this.current = this.source.getCurrent();
+	return this.source.advance();
+};
+
+/**
+ * Create an object that represents the document source.  The object
+ * will have a single field whose name is the source's name.  This
+ * will be used by the default implementation of addToJsonArray()
+ * to add this object to a pipeline being represented in JSON.
+ *
+ * NOTE(review): serializes under $skip, not $geoNear — copy-paste leftover.
+ * @method	sourceToJson
+ * @param	{Object} builder	JSONObjBuilder: a blank object builder to write to
+ * @param	{Boolean}	explain	create explain output
+ **/
+proto.sourceToJson = function sourceToJson(builder, explain) {
+	builder.$skip = this.skip;
+};
+
+/**
+ * Creates a new GeoNearDocumentSource with the input number as the skip
+ *
+ * NOTE(review): in MongoDB, the $geoNear spec is an options object, not a number —
+ * this numeric validation was copied from SkipDocumentSource.
+ * @param {Number} JsonElement this thing is *called* Json, but it expects a number
+ **/
+klass.createFromJson = function createFromJson(jsonElement, ctx) {
+	if (typeof jsonElement !== "number") throw new Error("code 15972; the value to skip must be a number");
+
+	var nextSkip = new GeoNearDocumentSource(ctx);
+
+	nextSkip.skip = jsonElement;
+	if (nextSkip.skip < 0 || isNaN(nextSkip.skip)) throw new Error("code 15956; the number to skip cannot be negative");
+
+	return nextSkip;
+};

+ 117 - 0
lib/pipeline/documentSources/OutDocumentSource.js

@@ -0,0 +1,117 @@
+"use strict";
+
+/**
+ * The $out pipeline stage.
+ *
+ * NOTE(review): this file appears to be a verbatim copy of SkipDocumentSource —
+ * every method below implements $skip semantics (a numeric `skip` counter,
+ * `builder.$skip` serialization, "number to skip" error messages). In MongoDB,
+ * $out takes a collection name and writes results to it; Pipeline.js already
+ * enforces that $out must be the final stage, but this class does no output.
+ * TODO: implement real $out behavior before relying on this stage.
+ * @class OutDocumentSource
+ * @namespace mungedb-aggregate.pipeline.documentSources
+ * @module mungedb-aggregate
+ * @constructor
+ * @param [ctx] {ExpressionContext}
+ **/
+var OutDocumentSource = module.exports = function OutDocumentSource(ctx){
+}, klass = OutDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+klass.outName = "$out";
+proto.getSourceName = function getSourceName(){
+	return klass.outName;
+};
+
+/**
+ * Coalesce skips together
+ * NOTE(review): merges two adjacent stages of this type by summing `skip` — $skip logic
+ * left over from the copied file; $out should likely never coalesce.
+ * @param {Object} nextSource the next source
+ * @return {bool} return whether we can coalese together
+ **/
+proto.coalesce = function coalesce(nextSource) {
+	var nextSkip =	nextSource.constructor === OutDocumentSource?nextSource:null;
+
+	// if it's not another $skip, we can't coalesce
+	if (!nextSkip) return false;
+
+	// we need to skip over the sum of the two consecutive $skips
+	this.skip += nextSkip.skip;
+	return true;
+};
+
+// advances past the first `skip` documents once, then mirrors the source's current doc
+proto.skipper = function skipper() {
+	if (this.count === 0) {
+		while (!this.source.eof() && this.count++ < this.skip) {
+			this.source.advance();
+		}
+	}
+
+	if (this.source.eof()) {
+		this.current = null;
+		return;
+	}
+
+	this.current = this.source.getCurrent();
+};
+
+
+/**
+ * Is the source at EOF?
+ * @method	eof
+ **/
+proto.eof = function eof() {
+	this.skipper();
+	return this.source.eof();
+};
+
+/**
+ * some implementations do the equivalent of verify(!eof()) so check eof() first
+ * @method	getCurrent
+ * @returns	{Document}	the current Document without advancing
+ **/
+proto.getCurrent = function getCurrent() {
+	this.skipper();
+	return this.source.getCurrent();
+};
+
+/**
+ * Advance the state of the DocumentSource so that it will return the next Document.
+ * The default implementation returns false, after checking for interrupts.
+ * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
+ *
+ * @method	advance
+ * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed.  This default implementation always returns false.
+ **/
+proto.advance = function advance() {
+	base.prototype.advance.call(this); // check for interrupts
+	if (this.eof()) {
+		this.current = null;
+		return false;
+	}
+
+	this.current = this.source.getCurrent();
+	return this.source.advance();
+};
+
+/**
+ * Create an object that represents the document source.  The object
+ * will have a single field whose name is the source's name.  This
+ * will be used by the default implementation of addToJsonArray()
+ * to add this object to a pipeline being represented in JSON.
+ *
+ * NOTE(review): serializes under $skip, not $out — copy-paste leftover.
+ * @method	sourceToJson
+ * @param	{Object} builder	JSONObjBuilder: a blank object builder to write to
+ * @param	{Boolean}	explain	create explain output
+ **/
+proto.sourceToJson = function sourceToJson(builder, explain) {
+	builder.$skip = this.skip;
+};
+
+/**
+ * Creates a new OutDocumentSource with the input number as the skip
+ *
+ * NOTE(review): in MongoDB, the $out spec is a target collection name, not a number —
+ * this numeric validation was copied from SkipDocumentSource.
+ * @param {Number} JsonElement this thing is *called* Json, but it expects a number
+ **/
+klass.createFromJson = function createFromJson(jsonElement, ctx) {
+	if (typeof jsonElement !== "number") throw new Error("code 15972; the value to skip must be a number");
+
+	var nextSkip = new OutDocumentSource(ctx);
+
+	nextSkip.skip = jsonElement;
+	if (nextSkip.skip < 0 || isNaN(nextSkip.skip)) throw new Error("code 15956; the number to skip cannot be negative");
+
+	return nextSkip;
+};

+ 117 - 0
lib/pipeline/documentSources/RedactDocumentSource.js

@@ -0,0 +1,117 @@
+"use strict";
+
+/**
+ * The $redact pipeline stage.
+ *
+ * NOTE(review): this file appears to be a verbatim copy of SkipDocumentSource —
+ * every method below implements $skip semantics (a numeric `skip` counter,
+ * `builder.$skip` serialization, "number to skip" error messages). In MongoDB,
+ * $redact evaluates an expression per document; Pipeline.js's
+ * duplicateMatchBeforeInitalRedact optimization depends on real $redact semantics.
+ * TODO: implement real $redact behavior before relying on this stage.
+ * @class RedactDocumentSource
+ * @namespace mungedb-aggregate.pipeline.documentSources
+ * @module mungedb-aggregate
+ * @constructor
+ * @param [ctx] {ExpressionContext}
+ **/
+var RedactDocumentSource = module.exports = function RedactDocumentSource(ctx){
+}, klass = RedactDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+klass.redactName = "$redact";
+proto.getSourceName = function getSourceName(){
+	return klass.redactName;
+};
+
+/**
+ * Coalesce skips together
+ * NOTE(review): merges two adjacent stages of this type by summing `skip` — $skip logic
+ * left over from the copied file; verify whether $redact should coalesce at all.
+ * @param {Object} nextSource the next source
+ * @return {bool} return whether we can coalese together
+ **/
+proto.coalesce = function coalesce(nextSource) {
+	var nextSkip =	nextSource.constructor === RedactDocumentSource?nextSource:null;
+
+	// if it's not another $skip, we can't coalesce
+	if (!nextSkip) return false;
+
+	// we need to skip over the sum of the two consecutive $skips
+	this.skip += nextSkip.skip;
+	return true;
+};
+
+// advances past the first `skip` documents once, then mirrors the source's current doc
+proto.skipper = function skipper() {
+	if (this.count === 0) {
+		while (!this.source.eof() && this.count++ < this.skip) {
+			this.source.advance();
+		}
+	}
+
+	if (this.source.eof()) {
+		this.current = null;
+		return;
+	}
+
+	this.current = this.source.getCurrent();
+};
+
+
+/**
+ * Is the source at EOF?
+ * @method	eof
+ **/
+proto.eof = function eof() {
+	this.skipper();
+	return this.source.eof();
+};
+
+/**
+ * some implementations do the equivalent of verify(!eof()) so check eof() first
+ * @method	getCurrent
+ * @returns	{Document}	the current Document without advancing
+ **/
+proto.getCurrent = function getCurrent() {
+	this.skipper();
+	return this.source.getCurrent();
+};
+
+/**
+ * Advance the state of the DocumentSource so that it will return the next Document.
+ * The default implementation returns false, after checking for interrupts.
+ * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
+ *
+ * @method	advance
+ * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed.  This default implementation always returns false.
+ **/
+proto.advance = function advance() {
+	base.prototype.advance.call(this); // check for interrupts
+	if (this.eof()) {
+		this.current = null;
+		return false;
+	}
+
+	this.current = this.source.getCurrent();
+	return this.source.advance();
+};
+
+/**
+ * Create an object that represents the document source.  The object
+ * will have a single field whose name is the source's name.  This
+ * will be used by the default implementation of addToJsonArray()
+ * to add this object to a pipeline being represented in JSON.
+ *
+ * NOTE(review): serializes under $skip, not $redact — copy-paste leftover.
+ * @method	sourceToJson
+ * @param	{Object} builder	JSONObjBuilder: a blank object builder to write to
+ * @param	{Boolean}	explain	create explain output
+ **/
+proto.sourceToJson = function sourceToJson(builder, explain) {
+	builder.$skip = this.skip;
+};
+
+/**
+ * Creates a new RedactDocumentSource with the input number as the skip
+ *
+ * NOTE(review): in MongoDB, the $redact spec is an expression, not a number —
+ * this numeric validation was copied from SkipDocumentSource.
+ * @param {Number} JsonElement this thing is *called* Json, but it expects a number
+ **/
+klass.createFromJson = function createFromJson(jsonElement, ctx) {
+	if (typeof jsonElement !== "number") throw new Error("code 15972; the value to skip must be a number");
+
+	var nextSkip = new RedactDocumentSource(ctx);
+
+	nextSkip.skip = jsonElement;
+	if (nextSkip.skip < 0 || isNaN(nextSkip.skip)) throw new Error("code 15956; the number to skip cannot be negative");
+
+	return nextSkip;
+};

+ 4 - 1
lib/pipeline/documentSources/index.js

@@ -9,5 +9,8 @@ module.exports = {
 	ProjectDocumentSource: require("./ProjectDocumentSource.js"),
 	SkipDocumentSource: require("./SkipDocumentSource.js"),
 	SortDocumentSource: require("./SortDocumentSource.js"),
-	UnwindDocumentSource: require("./UnwindDocumentSource.js")
+	UnwindDocumentSource: require("./UnwindDocumentSource.js"),
+	RedactDocumentSource: require("./RedactDocumentSource.js"),
+	GeoNearDocumentSource: require("./GeoNearDocumentSource.js"),
+	OutDocumentSource: require("./OutDocumentSource.js")
 };

+ 54 - 35
test/lib/pipeline/Pipeline.js

@@ -1,9 +1,9 @@
 "use strict";
 var assert = require("assert"),
 	Pipeline = require("../../../lib/pipeline/Pipeline"),
+	FieldPath = require("../../../lib/pipeline/FieldPath"),
 	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource');
 
-
 module.exports = {
 
 	"Pipeline": {
@@ -34,17 +34,13 @@ module.exports = {
 					this.optimizeWasCalled = true;
 				};
 				
-				proto.eof = function(){
-					return this.current < 0;
-				};
-
-				proto.advance = function(){
-					this.current = this.current - 1;
-					return !this.eof();
-				};
-
-				proto.getCurrent = function(){
-					return this.current;
+				proto.getNext = function(callback){
+					var answer = this.current > 0 ? {val:this.current--} : null;
+					if(callback) {
+						return callback(null, answer);
+					} else {
+						return answer;
+					}
 				};
 
 				klass.createFromJson = function(options, ctx){
@@ -79,55 +75,78 @@ module.exports = {
 
 			"should swap $match and $sort if the $match immediately follows the $sort": function () {
 				var p = Pipeline.parseCommand({pipeline:[{$sort:{"xyz":1}}, {$match:{}}]});
-				assert.equal(p.sourceVector[0].constructor.matchName, "$match");
-				assert.equal(p.sourceVector[1].constructor.sortName, "$sort");
+				assert.equal(p.sources[0].constructor.matchName, "$match");
+				assert.equal(p.sources[1].constructor.sortName, "$sort");
 			},
 
 			"should attempt to coalesce all sources": function () {
 				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:true}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
-				assert.equal(p.sourceVector.length, 3);
-				p.sourceVector.slice(0,-1).forEach(function(source){
+				assert.equal(p.sources.length, 3);
+				p.sources.slice(0,-1).forEach(function(source){
 					assert.equal(source.coalesceWasCalled, true);
 				});
-				assert.equal(p.sourceVector[p.sourceVector.length -1].coalesceWasCalled, false);
+				assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
 			},
 
 			"should optimize all sources": function () {
 				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}]});
-				p.sourceVector.forEach(function(source){
+				p.sources.forEach(function(source){
 					assert.equal(source.optimizeWasCalled, true);
 				});
 			}
 
 		},
 
-		"#run": {
-
-			"should set the parent source for all sources in the pipeline except the first one": function (next) {
+		"#stitch": {
+			"should set the parent source for all sources in the pipeline except the first one": function () {
 				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
-				p.run(new DocumentSource({}), function(err, results){
-					assert.equal(p.sourceVector[1].source, p.sourceVector[0]);
-					assert.equal(p.sourceVector[2].source, p.sourceVector[1]);
-					return next();
-				});
-			},
+				p.stitch();
+				assert.equal(p.sources[1].source, p.sources[0]);
+			}
+		},
 
-			"should iterate through sources and return resultant array": function (next) {
-				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
-				p.run(new DocumentSource({}), function(err, results){
-					assert.deepEqual(results.result, [5,4,3,2,1,0]); //see the test source for why this should be so
-					return next();
-				});
+		"#_runSync": {
+
+			"should iterate through sources and return resultant array": function () {
+				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
+					results = p.run();
+				assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
 			},
 
 			"should call callback with errors from pipeline components": function (next) {
-				var p = Pipeline.parseCommand({pipeline:[{$match:{$foo:{bar:"baz"}}}]});
-				p.run(new DocumentSource({}), function(err, results){
+				var p = Pipeline.parseCommand({pipeline:[{$limit:1},{$project:{a:new Date()}}, {$project:{a:{$add:["$a", 1]}}}]}); //Can't add date
+				p.run(function(err, results){
 					assert(err instanceof Error);
 					return next();
 				});
 			}
 
+		},
+
+		"#_runAsync": {
+			"should iterate through sources and return resultant array asynchronously": function () {
+				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
+					results = p.run(function(err, results) {
+						assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
+					});
+			}
+		},
+
+		"#addInitialSource": {
+			"should put the given source at the beginning of the pipeline": function () {
+				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
+					initialSource = Pipeline.stageDesc.$test({coalesce:false});
+				p.addInitialSource(initialSource);
+				assert.equal(initialSource, p.sources[0]);
+			},
+
+			"should be able to addInitialSource then stitch": function () {
+				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
+					initialSource = Pipeline.stageDesc.$test({coalesce:false});
+				p.addInitialSource(initialSource);
+				p.stitch();
+				assert.equal(p.sources[1].source, p.sources[0]);
+			}
 		}
 
 	}