Explorar o código

Merge pull request #112 from RiveraGroup/feature/mongo_2.6.5_documentSource_Cursor

Feature/mongo 2.6.5 document source cursor
Chris Sexton %!s(int64=11) %!d(string=hai) anos
pai
achega
bb75b3abd1

+ 0 - 30
lib/Cursor.js

@@ -1,30 +0,0 @@
-"use strict";
-
-/**
- * This class is a simplified implementation of the cursors used in MongoDB for reading from an Array of documents.
- * @param	{Array}	items	The array source of the data
- **/
-var klass = module.exports = function Cursor(items){
-	if (!(items instanceof Array)) throw new Error("arg `items` must be an Array");
-	this.cachedData = items.slice(0);	// keep a copy so array changes when using async doc srcs do not cause side effects
-	this.length = items.length;
-	this.offset = 0;
-}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-proto.ok = function ok(){
-	return (this.offset < this.length) || this.hasOwnProperty("curr");
-};
-
-proto.advance = function advance(){
-	if (this.offset >= this.length){
-		delete this.curr;
-		return false;
-	}
-	this.curr = this.cachedData[this.offset++];
-	return this.curr;
-};
-
-proto.current = function current(){
-	if (!this.hasOwnProperty("curr")) this.advance();
-	return this.curr;
-};

+ 175 - 168
lib/pipeline/documentSources/CursorDocumentSource.js

@@ -1,14 +1,13 @@
 "use strict";
 "use strict";
 
 
-var DocumentSource = require('./DocumentSource'),
+var async = require('async'),
+	Value = require('../Value'),
+	Runner = require('../../query/Runner'),
+	DocumentSource = require('./DocumentSource'),
 	LimitDocumentSource = require('./LimitDocumentSource');
 	LimitDocumentSource = require('./LimitDocumentSource');
 
 
-// Mimicking max memory size from mongo/db/query/new_find.cpp
-// Need to actually decide some size for this?
-var MAX_BATCH_DOCS = 150;
-
 /**
 /**
- * Constructs and returns Documents from the objects produced by a supplied Cursor.
+ * Constructs and returns Documents from the BSONObj objects produced by a supplied Runner.
  * An object of this type may only be used by one thread, see SERVER-6123.
  * An object of this type may only be used by one thread, see SERVER-6123.
  *
  *
  * This is usually put at the beginning of a chain of document sources
  * This is usually put at the beginning of a chain of document sources
@@ -20,46 +19,36 @@ var MAX_BATCH_DOCS = 150;
  * @constructor
  * @constructor
  * @param	{CursorDocumentSource.CursorWithContext}	cursorWithContext the cursor to use to fetch data
  * @param	{CursorDocumentSource.CursorWithContext}	cursorWithContext the cursor to use to fetch data
  **/
  **/
-var CursorDocumentSource = module.exports = CursorDocumentSource = function CursorDocumentSource(cursorWithContext, expCtx){
+var CursorDocumentSource = module.exports = CursorDocumentSource = function CursorDocumentSource(namespace, runner, expCtx){
 	base.call(this, expCtx);
 	base.call(this, expCtx);
 
 
-	this.current = null;
+	this._docsAddedToBatches = 0;
+	this._ns = namespace;
+	this._runner = runner;
 
 
-//	this.ns = null;
-//	/*
-//	The bson dependencies must outlive the Cursor wrapped by this
-//	source.  Therefore, bson dependencies must appear before pCursor
-//	in order cause its destructor to be called *after* pCursor's.
-//	*/
-//	this.query = null;
-//	this.sort = null;
+}, klass = CursorDocumentSource, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
 
-	this._projection = null;
+klass.MaxDocumentsToReturnToClientAtOnce = 150; //DEVIATION: we are using documents instead of bytes
 
 
-	this._cursorWithContext = cursorWithContext;
-	this._curIdx = 0;
-	this._currentBatch = [];
-	this._limit = undefined;
-	this._docsAddedToBatches = 0;
+proto._currentBatch = [];
+proto._currentBatchIndex = 0;
 
 
-	if (!this._cursorWithContext || !this._cursorWithContext._cursor) throw new Error("CursorDocumentSource requires a valid cursorWithContext");
+// BSONObj members must outlive _projection and cursor.
+proto._query = undefined;
+proto._sort = undefined;
+proto._projection = undefined;
+proto._dependencies = undefined;
+proto._limit = undefined;
+proto._docsAddedToBatches = undefined; // for _limit enforcement
 
 
-}, klass = CursorDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+proto._ns = undefined;
+proto._runner = undefined; // PipelineRunner holds a weak_ptr to this.
 
 
 
 
-klass.CursorWithContext = (function (){
-	/**
-	 * Holds a Cursor and all associated state required to access the cursor.
-	 * @class CursorWithContext
-	 * @namespace mungedb-aggregate.pipeline.documentSources.CursorDocumentSource
-	 * @module mungedb-aggregate
-	 * @constructor
-	 **/
-	var klass = function CursorWithContext(ns){
-		this._cursor = null;
-	};
-	return klass;
-})();
+
+proto.isValidInitialSource = function(){
+	return true;
+};
 
 
 /**
 /**
  * Release the Cursor and the read lock it requires, but without changing the other data.
  * Release the Cursor and the read lock it requires, but without changing the other data.
@@ -69,189 +58,207 @@ klass.CursorWithContext = (function (){
  * @method	dispose
  * @method	dispose
  **/
  **/
 proto.dispose = function dispose() {
 proto.dispose = function dispose() {
-	this._cursorWithContext = null;
+	if (this._runner) this._runner.reset();
 	this._currentBatch = [];
 	this._currentBatch = [];
-	this._curIdx = 0;
 };
 };
 
 
+/**
+ * Get the source's name.
+ * @method	getSourceName
+ * @returns	{String}	the string name of the source as a constant string; this is static, and there's no need to worry about adopting it
+ **/
 proto.getSourceName = function getSourceName() {
 proto.getSourceName = function getSourceName() {
 	return "$cursor";
 	return "$cursor";
 };
 };
 
 
+/**
+ * Returns the next Document if there is one
+ *
+ * @method	getNext
+ **/
 proto.getNext = function getNext(callback) {
 proto.getNext = function getNext(callback) {
-	if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
-
-	if (this._currentBatch.length <= this._curIdx) {
-		this.loadBatch();
-
-		if (this._currentBatch.length <= this._curIdx) {
-			callback(null, DocumentSource.EOF);
-			return DocumentSource.EOF;
-		}
+	if (this.expCtx && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt()){
+		return callback(new Error('Interrupted'));
 	}
 	}
-
-	// Don't unshift. It's expensiver.
-	var out = this._currentBatch[this._curIdx];
-	this._curIdx++;
-
-	callback(null, out);
-	return out;
+	
+	var self = this;
+	if (self._currentBatchIndex >= self._currentBatch.length) {
+		self._currentBatchIndex = 0;
+		self._currentBatch = [];
+		return self.loadBatch(function(err){
+			if (err) return callback(err);
+			if (self._currentBatch.length === 0)
+				return callback(null, null);
+			
+			return callback(null, self._currentBatch[self._currentBatchIndex++]);
+		});
+	}
+	return callback(null, self._currentBatch[self._currentBatchIndex++]);
 };
 };
 
 
+/**
+ * Attempt to coalesce this DocumentSource with any $limits that it encounters
+ *
+ * @method	coalesce
+ * @param	{DocumentSource}	nextSource	the next source in the document processing chain.
+ * @returns	{Boolean}	whether or not the attempt to coalesce was successful or not; if the attempt was not successful, nothing has been changed
+ **/
 proto.coalesce = function coalesce(nextSource) {
 proto.coalesce = function coalesce(nextSource) {
-	if (this._limit) {
+	// Note: Currently we assume the $limit is logically after any $sort or
+	// $match. If we ever pull in $match or $sort using this method, we
+	// will need to keep track of the order of the sub-stages.
+
+	if (!this._limit) {
+		if (nextSource instanceof LimitDocumentSource) {
+			this._limit = nextSource;
+			return this._limit;
+		}
+		return false;// false if next is not a $limit
+	}
+	else {
 		return this._limit.coalesce(nextSource);
 		return this._limit.coalesce(nextSource);
-	} else if (nextSource instanceof LimitDocumentSource) {
-		this._limit = nextSource;
-		return this._limit;
-	} else {
-		return false;
 	}
 	}
+
+	return false;
 };
 };
 
 
-///**
-// * Record the namespace.  Required for explain.
-// *
-// * @method	setNamespace
-// * @param	{String}	ns	the namespace
-// **/
-//proto.setNamespace = function setNamespace(ns) {}
-//
-///**
-// * Record the query that was specified for the cursor this wraps, if any.
-// * This should be captured after any optimizations are applied to
-// * the pipeline so that it reflects what is really used.
-// * This gets used for explain output.
-// *
-// * @method	setQuery
-// * @param	{Object}	pBsonObj	the query to record
-// **/
+
+/**
+ * Record the query that was specified for the cursor this wraps, if
+ * any.
+ * 
+ * This should be captured after any optimizations are applied to
+ * the pipeline so that it reflects what is really used.
+ * 
+ * This gets used for explain output.
+ *
+ * @method	setQuery
+ * @param	{Object}	pBsonObj	the query to record
+ **/
 proto.setQuery = function setQuery(query) {
 proto.setQuery = function setQuery(query) {
 	this._query = query;
 	this._query = query;
 };
 };
 
 
-///**
-// * Record the sort that was specified for the cursor this wraps, if any.
-// * This should be captured after any optimizations are applied to
-// * the pipeline so that it reflects what is really used.
-// * This gets used for explain output.
-// *
-// * @method	setSort
-// * @param	{Object}	pBsonObj	the query to record
-// **/
-//proto.setSort = function setSort(pBsonObj) {};
+/**
+ * Record the sort that was specified for the cursor this wraps, if
+ * any.
+ * 
+ * This should be captured after any optimizations are applied to
+ * the pipeline so that it reflects what is really used.
+ * 
+ * This gets used for explain output.
+ *
+ * @method	setSort
+ * @param	{Object}	pBsonObj	the query to record
+ **/
+proto.setSort = function setSort(sort) {
+	this._sort = sort;
+};
 
 
 /**
 /**
- * setProjection method
+ * Informs this object of projection and dependency information.
  *
  *
  * @method	setProjection
  * @method	setProjection
  * @param	{Object}	projection
  * @param	{Object}	projection
  **/
  **/
 proto.setProjection = function setProjection(projection, deps) {
 proto.setProjection = function setProjection(projection, deps) {
-
-	if (this._projection){
-		throw new Error("projection is already set");
-	}
-
-
-	//dont think we need this yet
-
-//	this._projection = new Projection();
-//	this._projection.init(projection);
-//
-//	this.cursor().fields = this._projection;
-
-	this._projection = projection;  //just for testing
+	this._projection = projection;
 	this._dependencies = deps;
 	this._dependencies = deps;
 };
 };
 
 
-//----------------virtuals from DocumentSource--------------
-
 /**
 /**
- * Set the underlying source this source should use to get Documents
- * from.
- * It is an error to set the source more than once.  This is to
- * prevent changing sources once the original source has been started;
- * this could break the state maintained by the DocumentSource.
- * This pointer is not reference counted because that has led to
- * some circular references.  As a result, this doesn't keep
- * sources alive, and is only intended to be used temporarily for
- * the lifetime of a Pipeline::run().
  *
  *
  * @method setSource
  * @method setSource
  * @param source   {DocumentSource}  the underlying source to use
  * @param source   {DocumentSource}  the underlying source to use
  * @param callback  {Function}        a `mungedb-aggregate`-specific extension to the API to half-way support reading from async sources
  * @param callback  {Function}        a `mungedb-aggregate`-specific extension to the API to half-way support reading from async sources
  **/
  **/
 proto.setSource = function setSource(theSource) {
 proto.setSource = function setSource(theSource) {
-	if (theSource) throw new Error("CursorDocumentSource doesn't take a source"); //TODO: This needs to put back without the if once async is fully and properly supported
+	throw new Error('this doesnt take a source');
 };
 };
 
 
 proto.serialize = function serialize(explain) {
 proto.serialize = function serialize(explain) {
-	if (!explain)
-		return null;
 
 
-	if (!this._cursorWithContext)
-		throw new Error("code 17135; Cursor deleted.");
+	// we never parse a documentSourceCursor, so we only serialize for explain
+	if (!explain)
+		return {};
 
 
-	// A stab at what mongo wants
-	return {
+	var out = {};
+	out[this.getSourceName()] = {
 		query: this._query,
 		query: this._query,
 		sort: this._sort ? this._sort : null,
 		sort: this._sort ? this._sort : null,
-		limit: this._limit ? this._limit : null,
+		limit: this._limit ? this._limit.getLimit() : null,
 		fields: this._projection ? this._projection : null,
 		fields: this._projection ? this._projection : null,
-		indexonly: false,
-		cursorType: this._cursorWithContext ? "cursor" : null
+		plan: this._runner.getInfo(explain)
 	};
 	};
+	return out;
 };
 };
 
 
-// LimitDocumentSource has the setLimit function which trickles down to any documentsource
+/**
+ * returns -1 for no limit
+ * 
+ * @method getLimit
+**/
 proto.getLimit = function getLimit() {
 proto.getLimit = function getLimit() {
 	return this._limit ? this._limit.getLimit() : -1;
 	return this._limit ? this._limit.getLimit() : -1;
 };
 };
 
 
-//----------------private--------------
-
-//proto.chunkMgr = function chunkMgr(){};
-
-//proto.canUseCoveredIndex = function canUseCoveredIndex(){};
-
-//proto.yieldSometimes = function yieldSometimes(){};
-
-proto.loadBatch = function loadBatch() {
-	var nDocs = 0,
-		cursor = this._cursorWithContext ? this._cursorWithContext._cursor : null;
-
-	if (!cursor)
-		return this.dispose();
-
-	for(;cursor.ok(); cursor.advance()) {
-		if (!cursor.ok())
-			break;
-
-		// these methods do not exist
-		// if (!cursor.currentMatches() || cursor.currentIsDup())
-		// continue;
-
-		var next = cursor.current();
-		this._currentBatch.push(this._projection ? base.documentFromJsonWithDeps(next, this._dependencies) : next);
-
-		if (this._limit) {
-			this._docsAddedToBatches++;
-			if (this._docsAddedToBatches == this._limit.getLimit())
-				break;
-
-			if (this._docsAddedToBatches >= this._limit.getLimit()) {
-				throw new Error("added documents to the batch over limit size");
+/**
+ * Load a batch of documents from the Runner into the internal array
+ * 
+ * @method loadBatch
+**/
+proto.loadBatch = function loadBatch(callback) {
+	if (!this._runner) {
+		this.dispose();
+		return callback;
+	}
+	
+	this._runner.restoreState();
+
+	var self = this,
+		whileBreak = false,		// since we are in an async loop instead of a normal while loop, need to mimic the
+		whileReturn = false;	// functionality.  These flags are similar to saying 'break' or 'return' from inside the loop
+	return async.whilst(
+		function test(){
+			return !whileBreak && !whileReturn;
+		},
+		function(next) {
+			return self._runner.getNext(function(err, obj, state){
+				if (err) return next(err);
+				if (state === Runner.RunnerState.RUNNER_ADVANCED) {
+					if (self._dependencies) {
+						self._currentBatch.push(self._dependencies.extractFields(obj));
+					} else {
+						self._currentBatch.push(obj);
+					}
+
+					if (self._limit) {
+						if (++self._docsAddedToBatches === self._limit.getLimit()) {
+							whileBreak = true;
+							return next();
+						}
+						//this was originally a 'verify' in the mongo code
+						if (self._docsAddedToBatches > self._limit.getLimit()){
+							return next(new Error('documents collected past the end of the limit'));
+						}
+					}
+
+					if (self._currentBatch >= klass.MaxDocumentsToReturnToClientAtOnce) {
+						// End self batch and prepare Runner for yielding.
+						self._runner.saveState();
+						whileReturn = true;
+					}
+				} else {
+					whileBreak = true;
+				}
+				return next();
+			});
+		},
+		function(err){
+			if (!whileReturn){
+				self._runner.reset();
 			}
 			}
+			callback(err);
 		}
 		}
-
-		// Mongo uses number of bytes, but that doesn't make sense here. Yield when nDocs is over a threshold
-		if (nDocs > MAX_BATCH_DOCS) {
-			this._curIdx++; // advance the deque
-			nDocs++;
-			return;
-		}
-	}
-
-	this._cursorWithContext = undefined;	//NOTE: Trying to emulate erasing the cursor; not exactly how mongo does it
+	);
 };
 };

+ 2 - 18
lib/pipeline/documentSources/DocumentSource.js

@@ -37,22 +37,6 @@ var DocumentSource = module.exports = function DocumentSource(expCtx){
 
 
 }, klass = DocumentSource, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 }, klass = DocumentSource, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
 
-/**
- * Use EOF as boost::none for document sources to signal the end of their document stream.
- **/
-klass.EOF = (function() {
-	/**
-	 * Represents a non-value in a document stream
-	 * @class EOF
-	 * @namespace mungedb-aggregate.pipeline.documentSources.DocumentSource
-	 * @module mungedb-aggregate
-	 * @constructor
-	 **/
-	var klass = function EOF(){
-	};
-	return klass;
-})();
-
 /*
 /*
 class DocumentSource :
 class DocumentSource :
 public IntrusiveCounterUnsigned,
 public IntrusiveCounterUnsigned,
@@ -83,7 +67,7 @@ proto.getPipelineStep = function getPipelineStep() {
 };
 };
 
 
 /**
 /**
- * Returns the next Document if there is one or DocumentSource.EOF if at EOF.
+ * Returns the next Document if there is one or null if at EOF.
  *
  *
  * some implementations do the equivalent of verify(!eof()) so check eof() first
  * some implementations do the equivalent of verify(!eof()) so check eof() first
  * @method	getNext
  * @method	getNext
@@ -206,7 +190,7 @@ proto.serializeToArray = function serializeToArray(array, explain) {
  * @method GET_NEXT_PASS_THROUGH
  * @method GET_NEXT_PASS_THROUGH
  * @param callback {Function}
  * @param callback {Function}
  * @param callback.err {Error} An error or falsey
  * @param callback.err {Error} An error or falsey
- * @param callback.doc {Object} The source's next object or DocumentSource.EOF
+ * @param callback.doc {Object} The source's next object or null
  **/
  **/
 klass.GET_NEXT_PASS_THROUGH = function GET_NEXT_PASS_THROUGH(callback) {
 klass.GET_NEXT_PASS_THROUGH = function GET_NEXT_PASS_THROUGH(callback) {
 	if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
 	if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');

+ 2 - 2
lib/pipeline/documentSources/LimitDocumentSource.js

@@ -57,8 +57,8 @@ proto.getNext = function getNext(callback) {
 
 
 	if (++this.count > this.limit) {
 	if (++this.count > this.limit) {
 		this.source.dispose();
 		this.source.dispose();
-		callback(null, DocumentSource.EOF);
-		return DocumentSource.EOF;
+		callback(null, null);
+		return null;
 	}
 	}
 
 
 	return this.source.getNext(callback);
 	return this.source.getNext(callback);

+ 4 - 4
lib/pipeline/documentSources/MatchDocumentSource.js

@@ -50,9 +50,9 @@ proto.getNext = function getNext(callback) {
 			return self.matcher.matches(doc);
 			return self.matcher.matches(doc);
 		},
 		},
 		makeReturn = function makeReturn(doc) {
 		makeReturn = function makeReturn(doc) {
-			if(doc !== DocumentSource.EOF && test(doc)) { // Passes the match criteria
+			if(doc !== null && test(doc)) { // Passes the match criteria
 				return doc;
 				return doc;
-			} else if(doc === DocumentSource.EOF){ // Got EOF
+			} else if(doc === null){ // Got EOF
 				return doc;
 				return doc;
 			}
 			}
 			return undefined; // Didn't match, but not EOF
 			return undefined; // Didn't match, but not EOF
@@ -61,14 +61,14 @@ proto.getNext = function getNext(callback) {
 		function(cb) {
 		function(cb) {
 			self.source.getNext(function(err, doc) {
 			self.source.getNext(function(err, doc) {
 				if(err) return callback(err);
 				if(err) return callback(err);
-				if (makeReturn(doc)) {
+				if (makeReturn(doc) !== undefined) {
 					next = doc;
 					next = doc;
 				}
 				}
 				return cb();
 				return cb();
 			});
 			});
 		},
 		},
 		function() {
 		function() {
-			var foundDoc = (next === DocumentSource.EOF || next !== undefined);
+			var foundDoc = (next === null || next !== undefined);
 			return foundDoc; //keep going until doc is found
 			return foundDoc; //keep going until doc is found
 		},
 		},
 		function(err) {
 		function(err) {

+ 6 - 6
lib/pipeline/documentSources/RedactDocumentSource.js

@@ -37,17 +37,17 @@ proto.getNext = function getNext(callback) {
 		doc;
 		doc;
 	async.whilst(
 	async.whilst(
 		function() {
 		function() {
-			return doc !== DocumentSource.EOF;
+			return doc !== null;
 		},
 		},
 		function(cb) {
 		function(cb) {
 			self.source.getNext(function(err, input) {
 			self.source.getNext(function(err, input) {
 				doc = input;
 				doc = input;
-				if (input === DocumentSource.EOF)
+				if (input === null)
 					return cb();
 					return cb();
 				self._variables.setRoot(input);
 				self._variables.setRoot(input);
 				self._variables.setValue(self._currentId, input);
 				self._variables.setValue(self._currentId, input);
 				var result = self.redactObject();
 				var result = self.redactObject();
-				if (result !== DocumentSource.EOF)
+				if (result !== null)
 					return cb(result); //Using the err argument to pass the result document; this lets us break out without having EOF
 					return cb(result); //Using the err argument to pass the result document; this lets us break out without having EOF
 				return cb();
 				return cb();
 			});
 			});
@@ -55,7 +55,7 @@ proto.getNext = function getNext(callback) {
 		function(doc) {
 		function(doc) {
 			if (doc)
 			if (doc)
 				return callback(null, doc);
 				return callback(null, doc);
-			return callback(null, DocumentSource.EOF);
+			return callback(null, null);
 		}
 		}
 	);
 	);
 	return doc;
 	return doc;
@@ -79,7 +79,7 @@ proto.redactValue = function redactValue(input) {
 	} else if (input instanceof Object && input.constructor === Object) {
 	} else if (input instanceof Object && input.constructor === Object) {
 		this._variables.setValue(this._currentId, input);
 		this._variables.setValue(this._currentId, input);
 		var result = this.redactObject();
 		var result = this.redactObject();
-		if (result !== DocumentSource.EOF)
+		if (result !== null)
 			return result;
 			return result;
 		return null;
 		return null;
 	} else {
 	} else {
@@ -96,7 +96,7 @@ proto.redactObject = function redactObject() {
 	if (expressionResult === KEEP_VAL) {
 	if (expressionResult === KEEP_VAL) {
 		return this._variables.getDocument(this._currentId);
 		return this._variables.getDocument(this._currentId);
 	} else if (expressionResult === PRUNE_VAL) {
 	} else if (expressionResult === PRUNE_VAL) {
-		return DocumentSource.EOF;
+		return null;
 	} else if (expressionResult === DESCEND_VAL) {
 	} else if (expressionResult === DESCEND_VAL) {
 		var input = this._variables.getDocument(this._currentId);
 		var input = this._variables.getDocument(this._currentId);
 		var out = {};
 		var out = {};

+ 1 - 1
lib/pipeline/documentSources/SkipDocumentSource.js

@@ -89,7 +89,7 @@ proto.getNext = function getNext(callback) {
 				});
 				});
 			},
 			},
 			function() {
 			function() {
-				return self.count < self.skip || next === DocumentSource.EOF;
+				return self.count < self.skip || next === null;
 			},
 			},
 			function (err) {
 			function (err) {
 				if (err) { return callback(err); }
 				if (err) { return callback(err); }

+ 3 - 3
lib/pipeline/documentSources/UnwindDocumentSource.js

@@ -71,7 +71,7 @@ klass.Unwinder = (function() {
 	 **/
 	 **/
 	proto.getNext = function getNext() {
 	proto.getNext = function getNext() {
 		if (this._inputArray === undefined || this._index === this._inputArray.length) {
 		if (this._inputArray === undefined || this._index === this._inputArray.length) {
-			return DocumentSource.EOF;
+			return null;
 		}
 		}
 
 
 		this._document = Document.cloneDeep(this._document);
 		this._document = Document.cloneDeep(this._document);
@@ -113,7 +113,7 @@ proto.getNext = function getNext(callback) {
 
 
 	async.until(
 	async.until(
 		function () {
 		function () {
-			if (out !== DocumentSource.EOF || exhausted) {
+			if (out !== null || exhausted) {
 				return true;
 				return true;
 			}
 			}
 
 
@@ -125,7 +125,7 @@ proto.getNext = function getNext(callback) {
 					return cb(err);
 					return cb(err);
 				}
 				}
 
 
-				if (doc === DocumentSource.EOF) {
+				if (doc === null) {
 					exhausted = true;
 					exhausted = true;
 				} else {
 				} else {
 					self._unwinder.resetDocument(doc);
 					self._unwinder.resetDocument(doc);

+ 89 - 0
lib/query/ArrayRunner.js

@@ -0,0 +1,89 @@
+"use strict";
+
+var Runner = require('./Runner');
+
+/**
+ * This class is an array runner used to run a pipeline against a static array of data
+ * @param	{Array}	items	The array source of the data
+ **/
+var klass = module.exports = function ArrayRunner(array){
+	base.call(this);
+	
+	if (!array || array.constructor !== Array ) throw new Error('Array runner requires an array');
+	this._array = array;
+	this._position = 0;
+	this._state = Runner.RunnerState.RUNNER_ADVANCED;
+}, base = Runner, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+/**
+ * Get the next result from the array.
+ * 
+ * @method getNext
+ * @param [callback] {Function}
+ */
+proto.getNext = function getNext(callback) {
+	var obj, err;
+	try {
+		if (this._state === Runner.RunnerState.RUNNER_ADVANCED) {
+			if (this._position < this._array.length){
+				obj = this._array[this._position++];
+			} else {
+				this._state = Runner.RunnerState.RUNNER_EOF;
+			}
+		}
+	} catch (ex) {
+		err = ex;
+		this._state = Runner.RunnerState.RUNNER_ERROR;
+	}
+	
+	return callback(err, obj, this._state);
+};
+
+/**
+ * Save any state required to yield.
+ * 
+ * @method saveState
+ */
+proto.saveState = function saveState() {
+	//nothing to do here
+};
+
+/**
+ * Restore saved state, possibly after a yield.  Return true if the runner is OK, false if
+ * it was killed.
+ * 
+ * @method restoreState
+ */
+proto.restoreState = function restoreState() {
+	//nothing to do here
+};
+
+/**
+ * Returns a description of the Runner
+ * 
+ * @method getInfo
+ * @param [explain]
+ * @param [planInfo]
+ */
+proto.getInfo = function getInfo(explain) {
+	if (explain){
+		return {
+			type: this.constructor.name,
+			nDocs: this._array.length,
+			position: this._position,
+			state: this._state
+		};
+	}
+	return undefined;
+};
+
+/**
+ * dispose of the Runner.
+ * 
+ * @method reset
+ */
+proto.reset = function reset(){
+	this._array = [];
+	this._position = 0;
+	this._state = Runner.RunnerState.RUNNER_DEAD;
+};

+ 222 - 0
lib/query/Runner.js

@@ -0,0 +1,222 @@
+"use strict";
+
+/**
+ * This class is an implementation of the base class for runners used in MongoDB
+ * 
+ * Note that a lot of stuff here is not used by our code yet.  Check the existing implementations
+ * for what we currently use
+ * 
+ **/
+var klass = module.exports = function Runner(){
+	
+}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+klass.RunnerState = {
+	// We successfully populated the out parameter.
+	RUNNER_ADVANCED: "RUNNER_ADVANCED",
+
+	// We're EOF.  We won't return any more results (edge case exception: capped+tailable).
+	RUNNER_EOF: "RUNNER_EOF",
+
+	// We were killed or had an error.
+	RUNNER_DEAD: "RUNNER_DEAD",
+
+	// getNext was asked for data it cannot provide, or the underlying PlanStage had an
+	// unrecoverable error.
+	// If the underlying PlanStage has any information on the error, it will be available in
+	// the objOut parameter. Call WorkingSetCommon::toStatusString() to retrieve the error
+	// details from the output BSON object.
+	RUNNER_ERROR: "RUNNER_ERROR"
+};
+
+klass.YieldPolicy = {
+	// Any call to getNext() may yield.  In particular, the runner may be killed during any
+	// call to getNext().  If this occurs, getNext() will return RUNNER_DEAD.
+	//
+	// If you are enabling autoyield, you must register the Runner with ClientCursor via
+	// ClientCursor::registerRunner and deregister via ClientCursor::deregisterRunnerwhen
+	// done.  Registered runners are informed about DiskLoc deletions and Namespace
+	// invalidations and other important events.
+	//
+	// Exception: This is not required if the Runner is cached inside of a ClientCursor.
+	// This is only done if the Runner is cached and can be referred to by a cursor id.
+	// This is not a popular thing to do.
+	YIELD_AUTO: "YIELD_AUTO",
+
+	// Owner must yield manually if yields are requested.  How to yield yourself:
+	//
+	// 0. Let's say you have Runner* runner.
+	//
+	// 1. Register your runner with ClientCursor.  Registered runners are informed about
+	// DiskLoc deletions and Namespace invalidation and other important events.  Do this by
+	// calling ClientCursor::registerRunner(runner).  This could be done once when you get
+	// your runner, or per-yield.
+	//
+	// 2. Call runner->saveState() before you yield.
+	//
+	// 3. Call RunnerYieldPolicy::staticYield(runner->ns(), NULL) to yield.  Any state that
+	// may change between yields must be checked by you.  (For example, DiskLocs may not be
+	// valid across yielding, indices may be dropped, etc.)
+	//
+	// 4. Call runner->restoreState() before using the runner again.
+	//
+	// 5. Your runner's next call to getNext may return RUNNER_DEAD.
+	//
+	// 6. When you're done with your runner, deregister it from ClientCursor via
+	// ClientCursor::deregister(runner).
+	YIELD_MANUAL: "YIELD_MANUAL"
+};
+
+
+/**
+ * Set the yielding policy of the underlying runner.  See the RunnerYieldPolicy enum above.
+ * 
+ * @method setYieldPolicy
+ * @param [policy]
+ */
+proto.setYieldPolicy = function setYieldPolicy(policy) {
+	throw new Error('Not implemented');
+};
+
+/**
+ * Get the next result from the query.
+ *
+ * If objOut is not NULL, only results that have a BSONObj are returned.  The BSONObj may
+ * point to on-disk data (isOwned will be false) and must be copied by the caller before
+ * yielding.
+ *
+ * If dlOut is not NULL, only results that have a valid DiskLoc are returned.
+ *
+ * If both objOut and dlOut are not NULL, only results with both a valid BSONObj and DiskLoc
+ * will be returned.  The BSONObj is the object located at the DiskLoc provided.
+ *
+ * If the underlying query machinery produces a result that does not have the data requested
+ * by the user, it will be silently dropped.
+ *
+ * If the caller is running a query, they probably only care about the object.
+ * If the caller is an internal client, they may only care about DiskLocs (index scan), or
+ * about object + DiskLocs (collection scan).
+ *
+ * Some notes on objOut and ownership:
+ *
+ * objOut may be an owned object in certain cases: invalidation of the underlying DiskLoc,
+ * the object is created from covered index key data, the object is projected or otherwise
+ * the result of a computation.
+ *
+ * objOut will also be owned when the underlying PlanStage has provided error details in the
+ * event of a RUNNER_ERROR. Call WorkingSetCommon::toStatusString() to convert the object
+ * to a loggable format.
+ *
+ * objOut will be unowned if it's the result of a fetch or a collection scan.
+ * 
+ * @method getNext
+ * @param [callback] {Function}
+ */
+proto.getNext = function getNext(callback) {
+	throw new Error('Not implemented');
+};
+
+
+/**
+ * Will the next call to getNext() return EOF?  It's useful to know if the runner is done
+ * without having to take responsibility for a result.
+ * 
+ * @method isEOF
+ */
+proto.isEOF = function isEOF(){
+	throw new Error('Not implemented');
+};
+
+/**
+ * Inform the runner about changes to DiskLoc(s) that occur while the runner is yielded.
+ * The runner must take any actions required to continue operating correctly, including
+ * broadcasting the invalidation request to the PlanStage tree being run.
+ *
+ * Called from CollectionCursorCache::invalidateDocument.
+ *
+ * See db/invalidation_type.h for InvalidationType.
+ * 
+ * @method invalidate
+ * @param [dl]
+ * @param [type]
+ */
+proto.invalidate = function invalidate(dl, type) {
+	throw new Error('Not implemented');
+};
+
+/**
+ * Mark the Runner as no longer valid.  Can happen when a runner yields and the underlying
+ * database is dropped/indexes removed/etc.  All future to calls to getNext return
+ * RUNNER_DEAD. Every other call is a NOOP.
+ *
+ * The runner must guarantee as a postcondition that future calls to collection() will
+ * return NULL.
+ * 
+ * @method kill
+ */
+proto.kill = function kill() {
+	throw new Error('Not implemented');
+};
+
+/**
+ * Save any state required to yield.
+ * 
+ * @method saveState
+ */
+proto.saveState = function saveState() {
+	throw new Error('Not implemented');
+};
+
+/**
+ * Restore saved state, possibly after a yield.  Return true if the runner is OK, false if
+ * it was killed.
+ * 
+ * @method restoreState
+ */
+proto.restoreState = function restoreState() {
+	throw new Error('Not implemented');
+};
+
+/**
+ * Return the NS that the query is running over.
+ * 
+ * @method ns
+ */
+proto.ns = function ns() {
+	throw new Error('Not implemented');
+};
+
+/**
+ * Return the Collection that the query is running over.
+ * 
+ * @method collection
+ */
+proto.collection = function collection() {
+	throw new Error('Not implemented');
+};
+
+/**
+ * Returns OK, allocating and filling '*explain' or '*planInfo' with a description of the
+ * chosen plan, depending on which is non-NULL (one of the two should be NULL). Caller
+ * takes onwership of either '*explain' and '*planInfo'. Otherwise, returns false
+ * a detailed error status.
+ *
+ * If 'explain' is NULL, then this out-parameter is ignored. Similarly, if 'staticInfo'
+ * is NULL, then no static debug information is produced.
+ * 
+ * @method getInfo
+ * @param [explain]
+ * @param [planInfo]
+ */
+proto.getInfo = function getInfo(explain, planInfo) {
+	throw new Error('Not implemented');
+};
+
+/**
+ * dispose of the Runner.
+ * 
+ * @method reset
+ */
+proto.reset = function reset(){
+	throw new Error('Not implemented');
+};

+ 5 - 0
lib/query/index.js

@@ -0,0 +1,5 @@
+"use strict";
+module.exports = {
+	Runner: require("./Runner.js"),
+	ArrayRunner: require("./ArrayRunner.js")
+};

+ 0 - 93
test/lib/Cursor.js

@@ -1,93 +0,0 @@
-"use strict";
-var assert = require("assert"),
-	Cursor = require("../../lib/Cursor");
-
-module.exports = {
-
-	"Cursor": {
-
-		"constructor(data)": {
-			"should throw an exception if it does not get a valid array or stream": function(){
-				assert.throws(function(){
-					var c = new Cursor();
-				});
-				assert.throws(function(){
-					var c = new Cursor(5);
-				});
-			}
-		},
-
-		"#ok": {
-			"should return true if there is still data in the array": function(){
-				var c = new Cursor([1,2,3,4,5]);
-				assert.equal(c.ok(), true);
-			},
-			"should return false if there is no data left in the array": function(){
-				var c = new Cursor([]);
-				assert.equal(c.ok(), false);
-			},
-			"should return true if there is no data left in the array, but there is still a current value": function(){
-				var c = new Cursor([1,2]);
-				c.advance();
-				c.advance();
-				assert.equal(c.ok(), true);
-				c.advance();
-				assert.equal(c.ok(), false);
-			}
-//			,
-//			"should return true if there is still data in the stream": function(){
-//				
-//			},
-//			"should return false if there is no data left in the stream": function(){
-//				
-//			}
-
-		},
-		
-		"#advance": {
-			"should return true if there is still data in the array": function(){
-				var c = new Cursor([1,2,3,4,5]);
-				assert.equal(c.advance(), true);
-			},
-			"should return false if there is no data left in the array": function(){
-				var c = new Cursor([1]);
-				c.advance();
-				assert.equal(c.advance(), false);
-			},
-			"should update the current object to the next item in the array": function(){
-				var c = new Cursor([1,"2"]);
-				c.advance();
-				assert.strictEqual(c.current(), 1);
-				c.advance();
-				assert.strictEqual(c.current(), "2");
-				c.advance();
-				assert.strictEqual(c.current(), undefined);
-			}
-//,			"should return true if there is still data in the stream": function(){
-//				
-//			},
-//			"should return false if there is no data left in the stream": function(){
-//				
-//			},
-//			"should update the current object to the next item in the stream": function(){
-//				
-//			}
-		},
-		
-		"#current": {
-			"should return the first value if the cursor has not been advanced yet": function(){
-				var c = new Cursor([1,2,3,4,5]);
-				assert.equal(c.current(), 1);
-			},
-			"should return the first value if the cursor has been advanced once": function(){
-				var c = new Cursor([1,2,3,4,5]);
-				c.advance();
-				assert.equal(c.current(), 1);
-			}
-		}
-
-	}
-
-};
-
-if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();

+ 27 - 89
test/lib/pipeline/documentSources/CursorDocumentSource.js

@@ -5,14 +5,11 @@ var assert = require("assert"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	LimitDocumentSource = require("../../../../lib/pipeline/documentSources/LimitDocumentSource"),
 	LimitDocumentSource = require("../../../../lib/pipeline/documentSources/LimitDocumentSource"),
 	SkipDocumentSource = require("../../../../lib/pipeline/documentSources/SkipDocumentSource"),
 	SkipDocumentSource = require("../../../../lib/pipeline/documentSources/SkipDocumentSource"),
-	Cursor = require("../../../../lib/Cursor");
-
-var getCursor = function(values) {
-	if (!values)
-		values = [1,2,3,4,5];
-	var cwc = new CursorDocumentSource.CursorWithContext();
-	cwc._cursor = new Cursor( values );
-	return new CursorDocumentSource(cwc);
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
+
+var getCursorDocumentSource = function(values) {
+	values = values || [1,2,3,4,5];
+	return new CursorDocumentSource(null, new ArrayRunner(values), null);
 };
 };
 
 
 
 
@@ -21,24 +18,15 @@ module.exports = {
 	"CursorDocumentSource": {
 	"CursorDocumentSource": {
 
 
 		"constructor(data)": {
 		"constructor(data)": {
-			"should fail if CursorWithContext is not provided": function(){
-				assert.throws(function(){
-					var cds = new CursorDocumentSource();
-				});
-			},
 			"should get a accept a CursorWithContext and set it internally": function(){
 			"should get a accept a CursorWithContext and set it internally": function(){
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [] );
-
-				var cds = new CursorDocumentSource(cwc);
-
-				assert.ok(cds._cursorWithContext);
+				var cds = getCursorDocumentSource([]);
+				assert.ok(cds._runner);
 			}
 			}
 		},
 		},
 
 
 		"#coalesce": {
 		"#coalesce": {
 			"should be able to coalesce a limit into itself": function (){
 			"should be able to coalesce a limit into itself": function (){
-				var cds = getCursor(),
+				var cds = getCursorDocumentSource(),
 					lds = LimitDocumentSource.createFromJson(2);
 					lds = LimitDocumentSource.createFromJson(2);
 
 
 				assert.equal(cds.coalesce(lds) instanceof LimitDocumentSource, true);
 				assert.equal(cds.coalesce(lds) instanceof LimitDocumentSource, true);
@@ -46,7 +34,7 @@ module.exports = {
 			},
 			},
 
 
 			"should keep original limit if coalesced to a larger limit": function() {
 			"should keep original limit if coalesced to a larger limit": function() {
-				var cds = getCursor();
+				var cds = getCursorDocumentSource();
 				cds.coalesce(LimitDocumentSource.createFromJson(2));
 				cds.coalesce(LimitDocumentSource.createFromJson(2));
 				cds.coalesce(LimitDocumentSource.createFromJson(3));
 				cds.coalesce(LimitDocumentSource.createFromJson(3));
 				assert.equal(cds.getLimit(), 2);
 				assert.equal(cds.getLimit(), 2);
@@ -54,7 +42,7 @@ module.exports = {
 
 
 
 
 			"cursor only returns $limit number when coalesced": function(next) {
 			"cursor only returns $limit number when coalesced": function(next) {
-				var cds = getCursor(),
+				var cds = getCursorDocumentSource(),
 					lds = LimitDocumentSource.createFromJson(2);
 					lds = LimitDocumentSource.createFromJson(2);
 
 
 
 
@@ -69,40 +57,29 @@ module.exports = {
 						});
 						});
 					},
 					},
 					function() {
 					function() {
-						return docs[i++] !== DocumentSource.EOF;
+						return docs[i++] !== null;
 					},
 					},
 					function(err) {
 					function(err) {
-						assert.deepEqual([1, 2, DocumentSource.EOF], docs);
+						if (err) throw err;
+						assert.deepEqual([1, 2, null], docs);
 						next();
 						next();
 					}
 					}
 				);
 				);
 			},
 			},
 
 
 			"should leave non-limit alone": function () {
 			"should leave non-limit alone": function () {
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [] );
-
 				var sds = new SkipDocumentSource(),
 				var sds = new SkipDocumentSource(),
-					cds = new CursorDocumentSource(cwc);
+					cds = getCursorDocumentSource([]);
 
 
 				assert.equal(cds.coalesce(sds), false);
 				assert.equal(cds.coalesce(sds), false);
 			}
 			}
 		},
 		},
 
 
 		"#getNext": {
 		"#getNext": {
-			"should throw an error if no callback is given": function() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3,4] );
-				var cds = new CursorDocumentSource(cwc);
-				assert.throws(cds.getNext.bind(cds));
-			},
-
 			"should return the current cursor value async": function(next){
 			"should return the current cursor value async": function(next){
 				var expected = JSON.stringify([1,2]);
 				var expected = JSON.stringify([1,2]);
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3,4] );
 
 
-				var cds = new CursorDocumentSource(cwc);
+				var cds = getCursorDocumentSource([1,2,3,4]);
 				async.series([
 				async.series([
 						cds.getNext.bind(cds),
 						cds.getNext.bind(cds),
 						cds.getNext.bind(cds),
 						cds.getNext.bind(cds),
@@ -111,18 +88,16 @@ module.exports = {
 						cds.getNext.bind(cds),
 						cds.getNext.bind(cds),
 					],
 					],
 					function(err,res) {
 					function(err,res) {
-						assert.deepEqual([1,2,3,4,DocumentSource.EOF], res);
+						assert.deepEqual([1,2,3,4,null], res);
 						next();
 						next();
 					}
 					}
 				);
 				);
 			},
 			},
 			"should return values past the batch limit": function(next){
 			"should return values past the batch limit": function(next){
-				var cwc = new CursorDocumentSource.CursorWithContext(),
-					n = 0,
+				var n = 0,
 					arr = Array.apply(0, new Array(200)).map(function() { return n++; });
 					arr = Array.apply(0, new Array(200)).map(function() { return n++; });
-				cwc._cursor = new Cursor( arr );
 
 
-				var cds = new CursorDocumentSource(cwc);
+				var cds = getCursorDocumentSource(arr);
 				async.each(arr,
 				async.each(arr,
 					function(a,next) {
 					function(a,next) {
 						cds.getNext(function(err,val) {
 						cds.getNext(function(err,val) {
@@ -135,25 +110,24 @@ module.exports = {
 					}
 					}
 				);
 				);
 				cds.getNext(function(err,val) {
 				cds.getNext(function(err,val) {
-					assert.equal(val, DocumentSource.EOF);
+					assert.equal(val, null);
 					next();
 					next();
 				});
 				});
 			},
 			},
 		},
 		},
 		"#dispose": {
 		"#dispose": {
 			"should empty the current cursor": function(next){
 			"should empty the current cursor": function(next){
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3] );
-
-				var cds = new CursorDocumentSource(cwc);
+				var cds = getCursorDocumentSource();
 				async.series([
 				async.series([
 						cds.getNext.bind(cds),
 						cds.getNext.bind(cds),
 						cds.getNext.bind(cds),
 						cds.getNext.bind(cds),
-						cds.getNext.bind(cds),
-						cds.getNext.bind(cds),
+						function(next){
+							cds.dispose();
+							return cds.getNext(next);
+						}
 					],
 					],
 					function(err,res) {
 					function(err,res) {
-						assert.deepEqual([1,2,3,DocumentSource.EOF], res);
+						assert.deepEqual([1,2,null], res);
 						next();
 						next();
 					}
 					}
 				);
 				);
@@ -163,46 +137,10 @@ module.exports = {
 		"#setProjection": {
 		"#setProjection": {
 
 
 			"should set a projection": function() {
 			"should set a projection": function() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3] );
-
-				var cds = new CursorDocumentSource(cwc);
+				var cds = getCursorDocumentSource();
 				cds.setProjection({a:1}, {a:true});
 				cds.setProjection({a:1}, {a:true});
 				assert.deepEqual(cds._projection, {a:1});
 				assert.deepEqual(cds._projection, {a:1});
 				assert.deepEqual(cds._dependencies, {a:true});
 				assert.deepEqual(cds._dependencies, {a:true});
-			},
-
-			"should throw an error if projection is already set": function (){
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3] );
-
-				var cds = new CursorDocumentSource(cwc);
-				cds.setProjection({a:1}, {});
-				assert.throws(function() {
-					cds.setProjection({a:1}, {});
-				});
-			},
-
-			"should project properly": function(next) {
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [{a:1},{a:2,b:3},{c:4,d:5}] );
-
-				var cds = new CursorDocumentSource(cwc);
-				cds.setProjection({a:1}, {a:true});
-				assert.deepEqual(cds._projection, {a:1});
-				assert.deepEqual(cds._dependencies, {a:true});
-
-				async.series([
-						cds.getNext.bind(cds),
-						cds.getNext.bind(cds),
-						cds.getNext.bind(cds),
-						cds.getNext.bind(cds),
-					],
-					function(err,res) {
-						assert.deepEqual([{a:1},{a:2},{},DocumentSource.EOF], res);
-						next();
-					}
-				);
 			}
 			}
 
 
 		}
 		}
@@ -211,4 +149,4 @@ module.exports = {
 
 
 };
 };
 
 
-if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);

+ 7 - 6
test/lib/pipeline/documentSources/GeoNearDocumentSource.js

@@ -3,13 +3,17 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	GeoNearDocumentSource = require("../../../../lib/pipeline/documentSources/GeoNearDocumentSource"),
 	GeoNearDocumentSource = require("../../../../lib/pipeline/documentSources/GeoNearDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner"),
 	FieldPath = require("../../../../lib/pipeline/FieldPath");
 	FieldPath = require("../../../../lib/pipeline/FieldPath");
 
 
 var createGeoNear = function(ctx) {
 var createGeoNear = function(ctx) {
 	var ds = new GeoNearDocumentSource(ctx);
 	var ds = new GeoNearDocumentSource(ctx);
 	return ds;
 	return ds;
 };
 };
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 
 module.exports = {
 module.exports = {
 
 
@@ -55,14 +59,11 @@ module.exports = {
 		"#setSource()":{
 		"#setSource()":{
 
 
 			"check that setting source of GeoNearDocumentSource throws error":function() {
 			"check that setting source of GeoNearDocumentSource throws error":function() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{}];
 				var input = [{}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
 				var gnds = createGeoNear();
 				var gnds = createGeoNear();
 
 
 				assert.throws(function(){
 				assert.throws(function(){
-					gnds.setSource(cds);
+					addSource(gnds, input);
 				});
 				});
 			}
 			}
 
 
@@ -95,4 +96,4 @@ module.exports = {
 	}
 	}
 };
 };
 
 
-if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);

+ 12 - 23
test/lib/pipeline/documentSources/LimitDocumentSource.js

@@ -1,8 +1,14 @@
 "use strict";
 "use strict";
 var assert = require("assert"),
 var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
-	LimitDocumentSource = require("../../../../lib/pipeline/documentSources/LimitDocumentSource");
+	LimitDocumentSource = require("../../../../lib/pipeline/documentSources/LimitDocumentSource"),
+	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 
 module.exports = {
 module.exports = {
 
 
@@ -73,7 +79,7 @@ module.exports = {
 			"should return the current document source": function currSource(next){
 			"should return the current document source": function currSource(next){
 				var lds = new LimitDocumentSource({"$limit":[{"a":1},{"a":2}]});
 				var lds = new LimitDocumentSource({"$limit":[{"a":1},{"a":2}]});
 				lds.limit = 1;
 				lds.limit = 1;
-				lds.source = {getNext:function(cb){cb(null,{ item:1 });}};
+				addSource(lds, [{item:1}]);
 				lds.getNext(function(err,val) {
 				lds.getNext(function(err,val) {
 					assert.deepEqual(val, { item:1 });
 					assert.deepEqual(val, { item:1 });
 					return next();
 					return next();
@@ -84,19 +90,10 @@ module.exports = {
 			"should return EOF for no sources remaining": function noMoar(next){
 			"should return EOF for no sources remaining": function noMoar(next){
 				var lds = new LimitDocumentSource({"$match":[{"a":1},{"a":1}]});
 				var lds = new LimitDocumentSource({"$match":[{"a":1},{"a":1}]});
 				lds.limit = 1;
 				lds.limit = 1;
-				lds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (lds.source.calls)
-							return cb(null,DocumentSource.EOF);
-						lds.source.calls++;
-						return cb(null,{item:1});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(lds, [{item:1}]);
 				lds.getNext(function(){});
 				lds.getNext(function(){});
 				lds.getNext(function(err,val) {
 				lds.getNext(function(err,val) {
-					assert.strictEqual(val, DocumentSource.EOF);
+					assert.strictEqual(val, null);
 					return next();
 					return next();
 				});
 				});
 			},
 			},
@@ -104,18 +101,10 @@ module.exports = {
 			"should return EOF if we hit our limit": function noMoar(next){
 			"should return EOF if we hit our limit": function noMoar(next){
 				var lds = new LimitDocumentSource();
 				var lds = new LimitDocumentSource();
 				lds.limit = 1;
 				lds.limit = 1;
-				lds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (lds.source.calls)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item:1});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(lds, [{item:1},{item:2}]);
 				lds.getNext(function(){});
 				lds.getNext(function(){});
 				lds.getNext(function (err,val) {
 				lds.getNext(function (err,val) {
-					assert.strictEqual(val, DocumentSource.EOF);
+					assert.strictEqual(val, null);
 					return next();
 					return next();
 				});
 				});
 			}
 			}

+ 15 - 33
test/lib/pipeline/documentSources/MatchDocumentSource.js

@@ -2,13 +2,18 @@
 var assert = require("assert"),
 var assert = require("assert"),
 	async = require("async"),
 	async = require("async"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
-	MatchDocumentSource = require("../../../../lib/pipeline/documentSources/MatchDocumentSource");
+	MatchDocumentSource = require("../../../../lib/pipeline/documentSources/MatchDocumentSource"),
+	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 
 var testRedactSafe = function testRedactSafe(input, safePortion) {
 var testRedactSafe = function testRedactSafe(input, safePortion) {
 	var match = MatchDocumentSource.createFromJson(input);
 	var match = MatchDocumentSource.createFromJson(input);
 	assert.deepEqual(match.redactSafePortion(), safePortion);
 	assert.deepEqual(match.redactSafePortion(), safePortion);
 };
 };
-
+var addSource = function addSource(match, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	match.setSource(cds);
+};
 
 
 module.exports = {
 module.exports = {
 
 
@@ -67,7 +72,7 @@ module.exports = {
 
 
 			"should return the current document source": function currSource(next){
 			"should return the current document source": function currSource(next){
 				var mds = new MatchDocumentSource({item: 1});
 				var mds = new MatchDocumentSource({item: 1});
-				mds.source = {getNext:function(cb){cb(null,{ item:1 });}};
+				addSource(mds, [{ item:1 }]);
 				mds.getNext(function(err,val) {
 				mds.getNext(function(err,val) {
 					assert.deepEqual(val, { item:1 });
 					assert.deepEqual(val, { item:1 });
 					next();
 					next();
@@ -77,16 +82,9 @@ module.exports = {
 			"should return matched sources remaining": function (next){
 			"should return matched sources remaining": function (next){
 				var mds = new MatchDocumentSource({ item: {$lt: 5} }),
 				var mds = new MatchDocumentSource({ item: {$lt: 5} }),
 					items = [ 1,2,3,4,5,6,7,8,9 ];
 					items = [ 1,2,3,4,5,6,7,8,9 ];
-				mds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (this.calls >= items.length)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item: items[this.calls++]});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(mds, items.map(function(i){return {item:i};}));
 
 
+				debugger;
 				async.series([
 				async.series([
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
@@ -95,7 +93,7 @@ module.exports = {
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
 					],
 					],
 					function(err,res) {
 					function(err,res) {
-						assert.deepEqual([{item:1},{item:2},{item:3},{item:4},DocumentSource.EOF], res);
+						assert.deepEqual([{item:1},{item:2},{item:3},{item:4},null], res);
 						next();
 						next();
 					}
 					}
 				);
 				);
@@ -104,15 +102,7 @@ module.exports = {
 			"should not return matched out documents for sources remaining": function (next){
 			"should not return matched out documents for sources remaining": function (next){
 				var mds = new MatchDocumentSource({ item: {$gt: 5} }),
 				var mds = new MatchDocumentSource({ item: {$gt: 5} }),
 					items = [ 1,2,3,4,5,6,7,8,9 ];
 					items = [ 1,2,3,4,5,6,7,8,9 ];
-				mds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (this.calls >= items.length)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item: items[this.calls++]});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(mds, items.map(function(i){return {item:i};}));
 
 
 				async.series([
 				async.series([
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
@@ -122,7 +112,7 @@ module.exports = {
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
 					],
 					],
 					function(err,res) {
 					function(err,res) {
-						assert.deepEqual([{item:6},{item:7},{item:8},{item:9},DocumentSource.EOF], res);
+						assert.deepEqual([{item:6},{item:7},{item:8},{item:9},null], res);
 						next();
 						next();
 					}
 					}
 				);
 				);
@@ -131,21 +121,13 @@ module.exports = {
 			"should return EOF for no sources remaining": function (next){
 			"should return EOF for no sources remaining": function (next){
 				var mds = new MatchDocumentSource({ item: {$gt: 5} }),
 				var mds = new MatchDocumentSource({ item: {$gt: 5} }),
 					items = [ ];
 					items = [ ];
-				mds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (this.calls >= items.length)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item: items[this.calls++]});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(mds, items.map(function(i){return {item:i};}));
 
 
 				async.series([
 				async.series([
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
 					],
 					],
 					function(err,res) {
 					function(err,res) {
-						assert.deepEqual([DocumentSource.EOF], res);
+						assert.deepEqual([null], res);
 						next();
 						next();
 					}
 					}
 				);
 				);

+ 9 - 11
test/lib/pipeline/documentSources/OutDocumentSource.js

@@ -4,12 +4,16 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	OutDocumentSource = require("../../../../lib/pipeline/documentSources/OutDocumentSource"),
 	OutDocumentSource = require("../../../../lib/pipeline/documentSources/OutDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor");
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 
 var createOut = function(ctx) {
 var createOut = function(ctx) {
 	var ds = new OutDocumentSource(ctx);
 	var ds = new OutDocumentSource(ctx);
 	return ds;
 	return ds;
 };
 };
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 
 module.exports = {
 module.exports = {
 
 
@@ -43,12 +47,9 @@ module.exports = {
 
 
 			"should act as passthrough (for now)": function(next) {
 			"should act as passthrough (for now)": function(next) {
 				var ods = OutDocumentSource.createFromJson("test"),
 				var ods = OutDocumentSource.createFromJson("test"),
-					cwc = new CursorDocumentSource.CursorWithContext(),
 					l = [{_id:0,a:[{b:1},{b:2}]}, {_id:1,a:[{b:1},{b:1}]} ];
 					l = [{_id:0,a:[{b:1},{b:2}]}, {_id:1,a:[{b:1},{b:1}]} ];
 
 
-				cwc._cursor = new Cursor( l );
-				var cds = new CursorDocumentSource(cwc);
-				ods.setSource(cds);
+				addSource(ods, l);
 
 
 				var docs = [], i = 0;
 				var docs = [], i = 0;
 				async.doWhilst(
 				async.doWhilst(
@@ -59,10 +60,10 @@ module.exports = {
 						});
 						});
 					},
 					},
 					function() {
 					function() {
-						return docs[i++] !== DocumentSource.EOF;
+						return docs[i++] !== null;
 					},
 					},
 					function(err) {
 					function(err) {
-						assert.deepEqual([{_id:0,a:[{b:1},{b:2}]}, {_id:1,a:[{b:1},{b:1}]}, DocumentSource.EOF], docs);
+						assert.deepEqual([{_id:0,a:[{b:1},{b:2}]}, {_id:1,a:[{b:1},{b:1}]}, null], docs);
 						next();
 						next();
 					}
 					}
 				);
 				);
@@ -83,13 +84,10 @@ module.exports = {
 		"#serialize()":{
 		"#serialize()":{
 
 
 			"serialize":function() {
 			"serialize":function() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{_id: 0, a: 1}, {_id: 1, a: 2}];
 				var input = [{_id: 0, a: 1}, {_id: 1, a: 2}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
 				var title = "CognitiveScientists";
 				var title = "CognitiveScientists";
 				var ods = OutDocumentSource.createFromJson(title);
 				var ods = OutDocumentSource.createFromJson(title);
-				ods.setSource(cds);
+				addSource(ods, input);
 				var srcNm = ods.getSourceName();
 				var srcNm = ods.getSourceName();
 				var serialize = {};
 				var serialize = {};
 				serialize[srcNm] = title;
 				serialize[srcNm] = title;

+ 9 - 13
test/lib/pipeline/documentSources/RedactDocumentSource.js

@@ -4,7 +4,7 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	RedactDocumentSource = require("../../../../lib/pipeline/documentSources/RedactDocumentSource"),
 	RedactDocumentSource = require("../../../../lib/pipeline/documentSources/RedactDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner"),
 	Expressions = require("../../../../lib/pipeline/expressions");
 	Expressions = require("../../../../lib/pipeline/expressions");
 
 
 var exampleRedact = {$cond:{
 var exampleRedact = {$cond:{
@@ -15,9 +15,7 @@ var exampleRedact = {$cond:{
 
 
 var createCursorDocumentSource = function createCursorDocumentSource (input) {
 var createCursorDocumentSource = function createCursorDocumentSource (input) {
 	if (!input || input.constructor !== Array) throw new Error('invalid');
 	if (!input || input.constructor !== Array) throw new Error('invalid');
-	var cwc = new CursorDocumentSource.CursorWithContext();
-	cwc._cursor = new Cursor(input);
-	return new CursorDocumentSource(cwc);
+	return new CursorDocumentSource(null, new ArrayRunner(input), null);
 };
 };
 
 
 var createRedactDocumentSource = function createRedactDocumentSource (src, expression) {
 var createRedactDocumentSource = function createRedactDocumentSource (src, expression) {
@@ -55,29 +53,27 @@ module.exports = {
 				var rds = RedactDocumentSource.createFromJson(exampleRedact);
 				var rds = RedactDocumentSource.createFromJson(exampleRedact);
 				rds.setSource({
 				rds.setSource({
 					getNext: function getNext(cb) {
 					getNext: function getNext(cb) {
-						return cb(null, DocumentSource.EOF);
+						return cb(null, null);
 					}
 					}
 				});
 				});
 				rds.getNext(function(err, doc) {
 				rds.getNext(function(err, doc) {
-					assert.equal(DocumentSource.EOF, doc);
+					assert.equal(null, doc);
 					next();
 					next();
 				});
 				});
 			},
 			},
 
 
 			"iterator state accessors consistently report the source is exhausted": function assertExhausted() {
 			"iterator state accessors consistently report the source is exhausted": function assertExhausted() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{}];
 				var input = [{}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
+				var cds = createCursorDocumentSource(input);
 				var rds = RedactDocumentSource.createFromJson(exampleRedact);
 				var rds = RedactDocumentSource.createFromJson(exampleRedact);
 				rds.setSource(cds);
 				rds.setSource(cds);
 				rds.getNext(function(err, actual) {
 				rds.getNext(function(err, actual) {
 					rds.getNext(function(err, actual1) {
 					rds.getNext(function(err, actual1) {
-						assert.equal(DocumentSource.EOF, actual1);
+						assert.equal(null, actual1);
 						rds.getNext(function(err, actual2) {
 						rds.getNext(function(err, actual2) {
-							assert.equal(DocumentSource.EOF, actual2);
+							assert.equal(null, actual2);
 							rds.getNext(function(err, actual3) {
 							rds.getNext(function(err, actual3) {
-								assert.equal(DocumentSource.EOF, actual3);
+								assert.equal(null, actual3);
 							});
 							});
 						});
 						});
 					});
 					});
@@ -235,4 +231,4 @@ module.exports = {
 
 
 };
 };
 
 
-if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);

+ 17 - 28
test/lib/pipeline/documentSources/SkipDocumentSource.js

@@ -1,10 +1,15 @@
 "use strict";
 "use strict";
 var assert = require("assert"),
 var assert = require("assert"),
 	async = require("async"),
 	async = require("async"),
-	Cursor = require("../../../../lib/Cursor"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
+	SkipDocumentSource = require("../../../../lib/pipeline/documentSources/SkipDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	SkipDocumentSource = require("../../../../lib/pipeline/documentSources/SkipDocumentSource");
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
+
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 
 
 
 module.exports = {
 module.exports = {
@@ -84,19 +89,15 @@ module.exports = {
 
 
 				var expected = [
 				var expected = [
 					{val:4},
 					{val:4},
-					DocumentSource.EOF
+					null
 				];
 				];
-
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [
 				var input = [
 					{val:1},
 					{val:1},
 					{val:2},
 					{val:2},
 					{val:3},
 					{val:3},
 					{val:4},
 					{val:4},
 				];
 				];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
-				sds.setSource(cds);
+				addSource(sds, input);
 
 
 				async.series([
 				async.series([
 						sds.getNext.bind(sds),
 						sds.getNext.bind(sds),
@@ -108,20 +109,17 @@ module.exports = {
 					}
 					}
 				);
 				);
 				sds.getNext(function(err, actual) {
 				sds.getNext(function(err, actual) {
-					assert.equal(actual, DocumentSource.EOF);
+					assert.equal(actual, null);
 				});
 				});
 			},
 			},
 			"should return documents if skip count is not hit and there are more documents": function hitSkip(next){
 			"should return documents if skip count is not hit and there are more documents": function hitSkip(next){
 				var sds = SkipDocumentSource.createFromJson(1);
 				var sds = SkipDocumentSource.createFromJson(1);
 
 
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{val:1},{val:2},{val:3}];
 				var input = [{val:1},{val:2},{val:3}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
-				sds.setSource(cds);
+				addSource(sds, input);
 
 
 				sds.getNext(function(err,actual) {
 				sds.getNext(function(err,actual) {
-					assert.notEqual(actual, DocumentSource.EOF);
+					assert.notEqual(actual, null);
 					assert.deepEqual(actual, {val:2});
 					assert.deepEqual(actual, {val:2});
 					next();
 					next();
 				});
 				});
@@ -130,11 +128,8 @@ module.exports = {
 			"should return the current document source": function currSource(){
 			"should return the current document source": function currSource(){
 				var sds = SkipDocumentSource.createFromJson(1);
 				var sds = SkipDocumentSource.createFromJson(1);
 
 
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{val:1},{val:2},{val:3}];
 				var input = [{val:1},{val:2},{val:3}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
-				sds.setSource(cds);
+				addSource(sds, input);
 
 
 				sds.getNext(function(err, actual) {
 				sds.getNext(function(err, actual) {
 					assert.deepEqual(actual, { val:2 });
 					assert.deepEqual(actual, { val:2 });
@@ -147,17 +142,11 @@ module.exports = {
 
 
 				var expected = [
 				var expected = [
 					{item:4},
 					{item:4},
-					DocumentSource.EOF
+					null
 				];
 				];
-
-				var i = 1;
-				sds.source = {
-					getNext:function(cb){
-						if (i>=5)
-							return cb(null,DocumentSource.EOF);
-						return cb(null, { item:i++ });
-					}
-				};
+				
+				var input = [{item:1},{item:2},{item:3},{item:4}];
+				addSource(sds, input);
 
 
 				async.series([
 				async.series([
 						sds.getNext.bind(sds),
 						sds.getNext.bind(sds),

+ 9 - 12
test/lib/pipeline/documentSources/UnwindDocumentSource.js

@@ -4,7 +4,7 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	UnwindDocumentSource = require("../../../../lib/pipeline/documentSources/UnwindDocumentSource"),
 	UnwindDocumentSource = require("../../../../lib/pipeline/documentSources/UnwindDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor");
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 
 
 
 //HELPERS
 //HELPERS
@@ -35,10 +35,7 @@ var createUnwind = function createUnwind(unwind) {
 };
 };
 
 
 var addSource = function addSource(unwind, data) {
 var addSource = function addSource(unwind, data) {
-	var cwc = new CursorDocumentSource.CursorWithContext();
-	cwc._cursor = new Cursor(data);
-	var cds = new CursorDocumentSource(cwc);
-	var pds = new UnwindDocumentSource();
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
 	unwind.setSource(cds);
 	unwind.setSource(cds);
 };
 };
 
 
@@ -53,7 +50,7 @@ var checkResults = function checkResults(data, expectedResults, path, next) {
 
 
 	expectedResults = expectedResults || [];
 	expectedResults = expectedResults || [];
 
 
-	expectedResults.push(DocumentSource.EOF);
+	expectedResults.push(null);
 
 
 	//Load the results from the DocumentSourceUnwind
 	//Load the results from the DocumentSourceUnwind
 	var docs = [], i = 0;
 	var docs = [], i = 0;
@@ -65,7 +62,7 @@ var checkResults = function checkResults(data, expectedResults, path, next) {
 			});
 			});
 		},
 		},
 		function() {
 		function() {
-			return docs[i++] !== DocumentSource.EOF;
+			return docs[i++] !== null;
 		},
 		},
 		function(err) {
 		function(err) {
 			assert.deepEqual(expectedResults, docs);
 			assert.deepEqual(expectedResults, docs);
@@ -111,7 +108,7 @@ module.exports = {
 				var pds = createUnwind();
 				var pds = createUnwind();
 				addSource(pds, []);
 				addSource(pds, []);
 				pds.getNext(function(err,doc) {
 				pds.getNext(function(err,doc) {
-					assert.strictEqual(doc, DocumentSource.EOF);
+					assert.strictEqual(doc, null);
 					next();
 					next();
 				});
 				});
 			},
 			},
@@ -120,7 +117,7 @@ module.exports = {
 				var pds = createUnwind();
 				var pds = createUnwind();
 				addSource(pds, [{_id:0, a:[1]}]);
 				addSource(pds, [{_id:0, a:[1]}]);
 				pds.getNext(function(err,doc) {
 				pds.getNext(function(err,doc) {
-					assert.notStrictEqual(doc, DocumentSource.EOF);
+					assert.notStrictEqual(doc, null);
 					next();
 					next();
 				});
 				});
 			},
 			},
@@ -129,7 +126,7 @@ module.exports = {
 				var pds = createUnwind();
 				var pds = createUnwind();
 				addSource(pds, [{_id:0, a:[1,2]}]);
 				addSource(pds, [{_id:0, a:[1,2]}]);
 				pds.getNext(function(err,doc) {
 				pds.getNext(function(err,doc) {
-					assert.notStrictEqual(doc, DocumentSource.EOF);
+					assert.notStrictEqual(doc, null);
 					assert.strictEqual(doc.a, 1);
 					assert.strictEqual(doc.a, 1);
 					pds.getNext(function(err,doc) {
 					pds.getNext(function(err,doc) {
 						assert.strictEqual(doc.a, 2);
 						assert.strictEqual(doc.a, 2);
@@ -151,10 +148,10 @@ module.exports = {
 						});
 						});
 					},
 					},
 					function() {
 					function() {
-						return docs[i++] !== DocumentSource.EOF;
+						return docs[i++] !== null;
 					},
 					},
 					function(err) {
 					function(err) {
-						assert.deepEqual([{_id:0, a:1},{_id:0, a:2},DocumentSource.EOF], docs);
+						assert.deepEqual([{_id:0, a:1},{_id:0, a:2},null], docs);
 						next();
 						next();
 					}
 					}
 				);
 				);

+ 88 - 0
test/lib/query/ArrayRunner.js

@@ -0,0 +1,88 @@
+"use strict";
+var assert = require("assert"),
+	Runner = require("../../../lib/query/Runner"),
+	ArrayRunner = require("../../../lib/query/ArrayRunner");
+
+module.exports = {
+
+	"ArrayRunner": {
+		"#constructor": {
+			"should accept an array of data": function(){
+				assert.doesNotThrow(function(){
+					var ar = new ArrayRunner([1,2,3]);
+				});
+			},
+			"should fail if not given an array": function(){
+				assert.throws(function(){
+					var ar = new ArrayRunner();
+				});
+				assert.throws(function(){
+					var ar = new ArrayRunner(123);
+				});
+			}
+		},
+		"#getNext": {
+			"should return the next item in the array": function(done){
+				var ar = new ArrayRunner([1,2,3]);
+				
+				ar.getNext(function(err, out, state){
+					assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+					assert.strictEqual(out, 1);
+					ar.getNext(function(err, out, state){
+						assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+						assert.strictEqual(out, 2);
+						ar.getNext(function(err, out, state){
+							assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+							assert.strictEqual(out, 3);
+							done();
+						});
+					});
+				});
+			},
+			"should return EOF if there is nothing left in the array": function(done){
+				var ar = new ArrayRunner([1]);
+				
+				ar.getNext(function(err, out, state){
+					assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+					assert.strictEqual(out, 1);
+					ar.getNext(function(err, out, state){
+						assert.strictEqual(state, Runner.RunnerState.RUNNER_EOF);
+						assert.strictEqual(out, undefined);
+						done();
+					});
+				});
+			}
+		},
+		"#getInfo": {
+			"should return nothing if explain flag is not set": function(){
+				var ar = new ArrayRunner([1,2,3]);
+				assert.strictEqual(ar.getInfo(), undefined);
+			},
+			"should return information about the runner if explain flag is set": function(){
+				var ar = new ArrayRunner([1,2,3]);
+				assert.deepEqual(ar.getInfo(true), {
+					"type":"ArrayRunner",
+					"nDocs":3,
+					"position":0,
+					"state": Runner.RunnerState.RUNNER_ADVANCED
+				});
+			}
+		},
+		"#reset": {
+			"should clear out the runner": function(){
+				var ar = new ArrayRunner([1,2,3]);
+				ar.reset();
+				
+				assert.deepEqual(ar.getInfo(true), {
+					"type":"ArrayRunner",
+					"nDocs":0,
+					"position":0,
+					"state": Runner.RunnerState.RUNNER_DEAD
+				});				
+			}
+		}
+	}
+
+};
+
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();