Browse Source

EAGLESIX-812: fixed test cases for existing ported document sources to use the existing version of Cursor. Also removed DocumentSource.EOF and replaced with null

Phil Murray 11 năm trước cách đây
mục cha
commit
861f850099

+ 2 - 18
lib/pipeline/documentSources/DocumentSource.js

@@ -37,22 +37,6 @@ var DocumentSource = module.exports = function DocumentSource(expCtx){
 
 }, klass = DocumentSource, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
-/**
- * Use EOF as boost::none for document sources to signal the end of their document stream.
- **/
-klass.EOF = (function() {
-	/**
-	 * Represents a non-value in a document stream
-	 * @class EOF
-	 * @namespace mungedb-aggregate.pipeline.documentSources.DocumentSource
-	 * @module mungedb-aggregate
-	 * @constructor
-	 **/
-	var klass = function EOF(){
-	};
-	return klass;
-})();
-
 /*
 class DocumentSource :
 public IntrusiveCounterUnsigned,
@@ -83,7 +67,7 @@ proto.getPipelineStep = function getPipelineStep() {
 };
 
 /**
- * Returns the next Document if there is one or DocumentSource.EOF if at EOF.
+ * Returns the next Document if there is one or null if at EOF.
  *
  * some implementations do the equivalent of verify(!eof()) so check eof() first
  * @method	getNext
@@ -206,7 +190,7 @@ proto.serializeToArray = function serializeToArray(array, explain) {
  * @method GET_NEXT_PASS_THROUGH
  * @param callback {Function}
  * @param callback.err {Error} An error or falsey
- * @param callback.doc {Object} The source's next object or DocumentSource.EOF
+ * @param callback.doc {Object} The source's next object or null
  **/
 klass.GET_NEXT_PASS_THROUGH = function GET_NEXT_PASS_THROUGH(callback) {
 	if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');

+ 2 - 2
lib/pipeline/documentSources/LimitDocumentSource.js

@@ -57,8 +57,8 @@ proto.getNext = function getNext(callback) {
 
 	if (++this.count > this.limit) {
 		this.source.dispose();
-		callback(null, DocumentSource.EOF);
-		return DocumentSource.EOF;
+		callback(null, null);
+		return null;
 	}
 
 	return this.source.getNext(callback);

+ 4 - 4
lib/pipeline/documentSources/MatchDocumentSource.js

@@ -50,9 +50,9 @@ proto.getNext = function getNext(callback) {
 			return self.matcher.matches(doc);
 		},
 		makeReturn = function makeReturn(doc) {
-			if(doc !== DocumentSource.EOF && test(doc)) { // Passes the match criteria
+			if(doc !== null && test(doc)) { // Passes the match criteria
 				return doc;
-			} else if(doc === DocumentSource.EOF){ // Got EOF
+			} else if(doc === null){ // Got EOF
 				return doc;
 			}
 			return undefined; // Didn't match, but not EOF
@@ -61,14 +61,14 @@ proto.getNext = function getNext(callback) {
 		function(cb) {
 			self.source.getNext(function(err, doc) {
 				if(err) return callback(err);
-				if (makeReturn(doc)) {
+				if (makeReturn(doc) !== undefined) {
 					next = doc;
 				}
 				return cb();
 			});
 		},
 		function() {
-			var foundDoc = (next === DocumentSource.EOF || next !== undefined);
+			var foundDoc = (next === null || next !== undefined);
 			return foundDoc; //keep going until doc is found
 		},
 		function(err) {

+ 6 - 6
lib/pipeline/documentSources/RedactDocumentSource.js

@@ -37,17 +37,17 @@ proto.getNext = function getNext(callback) {
 		doc;
 	async.whilst(
 		function() {
-			return doc !== DocumentSource.EOF;
+			return doc !== null;
 		},
 		function(cb) {
 			self.source.getNext(function(err, input) {
 				doc = input;
-				if (input === DocumentSource.EOF)
+				if (input === null)
 					return cb();
 				self._variables.setRoot(input);
 				self._variables.setValue(self._currentId, input);
 				var result = self.redactObject();
-				if (result !== DocumentSource.EOF)
+				if (result !== null)
 					return cb(result); //Using the err argument to pass the result document; this lets us break out without having EOF
 				return cb();
 			});
@@ -55,7 +55,7 @@ proto.getNext = function getNext(callback) {
 		function(doc) {
 			if (doc)
 				return callback(null, doc);
-			return callback(null, DocumentSource.EOF);
+			return callback(null, null);
 		}
 	);
 	return doc;
@@ -79,7 +79,7 @@ proto.redactValue = function redactValue(input) {
 	} else if (input instanceof Object && input.constructor === Object) {
 		this._variables.setValue(this._currentId, input);
 		var result = this.redactObject();
-		if (result !== DocumentSource.EOF)
+		if (result !== null)
 			return result;
 		return null;
 	} else {
@@ -96,7 +96,7 @@ proto.redactObject = function redactObject() {
 	if (expressionResult === KEEP_VAL) {
 		return this._variables.getDocument(this._currentId);
 	} else if (expressionResult === PRUNE_VAL) {
-		return DocumentSource.EOF;
+		return null;
 	} else if (expressionResult === DESCEND_VAL) {
 		var input = this._variables.getDocument(this._currentId);
 		var out = {};

+ 1 - 1
lib/pipeline/documentSources/SkipDocumentSource.js

@@ -89,7 +89,7 @@ proto.getNext = function getNext(callback) {
 				});
 			},
 			function() {
-				return self.count < self.skip || next === DocumentSource.EOF;
+				return self.count < self.skip || next === null;
 			},
 			function (err) {
 				if (err) { return callback(err); }

+ 3 - 3
lib/pipeline/documentSources/UnwindDocumentSource.js

@@ -71,7 +71,7 @@ klass.Unwinder = (function() {
 	 **/
 	proto.getNext = function getNext() {
 		if (this._inputArray === undefined || this._index === this._inputArray.length) {
-			return DocumentSource.EOF;
+			return null;
 		}
 
 		this._document = Document.cloneDeep(this._document);
@@ -113,7 +113,7 @@ proto.getNext = function getNext(callback) {
 
 	async.until(
 		function () {
-			if (out !== DocumentSource.EOF || exhausted) {
+			if (out !== null || exhausted) {
 				return true;
 			}
 
@@ -125,7 +125,7 @@ proto.getNext = function getNext(callback) {
 					return cb(err);
 				}
 
-				if (doc === DocumentSource.EOF) {
+				if (doc === null) {
 					exhausted = true;
 				} else {
 					self._unwinder.resetDocument(doc);

+ 7 - 6
test/lib/pipeline/documentSources/GeoNearDocumentSource.js

@@ -3,13 +3,17 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	GeoNearDocumentSource = require("../../../../lib/pipeline/documentSources/GeoNearDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner"),
 	FieldPath = require("../../../../lib/pipeline/FieldPath");
 
 var createGeoNear = function(ctx) {
 	var ds = new GeoNearDocumentSource(ctx);
 	return ds;
 };
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 module.exports = {
 
@@ -55,14 +59,11 @@ module.exports = {
 		"#setSource()":{
 
 			"check that setting source of GeoNearDocumentSource throws error":function() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
 				var gnds = createGeoNear();
 
 				assert.throws(function(){
-					gnds.setSource(cds);
+					addSource(gnds, input);
 				});
 			}
 
@@ -95,4 +96,4 @@ module.exports = {
 	}
 };
 
-if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);

+ 12 - 23
test/lib/pipeline/documentSources/LimitDocumentSource.js

@@ -1,8 +1,14 @@
 "use strict";
 var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
-	LimitDocumentSource = require("../../../../lib/pipeline/documentSources/LimitDocumentSource");
+	LimitDocumentSource = require("../../../../lib/pipeline/documentSources/LimitDocumentSource"),
+	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 module.exports = {
 
@@ -73,7 +79,7 @@ module.exports = {
 			"should return the current document source": function currSource(next){
 				var lds = new LimitDocumentSource({"$limit":[{"a":1},{"a":2}]});
 				lds.limit = 1;
-				lds.source = {getNext:function(cb){cb(null,{ item:1 });}};
+				addSource(lds, [{item:1}]);
 				lds.getNext(function(err,val) {
 					assert.deepEqual(val, { item:1 });
 					return next();
@@ -84,19 +90,10 @@ module.exports = {
 			"should return EOF for no sources remaining": function noMoar(next){
 				var lds = new LimitDocumentSource({"$match":[{"a":1},{"a":1}]});
 				lds.limit = 1;
-				lds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (lds.source.calls)
-							return cb(null,DocumentSource.EOF);
-						lds.source.calls++;
-						return cb(null,{item:1});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(lds, [{item:1}]);
 				lds.getNext(function(){});
 				lds.getNext(function(err,val) {
-					assert.strictEqual(val, DocumentSource.EOF);
+					assert.strictEqual(val, null);
 					return next();
 				});
 			},
@@ -104,18 +101,10 @@ module.exports = {
 			"should return EOF if we hit our limit": function noMoar(next){
 				var lds = new LimitDocumentSource();
 				lds.limit = 1;
-				lds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (lds.source.calls)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item:1});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(lds, [{item:1},{item:2}]);
 				lds.getNext(function(){});
 				lds.getNext(function (err,val) {
-					assert.strictEqual(val, DocumentSource.EOF);
+					assert.strictEqual(val, null);
 					return next();
 				});
 			}

+ 15 - 33
test/lib/pipeline/documentSources/MatchDocumentSource.js

@@ -2,13 +2,18 @@
 var assert = require("assert"),
 	async = require("async"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
-	MatchDocumentSource = require("../../../../lib/pipeline/documentSources/MatchDocumentSource");
+	MatchDocumentSource = require("../../../../lib/pipeline/documentSources/MatchDocumentSource"),
+	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 var testRedactSafe = function testRedactSafe(input, safePortion) {
 	var match = MatchDocumentSource.createFromJson(input);
 	assert.deepEqual(match.redactSafePortion(), safePortion);
 };
-
+var addSource = function addSource(match, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	match.setSource(cds);
+};
 
 module.exports = {
 
@@ -67,7 +72,7 @@ module.exports = {
 
 			"should return the current document source": function currSource(next){
 				var mds = new MatchDocumentSource({item: 1});
-				mds.source = {getNext:function(cb){cb(null,{ item:1 });}};
+				addSource(mds, [{ item:1 }]);
 				mds.getNext(function(err,val) {
 					assert.deepEqual(val, { item:1 });
 					next();
@@ -77,16 +82,9 @@ module.exports = {
 			"should return matched sources remaining": function (next){
 				var mds = new MatchDocumentSource({ item: {$lt: 5} }),
 					items = [ 1,2,3,4,5,6,7,8,9 ];
-				mds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (this.calls >= items.length)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item: items[this.calls++]});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(mds, items.map(function(i){return {item:i};}));
 
+				debugger;
 				async.series([
 						mds.getNext.bind(mds),
 						mds.getNext.bind(mds),
@@ -95,7 +93,7 @@ module.exports = {
 						mds.getNext.bind(mds),
 					],
 					function(err,res) {
-						assert.deepEqual([{item:1},{item:2},{item:3},{item:4},DocumentSource.EOF], res);
+						assert.deepEqual([{item:1},{item:2},{item:3},{item:4},null], res);
 						next();
 					}
 				);
@@ -104,15 +102,7 @@ module.exports = {
 			"should not return matched out documents for sources remaining": function (next){
 				var mds = new MatchDocumentSource({ item: {$gt: 5} }),
 					items = [ 1,2,3,4,5,6,7,8,9 ];
-				mds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (this.calls >= items.length)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item: items[this.calls++]});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(mds, items.map(function(i){return {item:i};}));
 
 				async.series([
 						mds.getNext.bind(mds),
@@ -122,7 +112,7 @@ module.exports = {
 						mds.getNext.bind(mds),
 					],
 					function(err,res) {
-						assert.deepEqual([{item:6},{item:7},{item:8},{item:9},DocumentSource.EOF], res);
+						assert.deepEqual([{item:6},{item:7},{item:8},{item:9},null], res);
 						next();
 					}
 				);
@@ -131,21 +121,13 @@ module.exports = {
 			"should return EOF for no sources remaining": function (next){
 				var mds = new MatchDocumentSource({ item: {$gt: 5} }),
 					items = [ ];
-				mds.source = {
-					calls: 0,
-					getNext:function(cb) {
-						if (this.calls >= items.length)
-							return cb(null,DocumentSource.EOF);
-						return cb(null,{item: items[this.calls++]});
-					},
-					dispose:function() { return true; }
-				};
+				addSource(mds, items.map(function(i){return {item:i};}));
 
 				async.series([
 						mds.getNext.bind(mds),
 					],
 					function(err,res) {
-						assert.deepEqual([DocumentSource.EOF], res);
+						assert.deepEqual([null], res);
 						next();
 					}
 				);

+ 9 - 11
test/lib/pipeline/documentSources/OutDocumentSource.js

@@ -4,12 +4,16 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	OutDocumentSource = require("../../../../lib/pipeline/documentSources/OutDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor");
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 var createOut = function(ctx) {
 	var ds = new OutDocumentSource(ctx);
 	return ds;
 };
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 module.exports = {
 
@@ -43,12 +47,9 @@ module.exports = {
 
 			"should act as passthrough (for now)": function(next) {
 				var ods = OutDocumentSource.createFromJson("test"),
-					cwc = new CursorDocumentSource.CursorWithContext(),
 					l = [{_id:0,a:[{b:1},{b:2}]}, {_id:1,a:[{b:1},{b:1}]} ];
 
-				cwc._cursor = new Cursor( l );
-				var cds = new CursorDocumentSource(cwc);
-				ods.setSource(cds);
+				addSource(ods, l);
 
 				var docs = [], i = 0;
 				async.doWhilst(
@@ -59,10 +60,10 @@ module.exports = {
 						});
 					},
 					function() {
-						return docs[i++] !== DocumentSource.EOF;
+						return docs[i++] !== null;
 					},
 					function(err) {
-						assert.deepEqual([{_id:0,a:[{b:1},{b:2}]}, {_id:1,a:[{b:1},{b:1}]}, DocumentSource.EOF], docs);
+						assert.deepEqual([{_id:0,a:[{b:1},{b:2}]}, {_id:1,a:[{b:1},{b:1}]}, null], docs);
 						next();
 					}
 				);
@@ -83,13 +84,10 @@ module.exports = {
 		"#serialize()":{
 
 			"serialize":function() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{_id: 0, a: 1}, {_id: 1, a: 2}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
 				var title = "CognitiveScientists";
 				var ods = OutDocumentSource.createFromJson(title);
-				ods.setSource(cds);
+				addSource(ods, input);
 				var srcNm = ods.getSourceName();
 				var serialize = {};
 				serialize[srcNm] = title;

+ 9 - 13
test/lib/pipeline/documentSources/RedactDocumentSource.js

@@ -4,7 +4,7 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	RedactDocumentSource = require("../../../../lib/pipeline/documentSources/RedactDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor"),
+	ArrayRunner = require("../../../../lib/query/ArrayRunner"),
 	Expressions = require("../../../../lib/pipeline/expressions");
 
 var exampleRedact = {$cond:{
@@ -15,9 +15,7 @@ var exampleRedact = {$cond:{
 
 var createCursorDocumentSource = function createCursorDocumentSource (input) {
 	if (!input || input.constructor !== Array) throw new Error('invalid');
-	var cwc = new CursorDocumentSource.CursorWithContext();
-	cwc._cursor = new Cursor(input);
-	return new CursorDocumentSource(cwc);
+	return new CursorDocumentSource(null, new ArrayRunner(input), null);
 };
 
 var createRedactDocumentSource = function createRedactDocumentSource (src, expression) {
@@ -55,29 +53,27 @@ module.exports = {
 				var rds = RedactDocumentSource.createFromJson(exampleRedact);
 				rds.setSource({
 					getNext: function getNext(cb) {
-						return cb(null, DocumentSource.EOF);
+						return cb(null, null);
 					}
 				});
 				rds.getNext(function(err, doc) {
-					assert.equal(DocumentSource.EOF, doc);
+					assert.equal(null, doc);
 					next();
 				});
 			},
 
 			"iterator state accessors consistently report the source is exhausted": function assertExhausted() {
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
+				var cds = createCursorDocumentSource(input);
 				var rds = RedactDocumentSource.createFromJson(exampleRedact);
 				rds.setSource(cds);
 				rds.getNext(function(err, actual) {
 					rds.getNext(function(err, actual1) {
-						assert.equal(DocumentSource.EOF, actual1);
+						assert.equal(null, actual1);
 						rds.getNext(function(err, actual2) {
-							assert.equal(DocumentSource.EOF, actual2);
+							assert.equal(null, actual2);
 							rds.getNext(function(err, actual3) {
-								assert.equal(DocumentSource.EOF, actual3);
+								assert.equal(null, actual3);
 							});
 						});
 					});
@@ -235,4 +231,4 @@ module.exports = {
 
 };
 
-if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);

+ 17 - 28
test/lib/pipeline/documentSources/SkipDocumentSource.js

@@ -1,10 +1,15 @@
 "use strict";
 var assert = require("assert"),
 	async = require("async"),
-	Cursor = require("../../../../lib/Cursor"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
+	SkipDocumentSource = require("../../../../lib/pipeline/documentSources/SkipDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	SkipDocumentSource = require("../../../../lib/pipeline/documentSources/SkipDocumentSource");
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
+
+var addSource = function addSource(ds, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	ds.setSource(cds);
+};
 
 
 module.exports = {
@@ -84,19 +89,15 @@ module.exports = {
 
 				var expected = [
 					{val:4},
-					DocumentSource.EOF
+					null
 				];
-
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [
 					{val:1},
 					{val:2},
 					{val:3},
 					{val:4},
 				];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
-				sds.setSource(cds);
+				addSource(sds, input);
 
 				async.series([
 						sds.getNext.bind(sds),
@@ -108,20 +109,17 @@ module.exports = {
 					}
 				);
 				sds.getNext(function(err, actual) {
-					assert.equal(actual, DocumentSource.EOF);
+					assert.equal(actual, null);
 				});
 			},
 			"should return documents if skip count is not hit and there are more documents": function hitSkip(next){
 				var sds = SkipDocumentSource.createFromJson(1);
 
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{val:1},{val:2},{val:3}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
-				sds.setSource(cds);
+				addSource(sds, input);
 
 				sds.getNext(function(err,actual) {
-					assert.notEqual(actual, DocumentSource.EOF);
+					assert.notEqual(actual, null);
 					assert.deepEqual(actual, {val:2});
 					next();
 				});
@@ -130,11 +128,8 @@ module.exports = {
 			"should return the current document source": function currSource(){
 				var sds = SkipDocumentSource.createFromJson(1);
 
-				var cwc = new CursorDocumentSource.CursorWithContext();
 				var input = [{val:1},{val:2},{val:3}];
-				cwc._cursor = new Cursor( input );
-				var cds = new CursorDocumentSource(cwc);
-				sds.setSource(cds);
+				addSource(sds, input);
 
 				sds.getNext(function(err, actual) {
 					assert.deepEqual(actual, { val:2 });
@@ -147,17 +142,11 @@ module.exports = {
 
 				var expected = [
 					{item:4},
-					DocumentSource.EOF
+					null
 				];
-
-				var i = 1;
-				sds.source = {
-					getNext:function(cb){
-						if (i>=5)
-							return cb(null,DocumentSource.EOF);
-						return cb(null, { item:i++ });
-					}
-				};
+				
+				var input = [{item:1},{item:2},{item:3},{item:4}];
+				addSource(sds, input);
 
 				async.series([
 						sds.getNext.bind(sds),

+ 9 - 12
test/lib/pipeline/documentSources/UnwindDocumentSource.js

@@ -4,7 +4,7 @@ var assert = require("assert"),
 	DocumentSource = require("../../../../lib/pipeline/documentSources/DocumentSource"),
 	UnwindDocumentSource = require("../../../../lib/pipeline/documentSources/UnwindDocumentSource"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
-	Cursor = require("../../../../lib/Cursor");
+	ArrayRunner = require("../../../../lib/query/ArrayRunner");
 
 
 //HELPERS
@@ -35,10 +35,7 @@ var createUnwind = function createUnwind(unwind) {
 };
 
 var addSource = function addSource(unwind, data) {
-	var cwc = new CursorDocumentSource.CursorWithContext();
-	cwc._cursor = new Cursor(data);
-	var cds = new CursorDocumentSource(cwc);
-	var pds = new UnwindDocumentSource();
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
 	unwind.setSource(cds);
 };
 
@@ -53,7 +50,7 @@ var checkResults = function checkResults(data, expectedResults, path, next) {
 
 	expectedResults = expectedResults || [];
 
-	expectedResults.push(DocumentSource.EOF);
+	expectedResults.push(null);
 
 	//Load the results from the DocumentSourceUnwind
 	var docs = [], i = 0;
@@ -65,7 +62,7 @@ var checkResults = function checkResults(data, expectedResults, path, next) {
 			});
 		},
 		function() {
-			return docs[i++] !== DocumentSource.EOF;
+			return docs[i++] !== null;
 		},
 		function(err) {
 			assert.deepEqual(expectedResults, docs);
@@ -111,7 +108,7 @@ module.exports = {
 				var pds = createUnwind();
 				addSource(pds, []);
 				pds.getNext(function(err,doc) {
-					assert.strictEqual(doc, DocumentSource.EOF);
+					assert.strictEqual(doc, null);
 					next();
 				});
 			},
@@ -120,7 +117,7 @@ module.exports = {
 				var pds = createUnwind();
 				addSource(pds, [{_id:0, a:[1]}]);
 				pds.getNext(function(err,doc) {
-					assert.notStrictEqual(doc, DocumentSource.EOF);
+					assert.notStrictEqual(doc, null);
 					next();
 				});
 			},
@@ -129,7 +126,7 @@ module.exports = {
 				var pds = createUnwind();
 				addSource(pds, [{_id:0, a:[1,2]}]);
 				pds.getNext(function(err,doc) {
-					assert.notStrictEqual(doc, DocumentSource.EOF);
+					assert.notStrictEqual(doc, null);
 					assert.strictEqual(doc.a, 1);
 					pds.getNext(function(err,doc) {
 						assert.strictEqual(doc.a, 2);
@@ -151,10 +148,10 @@ module.exports = {
 						});
 					},
 					function() {
-						return docs[i++] !== DocumentSource.EOF;
+						return docs[i++] !== null;
 					},
 					function(err) {
-						assert.deepEqual([{_id:0, a:1},{_id:0, a:2},DocumentSource.EOF], docs);
+						assert.deepEqual([{_id:0, a:1},{_id:0, a:2},null], docs);
 						next();
 					}
 				);