Browse Source

Refs #5122: Update CursorDocumentSource

Chris Sexton 12 years ago
parent
commit
b30f7eed87

+ 93 - 90
lib/pipeline/documentSources/CursorDocumentSource.js

@@ -1,5 +1,9 @@
 "use strict";
 
+// Mimicking max memory size from mongo/db/query/new_find.cpp
+// Need to actually decide some size for this?
+var MAX_BATCH_DOCS = 150;
+
 /**
  * Constructs and returns Documents from the objects produced by a supplied Cursor.
  * An object of this type may only be used by one thread, see SERVER-6123.
@@ -30,6 +34,10 @@ var CursorDocumentSource = module.exports = CursorDocumentSource = function Curs
 	this._projection = null;
 
 	this._cursorWithContext = cursorWithContext;
+	this._curIdx = 0;
+	this._currentBatch = [];
+	this._limit = undefined;
+	this._docsAddedToBatches = 0;
 
 	if (!this._cursorWithContext || !this._cursorWithContext._cursor) throw new Error("CursorDocumentSource requires a valid cursorWithContext");
 
@@ -59,6 +67,41 @@ klass.CursorWithContext = (function (){
  **/
 proto.dispose = function dispose() {
 	this._cursorWithContext = null;
+	this._currentBatch = [];
+	this._curIdx = 0;
+};
+
+proto.getSourceName = function getSourceName() {
+	return "$cursor";
+};
+
+proto.getNext = function getNext(callback) {
+	if (this._currentBatch.length <= this._curIdx) {
+		this.loadBatch();
+
+		if (!this._currentBatch) {
+			if (callback)
+				return callback(null);
+			return null;
+		}
+	}
+
+	// Don't unshift. It's expensiver.
+	var out = this._currentBatch[this._curIdx];
+	this._curIdx++;
+
+	if (callback)
+		return callback(out);
+	return out;
+};
+
+proto.coalesce = function coalesce(nextSource) {
+	if (!this._limit) {
+		this._limit = nextSource;
+		return this._limit;
+	} else {
+		return this._limit.coalesce(nextSource);
+	}
 };
 
 ///**
@@ -78,9 +121,10 @@ proto.dispose = function dispose() {
 // * @method	setQuery
 // * @param	{Object}	pBsonObj	the query to record
 // **/
-//proto.setQuery = function setQuery(pBsonObj) {};
-//
-//
+proto.setQuery = function setQuery(query) {
+	this._query = query;
+};
+
 ///**
 // * Record the sort that was specified for the cursor this wraps, if any.
 // * This should be captured after any optimizations are applied to
@@ -98,7 +142,7 @@ proto.dispose = function dispose() {
  * @method	setProjection
  * @param	{Object}	projection
  **/
-proto.setProjection = function setProjection(projection) {
+proto.setProjection = function setProjection(projection, deps) {
 
 	if (this._projection){
 		throw new Error("projection is already set");
@@ -113,42 +157,10 @@ proto.setProjection = function setProjection(projection) {
 //	this.cursor().fields = this._projection;
 
 	this._projection = projection;  //just for testing
+	this._dependencies = deps;
 };
 
 //----------------virtuals from DocumentSource--------------
-/**
- * Is the source at EOF?
- * @method	eof
- **/
-proto.eof = function eof() {
-	if (!this.current) this.findNext(); // if we haven't gotten the first one yet, do so now
-	return (this.current === null);
-};
-
-/**
- * Advance the state of the DocumentSource so that it will return the next Document.
- * The default implementation returns false, after checking for interrupts.
- * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
- *
- * @method	advance
- * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed.  This default implementation always returns false.
- **/
-proto.advance = function advance() {
-	base.prototype.advance.call(this); // check for interrupts
-	if (!this.current) this.findNext(); // if we haven't gotten the first one yet, do so now
-	this.findNext();
-	return (this.current !== null);
-};
-
-/**
- * some implementations do the equivalent of verify(!eof()) so check eof() first
- * @method	getCurrent
- * @returns	{Document}	the current Document without advancing
- **/
-proto.getCurrent = function getCurrent() {
-	if (!this.current) this.findNext(); // if we haven't gotten the first one yet, do so now
-	return this.current;
-};
 
 /**
  * Set the underlying source this source should use to get Documents
@@ -165,74 +177,65 @@ proto.getCurrent = function getCurrent() {
  * @param source   {DocumentSource}  the underlying source to use
  * @param callback  {Function}        a `mungedb-aggregate`-specific extension to the API to half-way support reading from async sources
  **/
-proto.setSource = function setSource(theSource, callback) {
+proto.setSource = function setSource(theSource) {
 	if (theSource) throw new Error("CursorDocumentSource doesn't take a source"); //TODO: This needs to put back without the if once async is fully and properly supported
-	if (callback) return setTimeout(callback, 0);
 };
 
-/**
- * Create an object that represents the document source.  The object
- * will have a single field whose name is the source's name.  This
- * will be used by the default implementation of addToBsonArray()
- * to add this object to a pipeline being represented in BSON.
- *
- * @method	sourceToJson
- * @param	{Object} pBuilder	BSONObjBuilder: a blank object builder to write to
- * @param	{Boolean}	explain	create explain output
- **/
-proto.sourceToJson = function sourceToJson(pBuilder, explain) {
-	/* this has no analog in the BSON world, so only allow it for explain */
-	//if (explain){
-	////we are not currently supporting explain in mungedb-aggregate
-	//}
+proto.serialize = function serialize(explain) {
+	if (!explain)
+		return this.value();
+
+	if (!this._cursorWithContext)
+		throw new Error("code 17135; Cursor deleted.");
+
+	return this.value(); // big stuff here
+};
+
+// LimitDocumentSource has the setLimit function which trickles down to any documentsource
+proto.getLimit = function getLimit() {
+	return this._limit;
 };
 
 //----------------private--------------
 
-proto.findNext = function findNext(){
+//proto.chunkMgr = function chunkMgr(){};
 
-	if ( !this._cursorWithContext ) {
-		this.current = null;
-		return;
-	}
+//proto.canUseCoveredIndex = function canUseCoveredIndex(){};
 
-	for( ; this.cursor().ok(); this.cursor().advance() ) {
+//proto.yieldSometimes = function yieldSometimes(){};
 
-		//yieldSometimes();
-//		if ( !this.cursor().ok() ) {
-//			// The cursor was exhausted during the yield.
-//			break;
-//		}
+proto.loadBatch = function loadBatch() {
+	var nDocs = 0,
+		cursor = this._cursorWithContext ? this._cursorWithContext._cursor : null;
 
-//		if ( !this.cursor().currentMatches() || this.cursor().currentIsDup() )
-//			continue;
+	if (!cursor)
+		return this.dispose();
 
+	for(;cursor.ok(); cursor.advance()) {
+		if (!cursor.ok())
+			break;
 
-		// grab the matching document
-		var documentObj;
-//		if (this.canUseCoveredIndex()) { ...  Dont need any of this, I think
+		// these methods do not exist
+		// if (!cursor.currentMatches() || cursor.currentIsDup())
+		// continue;
 
-		documentObj = this.cursor().current();
-		this.current = documentObj;
-		this.cursor().advance();
-		return;
-	}
+		var next = cursor.current();
+		this._currentBatch.push(this._projection ? this.documentFromBsonDeps(next, this._dependencies) : next);
 
-	// If we got here, there aren't any more documents.
-	// The CursorWithContext (and its read lock) must be released, see SERVER-6123.
-	this.dispose();
-	this.current = null;
-};
+		if (this._limit) {
+			if (++this._docsAddedToBatches == this._limit.getLimit())
+				break;
+
+			if (this._docsAddedToBatches >= this._limit.getLimit()) {
+				throw new Error("added documents to the batch over limit size");
+			}
+		}
 
-proto.cursor = function cursor(){
-	if( this._cursorWithContext && this._cursorWithContext._cursor){
-		return this._cursorWithContext._cursor;
+		// Mongo uses number of bytes, but that doesn't make sense here. Yield when nDocs is over a threshold
+		if (nDocs > MAX_BATCH_DOCS) {
+			this._curIdx++; // advance the deque
+			nDocs++;
+			return;
+		}
 	}
-	throw new Error("cursor not defined");
 };
-
-//proto.chunkMgr = function chunkMgr(){};
-
-//proto.canUseCoveredIndex = function canUseCoveredIndex(){};
-
-//proto.yieldSometimes = function yieldSometimes(){};

+ 47 - 54
test/lib/pipeline/documentSources/CursorDocumentSource.js

@@ -1,5 +1,6 @@
 "use strict";
 var assert = require("assert"),
+	async = require("async"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	Cursor = require("../../../../lib/Cursor");
 
@@ -17,79 +18,71 @@ module.exports = {
 			"should get a accept a CursorWithContext and set it internally": function(){
 				var cwc = new CursorDocumentSource.CursorWithContext();
 				cwc._cursor = new Cursor( [] );
-				
+
 				var cds = new CursorDocumentSource(cwc);
-				
+
 				assert.ok(cds._cursorWithContext);
 			}
 		},
 
-		"#eof": {
-			"should return true if the cursor is empty": function(){
+		"#getNext": {
+			"should return the current cursor value sync": function(){
 				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [] );
-				
+				cwc._cursor = new Cursor( [1,2,3,4] );
+
 				var cds = new CursorDocumentSource(cwc);
-				
-				assert.equal(cds.eof(), true);
+				assert.equal(cds.getNext(), 1);
+				assert.equal(cds.getNext(), 2);
+				assert.equal(cds.getNext(), 3);
+				assert.equal(cds.getNext(), 4);
+				assert.equal(cds.getNext(), undefined);
 			},
-			"should return false if the cursor is non-empty": function(){
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3] );
-				
-				var cds = new CursorDocumentSource(cwc);
-				
-				assert.equal(cds.eof(), false);
-			}
-		},
-		"#advance": {
-			"should return true if the cursor was advanced": function(){
+			"should return the current cursor value async": function(next){
 				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3] );
-				
+				cwc._cursor = new Cursor( [1,2,3,4] );
+
 				var cds = new CursorDocumentSource(cwc);
-				
-				assert.equal(cds.advance(), true);
+				cds.getNext(function(val) {
+					assert.equal(val, 1);
+					cds.getNext(function(val) {
+						assert.equal(val, 2);
+						cds.getNext(function(val) {
+							assert.equal(val, 3);
+							cds.getNext(function(val) {
+								assert.equal(val, 4);
+								cds.getNext(function(val) {
+									assert.equal(val, undefined);
+									return next();
+								});
+							});
+						});
+					});
+				});
 			},
-			"should return false if the cursor is empty": function(){
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3] );
-				
-				var cds = new CursorDocumentSource(cwc);
-				cds.advance();cds.advance();cds.advance();
-				assert.equal(cds.advance(), false);
-			}
-		},
-		"#getCurrent": {
-			"should return the current cursor value": function(){
-				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3,4] );
-				
+			"should return values past the batch limit": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext(),
+					n = 0,
+					arr = Array.apply(0, new Array(200)).map(function() { return n++; });
+				cwc._cursor = new Cursor( arr );
+
 				var cds = new CursorDocumentSource(cwc);
-				assert.equal(cds.getCurrent(), 1);
-				cds.advance();
-				assert.equal(cds.getCurrent(), 2);
-				cds.advance();
-				assert.equal(cds.getCurrent(), 3);
-				cds.advance();
-				assert.equal(cds.getCurrent(), 4);
-				cds.advance();
-				assert.equal(cds.getCurrent(), undefined);
-			}
+				arr.forEach(function(v) {
+					assert.equal(cds.getNext(), v);
+				});
+				assert.equal(cds.getNext(), undefined);
+			},
 		},
 		"#dispose": {
 			"should empty the current cursor": function(){
 				var cwc = new CursorDocumentSource.CursorWithContext();
 				cwc._cursor = new Cursor( [1,2,3] );
-				
+
 				var cds = new CursorDocumentSource(cwc);
-				assert.equal(cds.getCurrent(), 1);
-				cds.advance();
-				assert.equal(cds.getCurrent(), 2);
-				
+				assert.equal(cds.getNext(), 1);
+				assert.equal(cds.getNext(), 2);
+
 				cds.dispose();
-				assert.equal(cds.advance(), false);
-				assert.equal(cds.eof(), true);
+				assert.equal(cds.getNext(), undefined);
 			}
 		}