Browse Source

Merge branch 'feature/mongo_2.6.5' into feature/mongo_2.6.5_aggregate

Phil Murray 11 years ago
parent
commit
fa309ca503

+ 22 - 56
lib/pipeline/Pipeline.js

@@ -204,16 +204,19 @@ klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe,
 		var current = mergePipe.sources[0];
 		var current = mergePipe.sources[0];
 		mergePipe.sources.splice(0, 1);
 		mergePipe.sources.splice(0, 1);
 
 
-		if (typeof current.isSplittable != "undefined") {
-			shardPipe.sources.push(current);
-		}
-		else {
+		if (current.isSplittable && current.isSplittable()) {
 			var shardSource = current.getShardSource(),
 			var shardSource = current.getShardSource(),
 				mergeSource = current.getMergeSource();
 				mergeSource = current.getMergeSource();
-			if (typeof shardSource != "undefined") { shardPipe.sources.push(shardSource); }		//push_back
-			if (typeof mergeSource != "undefined") { mergePipe.sources.unshift(mergeSource); }	//push_front
+			//if (typeof shardSource != "undefined") { shardPipe.sources.push(shardSource); }		//push_back
+			if (shardSource) { shardPipe.sources.push(shardSource); }		//push_back
+			//if (typeof mergeSource != "undefined") { mergePipe.sources.unshift(mergeSource); }	//push_front
+			if (mergeSource) { mergePipe.sources.unshift(mergeSource); }	//push_front
 			break;
 			break;
 		}
 		}
+		else {
+			if (!shardPipe.sources) { shardPipe.sources = []; }
+			shardPipe.sources.push(current);
+		}
 	}
 	}
 };
 };
 
 
@@ -225,54 +228,15 @@ klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe,
  * @param mergePipe merge sources
  * @param mergePipe merge sources
  */
  */
 klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
 klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
-	while((shardPipe.sources != null) && (!shardPipe.sources.length > 0 
-			&& shardPipe.sources[length-1].constructor === UnwindDocumentSource)) {
-		mergePipe.sources.unshift(shardPipe.sources[length-1]);
-		shardPipe.sources.pop();
+	if (true) {
+		while(shardPipe.sources != null && (shardPipe.sources.length > 0 && shardPipe.sources[shardPipe.sources.length-1] instanceof UnwindDocumentSource)) {
+			mergePipe.sources.unshift(shardPipe.sources.pop());
+		}
 	}
 	}
 };
 };
 
 
 //SKIPPED: optimizations.sharded.limitFieldsSentFromShardsToMerger.  Somehow what this produces is not handled by Expression.js (err 16404)
 //SKIPPED: optimizations.sharded.limitFieldsSentFromShardsToMerger.  Somehow what this produces is not handled by Expression.js (err 16404)
-/**
- * Optimize pipeline by adding $project stage if shard fields are not exhaustive
- * @static
- * @method limitFieldsSentFromShardsToMerger
- * @param shardPipe shard sources
- * @param mergePipe merge sources
- */
-// klass.optimizations.sharded.limitFieldsSentFromShardsToMerger = function limitFieldsSentFromShardsToMerger(shardPipe, mergePipe) {
-// 	var mergeDeps = mergePipe.getDependencies(shardPipe.getInitialQuery());
-// 	if (mergeDeps.needWholeDocument) {
-// 		return;
-// 	}
-// 	if (mergeDeps.fields == null) {
-// 		mergeDeps.fields = {};
-// 	}
-// 	if (mergeDeps.fields.length == 0) {
-// 		mergeDeps.fields["_id"] = 0;
-// 	}
-// 	if (shardPipe.sources == null) {
-// 		shardPipe.sources = {};
-// 	}
-// 	//NOTE: Deviation from Mongo: not setting mergeDeps.needTextScore because we aren't handling that (Document meta stuff)
-
-//     // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
-//     // field dependencies. While this may not be 100% ideal in all cases, it is simple and
-//     // avoids the worst cases by ensuring that:
-//     // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
-//     //    dependencies. This situation can happen when a $sort is before the first $project or
-//     //    $group. Without the optimization, the shards would have to reify and transmit full
-//     //    objects even though only a subset of fields are needed.
-//     // 2) Optimization IS NOT applied immediately following a $project or $group since it would
-//     //    add an unnecessary project (and therefore a deep-copy).
-//     for (var i = 0; i < shardPipe.sources.length; i++) {
-//         if (shardPipe.sources.getDependencies() & DocumentSource.GetDepsReturn.EXHAUSTIVE_FIELDS)
-//             return;
-//     }
-
-//     // if we get here, add the project.
-//     shardPipe.sources.push(ProjectDocumentSource.createFromJson({$project: mergeDeps.toProjection()[0]}, shardPipe.ctx));
-// };
+
 
 
 /**
 /**
  * Create an `Array` of `DocumentSource`s from the given JSON pipeline
  * Create an `Array` of `DocumentSource`s from the given JSON pipeline
@@ -397,9 +361,11 @@ proto.serialize = function serialize() {
 		array = [];
 		array = [];
 
 
 	// create an array out of the pipeline operations
 	// create an array out of the pipeline operations
-	for (var source in this.sources) {
-	//this.sources.forEach(function(source) {
-		source.serializeToArray(array);
+	if (this.sources) {
+		for (var i = 0; i < this.sources.length; i++) {
+		//this.sources.forEach(function(source) {
+			this.sources[i].serializeToArray(array);
+		}
 	}
 	}
 
 
 	serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
 	serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
@@ -434,11 +400,11 @@ proto.stitch = function stitch() {
 proto.run = function run(callback) {
 proto.run = function run(callback) {
 	// should not get here in the explain case
 	// should not get here in the explain case
 	if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
 	if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
-	
+
 	var doc = null,
 	var doc = null,
 		error = null,
 		error = null,
 		finalSource = this._getFinalSource();
 		finalSource = this._getFinalSource();
-	
+
 	async.doWhilst(
 	async.doWhilst(
 		function iterator(next){
 		function iterator(next){
 			return finalSource.getNext(function (err, obj){
 			return finalSource.getNext(function (err, obj){
@@ -493,7 +459,7 @@ proto.addInitialSource = function addInitialSource(source) {
 //Note: Deviation from Mongo: Mongo 2.6.5 passes a param to getDependencies
 //Note: Deviation from Mongo: Mongo 2.6.5 passes a param to getDependencies
 //	to calculate TextScore.  mungedb-aggregate doesn't do this, so no param is needed.
 //	to calculate TextScore.  mungedb-aggregate doesn't do this, so no param is needed.
 proto.getDependencies = function getDependencies () {
 proto.getDependencies = function getDependencies () {
-    var deps = new DepsTracker(), 
+    var deps = new DepsTracker(),
 		knowAllFields = false;
 		knowAllFields = false;
 
 
     //NOTE: Deviation from Mongo -- We aren't using Meta and textscore
     //NOTE: Deviation from Mongo -- We aren't using Meta and textscore

+ 13 - 9
lib/pipeline/documentSources/CursorDocumentSource.js

@@ -31,6 +31,10 @@ var CursorDocumentSource = module.exports = CursorDocumentSource = function Curs
 
 
 klass.MaxDocumentsToReturnToClientAtOnce = 150; //DEVIATION: we are using documents instead of bytes
 klass.MaxDocumentsToReturnToClientAtOnce = 150; //DEVIATION: we are using documents instead of bytes
 
 
+klass.create = function create(ns, runner, expCtx) {
+	return new CursorDocumentSource(ns, runner, expCtx);
+};
+
 proto._currentBatch = [];
 proto._currentBatch = [];
 proto._currentBatchIndex = 0;
 proto._currentBatchIndex = 0;
 
 
@@ -81,7 +85,7 @@ proto.getNext = function getNext(callback) {
 	if (this.expCtx && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt()){
 	if (this.expCtx && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt()){
 		return callback(new Error('Interrupted'));
 		return callback(new Error('Interrupted'));
 	}
 	}
-	
+
 	var self = this;
 	var self = this;
 	if (self._currentBatchIndex >= self._currentBatch.length) {
 	if (self._currentBatchIndex >= self._currentBatch.length) {
 		self._currentBatchIndex = 0;
 		self._currentBatchIndex = 0;
@@ -90,7 +94,7 @@ proto.getNext = function getNext(callback) {
 			if (err) return callback(err);
 			if (err) return callback(err);
 			if (self._currentBatch.length === 0)
 			if (self._currentBatch.length === 0)
 				return callback(null, null);
 				return callback(null, null);
-			
+
 			return callback(null, self._currentBatch[self._currentBatchIndex++]);
 			return callback(null, self._currentBatch[self._currentBatchIndex++]);
 		});
 		});
 	}
 	}
@@ -127,10 +131,10 @@ proto.coalesce = function coalesce(nextSource) {
 /**
 /**
  * Record the query that was specified for the cursor this wraps, if
  * Record the query that was specified for the cursor this wraps, if
  * any.
  * any.
- * 
+ *
  * This should be captured after any optimizations are applied to
  * This should be captured after any optimizations are applied to
  * the pipeline so that it reflects what is really used.
  * the pipeline so that it reflects what is really used.
- * 
+ *
  * This gets used for explain output.
  * This gets used for explain output.
  *
  *
  * @method	setQuery
  * @method	setQuery
@@ -143,10 +147,10 @@ proto.setQuery = function setQuery(query) {
 /**
 /**
  * Record the sort that was specified for the cursor this wraps, if
  * Record the sort that was specified for the cursor this wraps, if
  * any.
  * any.
- * 
+ *
  * This should be captured after any optimizations are applied to
  * This should be captured after any optimizations are applied to
  * the pipeline so that it reflects what is really used.
  * the pipeline so that it reflects what is really used.
- * 
+ *
  * This gets used for explain output.
  * This gets used for explain output.
  *
  *
  * @method	setSort
  * @method	setSort
@@ -196,7 +200,7 @@ proto.serialize = function serialize(explain) {
 
 
 /**
 /**
  * returns -1 for no limit
  * returns -1 for no limit
- * 
+ *
  * @method getLimit
  * @method getLimit
 **/
 **/
 proto.getLimit = function getLimit() {
 proto.getLimit = function getLimit() {
@@ -205,7 +209,7 @@ proto.getLimit = function getLimit() {
 
 
 /**
 /**
  * Load a batch of documents from the Runner into the internal array
  * Load a batch of documents from the Runner into the internal array
- * 
+ *
  * @method loadBatch
  * @method loadBatch
 **/
 **/
 proto.loadBatch = function loadBatch(callback) {
 proto.loadBatch = function loadBatch(callback) {
@@ -213,7 +217,7 @@ proto.loadBatch = function loadBatch(callback) {
 		this.dispose();
 		this.dispose();
 		return callback;
 		return callback;
 	}
 	}
-	
+
 	this._runner.restoreState();
 	this._runner.restoreState();
 
 
 	var self = this,
 	var self = this,

+ 4 - 0
lib/pipeline/documentSources/GeoNearDocumentSource.js

@@ -25,6 +25,10 @@ var GeoNearDocumentSource = module.exports = function GeoNearDocumentSource(ctx)
 
 
 klass.geoNearName = "$geoNear";
 klass.geoNearName = "$geoNear";
 
 
+klass.create = function create(expCtx) {
+	return new GeoNearDocumentSource(expCtx);
+};
+
 proto.getSourceName = function() {
 proto.getSourceName = function() {
 	return klass.geoNearName;
 	return klass.geoNearName;
 };
 };

+ 47 - 38
lib/pipeline/documentSources/GroupDocumentSource.js

@@ -108,14 +108,20 @@ proto.getNext = function getNext(callback) {
 				if(self.currentGroupsKeysIndex === self.groupsKeys.length) {
 				if(self.currentGroupsKeysIndex === self.groupsKeys.length) {
 					return next(null, null);
 					return next(null, null);
 				}
 				}
-
-				var id = self.originalGroupsKeys[self.currentGroupsKeysIndex],
-					stringifiedId = self.groupsKeys[self.currentGroupsKeysIndex],
-					accumulators = self.groups[stringifiedId],
+				
+				var out;
+				try {
+					var id = self.originalGroupsKeys[self.currentGroupsKeysIndex],
+						stringifiedId = self.groupsKeys[self.currentGroupsKeysIndex],
+						accumulators = self.groups[stringifiedId];
+						
 					out = self.makeDocument(id, accumulators, self.expCtx.inShard);
 					out = self.makeDocument(id, accumulators, self.expCtx.inShard);
 
 
-				if(++self.currentGroupsKeysIndex === self.groupsKeys.length) {
-					self.dispose();
+					if(++self.currentGroupsKeysIndex === self.groupsKeys.length) {
+						self.dispose();
+					}
+				} catch (ex) {
+					return next(ex);
 				}
 				}
 
 
 				return next(null, out);
 				return next(null, out);
@@ -300,44 +306,46 @@ proto.populate = function populate(callback) {
 					input = doc;
 					input = doc;
 					return cb(); //Need to stop now, no new input
 					return cb(); //Need to stop now, no new input
 				}
 				}
+				try {
+					input = doc;
+					self.variables.setRoot(input);
 
 
-				input = doc;
-				self.variables.setRoot(input);
-
-				/* get the _id value */
-				var id = self.computeId(self.variables);
+					/* get the _id value */
+					var id = self.computeId(self.variables);
 
 
-				if(undefined === id) id = null;
+					if(undefined === id) id = null;
 
 
-				var groupKey = JSON.stringify(id),
-					group = self.groups[groupKey];
+					var groupKey = JSON.stringify(id),
+						group = self.groups[groupKey];
 
 
-				if(!group) {
-					self.originalGroupsKeys.push(id);
-					self.groupsKeys.push(groupKey);
-					group = [];
-					self.groups[groupKey] = group;
-					// Add the accumulators
-					for(var afi = 0; afi<self.accumulatorFactories.length; afi++) {
-						group.push(new self.accumulatorFactories[afi]());
+					if(!group) {
+						self.originalGroupsKeys.push(id);
+						self.groupsKeys.push(groupKey);
+						group = [];
+						self.groups[groupKey] = group;
+						// Add the accumulators
+						for(var afi = 0; afi<self.accumulatorFactories.length; afi++) {
+							group.push(new self.accumulatorFactories[afi]());
+						}
 					}
 					}
-				}
-				//NOTE: Skipped memory usage stuff for case when group already existed
-
-				if(numAccumulators !== group.length) {
-					throw new Error('Group must have one of each accumulator');
-				}
+					//NOTE: Skipped memory usage stuff for case when group already existed
 
 
-				//NOTE: passing the input to each accumulator
-				for(var gi=0; gi<group.length; gi++) {
-					group[gi].process(self.expressions[gi].evaluate(self.variables, self.doingMerge));
-				}
+					if(numAccumulators !== group.length) {
+						throw new Error('Group must have one of each accumulator');
+					}
 
 
-				// We are done with the ROOT document so release it.
-				self.variables.clearRoot();
+					//NOTE: passing the input to each accumulator
+					for(var gi=0; gi<group.length; gi++) {
+						group[gi].process(self.expressions[gi].evaluate(self.variables, self.doingMerge));
+					}
 
 
-				//NOTE: Skipped the part about sorted files
+					// We are done with the ROOT document so release it.
+					self.variables.clearRoot();
 
 
+					//NOTE: Skipped the part about sorted files
+				} catch (ex) {
+					return cb(ex);
+				}
 				return cb();
 				return cb();
 			});
 			});
 		},
 		},
@@ -511,17 +519,18 @@ proto._getTypeStr = function _getTypeStr(obj) {
 
 
 proto.getShardSource = function getShardSource() {
 proto.getShardSource = function getShardSource() {
 	return this;
 	return this;
-}
+};
 
 
 proto.getMergeSource = function getMergeSource() {
 proto.getMergeSource = function getMergeSource() {
-	var merger = klass.create(this.expCtx);
+	var self = this,
+		merger = klass.create(this.expCtx);
 
 
 	var idGenerator = new VariablesIdGenerator(),
 	var idGenerator = new VariablesIdGenerator(),
 		vps = new VariablesParseState(idGenerator);
 		vps = new VariablesParseState(idGenerator);
 
 
 	merger.idExpressions.push(FieldPathExpression.parse("$$ROOT._id", vps));
 	merger.idExpressions.push(FieldPathExpression.parse("$$ROOT._id", vps));
 	for (var i = 0; i < self.fieldNames.length; i++) {
 	for (var i = 0; i < self.fieldNames.length; i++) {
-		merger.addAccumulator(self.fieldNames[i], self.accumulatorFactories[i], FieldPathExpression("$$ROOT." + self.fieldNames[i], vps));
+		merger.addAccumulator(self.fieldNames[i], self.accumulatorFactories[i], FieldPathExpression.create("$$ROOT." + self.fieldNames[i], vps));
 	}
 	}
 
 
 	return merger;
 	return merger;

+ 7 - 3
lib/pipeline/documentSources/MatchDocumentSource.js

@@ -60,9 +60,13 @@ proto.getNext = function getNext(callback) {
 	async.doUntil(
 	async.doUntil(
 		function(cb) {
 		function(cb) {
 			self.source.getNext(function(err, doc) {
 			self.source.getNext(function(err, doc) {
-				if(err) return callback(err);
-				if (makeReturn(doc) !== undefined) {
-					next = doc;
+				if(err) return cb(err);
+				try {
+					if (makeReturn(doc) !== undefined) {
+						next = doc;
+					}
+				} catch (ex) {
+					return cb(ex);
 				}
 				}
 				return cb();
 				return cb();
 			});
 			});

+ 8 - 0
lib/pipeline/documentSources/OutDocumentSource.js

@@ -52,6 +52,14 @@ klass.createFromJson = function(jsonElement, ctx) {
 //	It doesn't implement getShardSource or getMergeSource
 //	It doesn't implement getShardSource or getMergeSource
 klass.isSplittableDocumentSource = true;
 klass.isSplittableDocumentSource = true;
 
 
+proto.getShardSource = function getShardSource() {
+	return null;
+};
+
+proto.getMergeSource = function getMergeSource() {
+	return this;
+};
+
 //NeedsMongodDocumentSource implementation
 //NeedsMongodDocumentSource implementation
 klass.needsMongodDocumentSource = true;
 klass.needsMongodDocumentSource = true;
 
 

+ 9 - 5
lib/pipeline/documentSources/ProjectDocumentSource.js

@@ -13,7 +13,7 @@ var DocumentSource = require('./DocumentSource');
 var ProjectDocumentSource = module.exports = function ProjectDocumentSource(ctx, exprObj){
 var ProjectDocumentSource = module.exports = function ProjectDocumentSource(ctx, exprObj){
 	if (arguments.length > 2) throw new Error("up to two args expected");
 	if (arguments.length > 2) throw new Error("up to two args expected");
 	base.call(this, ctx);
 	base.call(this, ctx);
-	this.OE = new ObjectExpression(exprObj);
+	this.OE = ObjectExpression.create();
 	this._raw = undefined;
 	this._raw = undefined;
 	this._variables = undefined;
 	this._variables = undefined;
 }, klass = ProjectDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 }, klass = ProjectDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
@@ -60,9 +60,13 @@ proto.getNext = function getNext(callback) {
 		 * If we're excluding fields at the top level, leave out the _id if
 		 * If we're excluding fields at the top level, leave out the _id if
 		 * it is found, because we took care of it above.
 		 * it is found, because we took care of it above.
 		 **/
 		 **/
-		self._variables.setRoot(input);
-		self.OE.addToDocument(out, input, self._variables);
-		self._variables.clearRoot();
+		try {
+			self._variables.setRoot(input);
+			self.OE.addToDocument(out, input, self._variables);
+			self._variables.clearRoot();
+		} catch (ex){
+			return callback(ex);
+		}
 
 
 		return callback(null, out);
 		return callback(null, out);
 	});
 	});
@@ -108,7 +112,7 @@ klass.createFromJson = function(elem, expCtx) {
 	var project = new ProjectDocumentSource(expCtx, exprObj);
 	var project = new ProjectDocumentSource(expCtx, exprObj);
 	project._variables = new Variables(idGenerator.getIdCount());
 	project._variables = new Variables(idGenerator.getIdCount());
 
 
-	var projectObj = elem
+	var projectObj = elem;
 	project.OE = exprObj;
 	project.OE = exprObj;
 
 
 	project._raw = elem;
 	project._raw = elem;

+ 12 - 5
lib/pipeline/documentSources/RedactDocumentSource.js

@@ -44,17 +44,24 @@ proto.getNext = function getNext(callback) {
 				doc = input;
 				doc = input;
 				if (input === null)
 				if (input === null)
 					return cb();
 					return cb();
-				self._variables.setRoot(input);
-				self._variables.setValue(self._currentId, input);
-				var result = self.redactObject();
+				var result;
+				try {
+					self._variables.setRoot(input);
+					self._variables.setValue(self._currentId, input);
+					result = self.redactObject();
+				} catch (ex) {
+					return cb(ex);
+				}
 				if (result !== null)
 				if (result !== null)
 					return cb(result); //Using the err argument to pass the result document; this lets us break out without having EOF
 					return cb(result); //Using the err argument to pass the result document; this lets us break out without having EOF
 				return cb();
 				return cb();
 			});
 			});
 		},
 		},
 		function(doc) {
 		function(doc) {
-			if (doc)
-				return callback(null, doc);
+			if (doc){
+				if (doc instanceof Error) return callback(doc);
+				else return callback(null, doc);
+			}
 			return callback(null, null);
 			return callback(null, null);
 		}
 		}
 	);
 	);

+ 15 - 30
lib/pipeline/documentSources/SortDocumentSource.js

@@ -61,13 +61,6 @@ proto.getLimit = function getLimit() {
 	return this.limitSrc ? this.limitSrc.getLimit() : -1;
 	return this.limitSrc ? this.limitSrc.getLimit() : -1;
 };
 };
 
 
-proto.getDependencies = function getDependencies(deps) {
-	for(var i = 0; i < this.vSortKey.length; ++i) {
-		this.vSortKey[i].addDependencies(deps);
-	}
-	return DocumentSource.GetDepsReturn.SEE_NEXT;
-};
-
 proto.coalesce = function coalesce(nextSource) {
 proto.coalesce = function coalesce(nextSource) {
 	if (!this.limitSrc) {
 	if (!this.limitSrc) {
 		if (nextSource instanceof LimitDocumentSource) {
 		if (nextSource instanceof LimitDocumentSource) {
@@ -186,25 +179,17 @@ proto.makeSortOptions = function makeSortOptions(){
 
 
 	var opts;
 	var opts;
 	if ( this.limitSrc)
 	if ( this.limitSrc)
-		opts.limit = limitSrc.getLimt();
+		opts.limit = this.limitSrc.getLimit();
 
 
 	return opts;
 	return opts;
-}
+};
 
 
 
 
 proto.populate = function populate(callback) {
 proto.populate = function populate(callback) {
 	if ( this._mergePresorted ){
 	if ( this._mergePresorted ){
 		// Skipping stuff about mergeCursors and commandShards
 		// Skipping stuff about mergeCursors and commandShards
-
-		if ( this.source instanceof MergeCursorDocumentSouce ){
-			populateFromCursors( this.source);
-		} else if ( this.source instanceof CommandShardsDocumentSource){
-			populateFromJsonArrays(this.source);
-		} else {
-			throw new Error("code 17196; the " + klass.sortName + "can only mergePresorted from MergeCursors and CommandShards");
-		}
+		throw new Error("Merge presorted not implemented.");
 	} else {
 	} else {
-
 		/* pull everything from the underlying source */
 		/* pull everything from the underlying source */
 		var self = this,
 		var self = this,
 			next;
 			next;
@@ -225,9 +210,12 @@ proto.populate = function populate(callback) {
 				return next !== null;
 				return next !== null;
 			},
 			},
 			function(err) {
 			function(err) {
-				/* sort the list */
-				self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
-
+				try {
+					/* sort the list */
+					self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
+				} catch (ex) {
+					return callback(ex);
+				}
 				/* start the sort iterator */
 				/* start the sort iterator */
 				self.docIterator = 0;
 				self.docIterator = 0;
 
 
@@ -258,7 +246,7 @@ klass.IteratorFromCursor = (function(){
 	};
 	};
 
 
 	proto.next = function next() {
 	proto.next = function next() {
-		var doc = DocumentSourceMergeCursors(this._cursor);
+		// var doc = new DocumentSourceMergeCursors(this._cursor);
 		// TODO: make_pair for return
 		// TODO: make_pair for return
 		//return {this._sorter.extractKey(doc): doc};
 		//return {this._sorter.extractKey(doc): doc};
 	};
 	};
@@ -273,7 +261,7 @@ proto.populateFromCursors = function populateFromCursors(cursors){
 
 
 	this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
 	this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
 
 
-}
+};
 
 
 klass.IteratorFromBsonArray = (function(){
 klass.IteratorFromBsonArray = (function(){
 	/**
 	/**
@@ -287,7 +275,7 @@ klass.IteratorFromBsonArray = (function(){
 	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
 
 	proto.next = function next() {
 	proto.next = function next() {
-		var doc = DocumentSourceMergeCursors(this._cursor);
+		// var doc = new DocumentSourceMergeCursors(this._cursor);
 		// TODO: make_pair for return
 		// TODO: make_pair for return
 		//return {this._sorter.extractKey(doc): doc};
 		//return {this._sorter.extractKey(doc): doc};
 	};
 	};
@@ -305,7 +293,8 @@ proto.populateFromBsonArrays = function populateFromBsonArrays(arrays){
 		//this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, arrays[i]));
 		//this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, arrays[i]));
 	}
 	}
 	this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
 	this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
-}
+};
+
 /**
 /**
 * Extract the key
 * Extract the key
 *
 *
@@ -323,7 +312,7 @@ proto.extractKey = function extractKey(d){
 		keys.push(this.vSortKey[i].evaluate(vars));
 		keys.push(this.vSortKey[i].evaluate(vars));
 	}
 	}
 	return keys;
 	return keys;
-}
+};
 
 
 /**
 /**
  * Compare two documents according to the specified sort key.
  * Compare two documents according to the specified sort key.
@@ -427,10 +416,6 @@ klass.create = function create(expCtx, sortOrder, limit) {
 		if ( keyField instanceof Object) {
 		if ( keyField instanceof Object) {
 			// this restriction is due to needing to figure out sort direction
 			// this restriction is due to needing to figure out sort direction
 			throw new Error("code 17312; " + klass.sortName + "the only expression supported by $sort right now is {$meta: 'textScore'}");
 			throw new Error("code 17312; " + klass.sortName + "the only expression supported by $sort right now is {$meta: 'textScore'}");
-
-			nextSort.vSortKey.push(new ExpressionMeta());
-			nextSort.vAscending.push(false); // best scoring documents first
-			continue;
 		}
 		}
 
 
 		if (typeof sortOrder[keyField] !== "number") throw new Error("code 15974; " + klass.sortName + "$sort key ordering must be specified using a number or {$meta: 'text'}");
 		if (typeof sortOrder[keyField] !== "number") throw new Error("code 15974; " + klass.sortName + "$sort key ordering must be specified using a number or {$meta: 'text'}");

+ 22 - 6
lib/pipeline/documentSources/UnwindDocumentSource.js

@@ -86,6 +86,7 @@ klass.Unwinder = (function() {
 /**
 /**
  * Get the document source name.
  * Get the document source name.
  *
  *
+ * @method getSourceName
  * @returns {string}
  * @returns {string}
  */
  */
 proto.getSourceName = function getSourceName() {
 proto.getSourceName = function getSourceName() {
@@ -95,6 +96,7 @@ proto.getSourceName = function getSourceName() {
 /**
 /**
  * Get the next source.
  * Get the next source.
  *
  *
+ * @method getNext
  * @param callback
  * @param callback
  * @returns {*}
  * @returns {*}
  */
  */
@@ -108,8 +110,14 @@ proto.getNext = function getNext(callback) {
 	}
 	}
 
 
 	var self = this,
 	var self = this,
-		out = this._unwinder.getNext(),
+		out,
 		exhausted = false;
 		exhausted = false;
+		
+	try {
+		out = this._unwinder.getNext();
+	} catch (ex) {
+		return callback(ex);
+	}
 
 
 	async.until(
 	async.until(
 		function () {
 		function () {
@@ -125,11 +133,15 @@ proto.getNext = function getNext(callback) {
 					return cb(err);
 					return cb(err);
 				}
 				}
 
 
-				if (doc === null) {
-					exhausted = true;
-				} else {
-					self._unwinder.resetDocument(doc);
-					out = self._unwinder.getNext();
+				try {
+					if (doc === null) {
+						exhausted = true;
+					} else {
+						self._unwinder.resetDocument(doc);
+						out = self._unwinder.getNext();
+					}
+				} catch (ex) {
+					return cb(ex);
 				}
 				}
 
 
 				return cb();
 				return cb();
@@ -150,6 +162,7 @@ proto.getNext = function getNext(callback) {
 /**
 /**
  * Serialize the data.
  * Serialize the data.
  *
  *
+ * @method serialize
  * @param explain
  * @param explain
  * @returns {{}}
  * @returns {{}}
  */
  */
@@ -168,6 +181,7 @@ proto.serialize = function serialize(explain) {
 /**
 /**
  * Get the fields this operation needs to do its job.
  * Get the fields this operation needs to do its job.
  *
  *
+ * @method getDependencies
  * @param deps
  * @param deps
  * @returns {DocumentSource.GetDepsReturn.SEE_NEXT|*}
  * @returns {DocumentSource.GetDepsReturn.SEE_NEXT|*}
  */
  */
@@ -184,6 +198,7 @@ proto.getDependencies = function getDependencies(deps) {
 /**
 /**
  * Unwind path.
  * Unwind path.
  *
  *
+ * @method unwindPath
  * @param fieldPath
  * @param fieldPath
  */
  */
 proto.unwindPath = function unwindPath(fieldPath) {
 proto.unwindPath = function unwindPath(fieldPath) {
@@ -198,6 +213,7 @@ proto.unwindPath = function unwindPath(fieldPath) {
 
 
 /**
 /**
  * Creates a new UnwindDocumentSource with the input path as the path to unwind
  * Creates a new UnwindDocumentSource with the input path as the path to unwind
+ * @method createFromJson
  * @param {String} JsonElement this thing is *called* Json, but it expects a string
  * @param {String} JsonElement this thing is *called* Json, but it expects a string
 **/
 **/
 klass.createFromJson = function createFromJson(jsonElement, ctx) {
 klass.createFromJson = function createFromJson(jsonElement, ctx) {

+ 43 - 15
test/lib/pipeline/Pipeline.js

@@ -4,6 +4,7 @@ var assert = require("assert"),
 	FieldPath = require("../../../lib/pipeline/FieldPath"),
 	FieldPath = require("../../../lib/pipeline/FieldPath"),
 	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
 	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
 	CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	ProjectDocumentSource = require("../../../lib/pipeline/documentSources/ProjectDocumentSource"),
 	ArrayRunner = require("../../../lib/query/ArrayRunner");
 	ArrayRunner = require("../../../lib/query/ArrayRunner");
 
 
 var addSource = function addSource(match, data) {
 var addSource = function addSource(match, data) {
@@ -11,8 +12,12 @@ var addSource = function addSource(match, data) {
 	match.setSource(cds);
 	match.setSource(cds);
 };
 };
 
 
-var shardedTest = function(inputPipe, expectedMergePipeString, expectedShardPipeString) {
-	var expectedMergePipe = JSON.parse(expectedMergePipeString),
+var shardedTest = function(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
+	inputPipeString = '{"pipeline": ' + inputPipeString + '}';
+	expectedMergePipeString = '{"pipeline": ' + expectedMergePipeString + '}';
+	expectedShardPipeString = '{"pipeline": ' + expectedShardPipeString + '}';
+	var inputPipe = JSON.parse(inputPipeString),
+		expectedMergePipe = JSON.parse(expectedMergePipeString),
 		expectedShardPipe = JSON.parse(expectedShardPipeString);
 		expectedShardPipe = JSON.parse(expectedShardPipeString);
 
 
 	var mergePipe = Pipeline.parseCommand(inputPipe, {});
 	var mergePipe = Pipeline.parseCommand(inputPipe, {});
@@ -21,10 +26,8 @@ var shardedTest = function(inputPipe, expectedMergePipeString, expectedShardPipe
 	var shardPipe = mergePipe.splitForSharded();
 	var shardPipe = mergePipe.splitForSharded();
 	assert.notEqual(shardPipe, null);
 	assert.notEqual(shardPipe, null);
 
 
-	assert.equal(shardPipe.serialize()["pipeline"],
-		expectedShardPipe["pipeline"]);
-	assert.equal(mergePipe.serialize()["pipeline"],
-		expectedMergePipe["pipeline"]);
+	assert.deepEqual(shardPipe.serialize().pipeline, expectedShardPipe.pipeline);
+	assert.deepEqual(mergePipe.serialize().pipeline, expectedMergePipe.pipeline);
 };
 };
 
 
 module.exports = {
 module.exports = {
@@ -145,25 +148,38 @@ module.exports = {
 		"sharded": {
 		"sharded": {
 
 
 			"should handle empty pipeline for sharded": function () {
 			"should handle empty pipeline for sharded": function () {
-				var inputPipe = {pipeline: []},
+				var inputPipe = "[]",
 					expectedMergePipe = "[]",
 					expectedMergePipe = "[]",
 					expectedShardPipe = "[]";
 					expectedShardPipe = "[]";
-				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+				shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
 			},
 			},
 
 
 			"should handle one unwind": function () {
 			"should handle one unwind": function () {
-				var inputPipe = "[{$unwind: '$a'}]}",
-					expectedMergePipe = "[]",
-					expectedShardPipe = "[{$unwind: '$a'}]";
+				var inputPipe = '[{"$unwind":"$a"}]',
+					expectedShardPipe = "[]",
+					expectedMergePipe = '[{"$unwind":"$a"}]';
 				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
 				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
 			},
 			},
 
 
 			"should handle two unwinds": function () {
 			"should handle two unwinds": function () {
-				var inputPipe = "[{$unwind: '$a'}, {$unwind: '$b'}]}",
-					expectedMergePipe = "[]",
-					expectedShardPipe = "[{$unwind: '$a'}, {$unwind: '$b'}]}";
+				var inputPipe = '[{"$unwind":"$a"}, {"$unwind":"$b"}]',
+					expectedShardPipe = "[]",
+					expectedMergePipe = '[{"$unwind": "$a"}, {"$unwind": "$b"}]';
 				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
 				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+			},
 
 
+			"should handle unwind not final": function () {
+				var inputPipe = '[{"$unwind": "$a"}, {"$match": {"a":1}}]',
+					expectedShardPipe = '[]',
+					expectedMergePipe = '[{"$unwind": "$a"}, {"$match": {"a":1}}]';
+				shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
+			},
+
+			"should handle unwind with other": function () {
+				var inputPipe = '[{"$match": {"a":1}}, {"$unwind": "$a"}]',
+					expectedShardPipe = '[{"$match":{"a":1}}]',
+					expectedMergePipe = '[{"$unwind":"$a"}]';
+				shardedTest(inputPipe,expectedMergePipe, expectedShardPipe);
 			}
 			}
 
 
 		},
 		},
@@ -216,8 +232,20 @@ module.exports = {
 				p.stitch();
 				p.stitch();
 				assert.equal(p.sources[1].source, p.sources[0]);
 				assert.equal(p.sources[1].source, p.sources[0]);
 			}
 			}
-		}
+		},
 
 
+		"#getDependencies()": {
+
+			"should properly detect dependencies": function testGetDependencies() {
+				var p = Pipeline.parseCommand({pipeline: [
+					{$sort: {"xyz": 1}},
+					{$project: {"a":"$xyz"}}
+				]});
+				var depsTracker = p.getDependencies();
+				assert.equal(Object.keys(depsTracker.fields).length, 2);
+			}
+
+		}
 	}
 	}
 
 
 };
 };

+ 12 - 1
test/lib/pipeline/documentSources/GroupDocumentSource.js

@@ -24,7 +24,6 @@ function assertExpectedResult(args) {
 			next,
 			next,
 			results = [],
 			results = [],
 			cds = new CursorDocumentSource(null, new ArrayRunner(args.docs), null);
 			cds = new CursorDocumentSource(null, new ArrayRunner(args.docs), null);
-			debugger;
 		gds.setSource(cds);
 		gds.setSource(cds);
 		async.whilst(
 		async.whilst(
 			function() {
 			function() {
@@ -302,6 +301,18 @@ module.exports = {
 					spec: {_id:0, first:{$first:"$missing"}},
 					spec: {_id:0, first:{$first:"$missing"}},
 					expected: [{_id:0, first:null}]
 					expected: [{_id:0, first:null}]
 				});
 				});
+			},
+			
+			"should return errors in the callback": function(done){
+				var gds = GroupDocumentSource.createFromJson({_id:null, sum: {$sum:"$a"}}),
+					next,
+					results = [],
+					cds = new CursorDocumentSource(null, new ArrayRunner([{"a":"foo"}]), null);
+				gds.setSource(cds);
+				gds.getNext(function(err, doc) {
+					assert(err, "Expected Error");
+					done();
+				});
 			}
 			}
 		}
 		}
 
 

+ 0 - 1
test/lib/pipeline/documentSources/MatchDocumentSource.js

@@ -84,7 +84,6 @@ module.exports = {
 					items = [ 1,2,3,4,5,6,7,8,9 ];
 					items = [ 1,2,3,4,5,6,7,8,9 ];
 				addSource(mds, items.map(function(i){return {item:i};}));
 				addSource(mds, items.map(function(i){return {item:i};}));
 
 
-				debugger;
 				async.series([
 				async.series([
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),

+ 12 - 1
test/lib/pipeline/documentSources/ProjectDocumentSource.js

@@ -7,7 +7,8 @@ var assert = require("assert"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	ArrayRunner = require("../../../../lib/query/ArrayRunner"),
 	ArrayRunner = require("../../../../lib/query/ArrayRunner"),
 	TestBase = require("./TestBase"),
 	TestBase = require("./TestBase"),
-	And = require("../../../../lib/pipeline/expressions/AndExpression");
+	And = require("../../../../lib/pipeline/expressions/AndExpression"),
+	Add = require("../../../../lib/pipeline/expressions/AddExpression");
 
 
 
 
 /**
 /**
@@ -65,6 +66,16 @@ module.exports = {
 
 
 	"#getNext()": {
 	"#getNext()": {
 
 
+		"should return errors in the callback": function Errors() {
+			var input = [{_id: 0, a: "foo"}];
+			var cds = new CursorDocumentSource(null, new ArrayRunner(input), null);
+			var pds = ProjectDocumentSource.createFromJson({x:{"$add":["$a", "$a"]}});
+			pds.setSource(cds);
+			pds.getNext(function(err, actual) {
+				assert(err, "Expected error");
+			});
+		},
+
 		"should return EOF": function testEOF(next) {
 		"should return EOF": function testEOF(next) {
 			var pds = createProject({});
 			var pds = createProject({});
 			pds.setSource({
 			pds.setSource({

+ 12 - 0
test/lib/pipeline/documentSources/RedactDocumentSource.js

@@ -61,6 +61,18 @@ module.exports = {
 					next();
 					next();
 				});
 				});
 			},
 			},
+			"should return Error in callback": function testError(next) {
+				var rds = RedactDocumentSource.createFromJson({$cond:{
+					if:{$gt:[0,{$add:["$a", 3]}]},
+					then:"$$DESCEND",
+					else:"$$PRUNE"
+				}});
+				rds.setSource(createCursorDocumentSource([{a:"foo"}]));
+				rds.getNext(function(err, doc) {
+					assert(err, "Expected Error");
+					next();
+				});
+			},
 
 
 			"iterator state accessors consistently report the source is exhausted": function assertExhausted() {
 			"iterator state accessors consistently report the source is exhausted": function assertExhausted() {
 				var input = [{}];
 				var input = [{}];

+ 1 - 1
test/lib/pipeline/documentSources/SortDocumentSource.js

@@ -569,7 +569,7 @@ module.exports = {
 
 
 				var deps = {fields: {}, needWholeDocument: false, needTextScore: false};
 				var deps = {fields: {}, needWholeDocument: false, needTextScore: false};
 
 
-				assert.equal('SEE_NEXT', sds.getDependencies(deps));
+				assert.equal(DocumentSource.GetDepsReturn.SEE_NEXT, sds.getDependencies(deps));
 				// Sort keys are now part of deps fields.
 				// Sort keys are now part of deps fields.
 				assert.equal(3, Object.keys(deps.fields).length);
 				assert.equal(3, Object.keys(deps.fields).length);
 			 	assert.equal(1, deps.fields.a);
 			 	assert.equal(1, deps.fields.a);

+ 2 - 2
test/lib/pipeline/documentSources/TestBase.js

@@ -16,7 +16,7 @@ var TestBase = (function() {
 	proto.createProject = function(projection) {
 	proto.createProject = function(projection) {
 		projection = projection || {a:true};
 		projection = projection || {a:true};
 		var spec = {$project:projection};
 		var spec = {$project:projection};
-		this._project = ProjectDocumentSource(spec /*,ctx()*/);
+		this._project = new ProjectDocumentSource(spec /*,ctx()*/);
 		this.checkJsonRepresentation(spec);
 		this.checkJsonRepresentation(spec);
 		this._project.setSource(this.source());
 		this._project.setSource(this.source());
 	};
 	};
@@ -44,4 +44,4 @@ var TestBase = (function() {
 	return klass;
 	return klass;
 })();
 })();
 
 
-module.exports = TestBase;
+module.exports = TestBase;