Browse Source

Refs #3042: Revert changes moved to another branch.

Revert "Refs #5122: Make serialize (explain) not crashy"
This reverts commit 27f68aceebb9e888b269e7477d0133c1c1c0a4c1.
Revert "Refs #5122: Update CursorDocumentSource"
This reverts commit b30f7eed87f476ccfcc1e72fcda1680e8c8e310c.
Revert "Refs #5121: Update DocumentSource"
This reverts commit 70982ba61ebe4fed649fae67e1ee373395fdc99c.
Chris Sexton 11 years ago
parent
commit
946e6984d7

+ 90 - 101
lib/pipeline/documentSources/CursorDocumentSource.js

@@ -1,9 +1,5 @@
 "use strict";
 
-// Mimicking max memory size from mongo/db/query/new_find.cpp
-// Need to actually decide some size for this?
-var MAX_BATCH_DOCS = 150;
-
 /**
  * Constructs and returns Documents from the objects produced by a supplied Cursor.
  * An object of this type may only be used by one thread, see SERVER-6123.
@@ -34,10 +30,6 @@ var CursorDocumentSource = module.exports = CursorDocumentSource = function Curs
 	this._projection = null;
 
 	this._cursorWithContext = cursorWithContext;
-	this._curIdx = 0;
-	this._currentBatch = [];
-	this._limit = undefined;
-	this._docsAddedToBatches = 0;
 
 	if (!this._cursorWithContext || !this._cursorWithContext._cursor) throw new Error("CursorDocumentSource requires a valid cursorWithContext");
 
@@ -67,41 +59,6 @@ klass.CursorWithContext = (function (){
  **/
 proto.dispose = function dispose() {
 	this._cursorWithContext = null;
-	this._currentBatch = [];
-	this._curIdx = 0;
-};
-
-proto.getSourceName = function getSourceName() {
-	return "$cursor";
-};
-
-proto.getNext = function getNext(callback) {
-	if (this._currentBatch.length <= this._curIdx) {
-		this.loadBatch();
-
-		if (!this._currentBatch) {
-			if (callback)
-				return callback(null);
-			return null;
-		}
-	}
-
-	// Don't unshift. It's expensiver.
-	var out = this._currentBatch[this._curIdx];
-	this._curIdx++;
-
-	if (callback)
-		return callback(out);
-	return out;
-};
-
-proto.coalesce = function coalesce(nextSource) {
-	if (!this._limit) {
-		this._limit = nextSource;
-		return this._limit;
-	} else {
-		return this._limit.coalesce(nextSource);
-	}
 };
 
 ///**
@@ -121,10 +78,9 @@ proto.coalesce = function coalesce(nextSource) {
 // * @method	setQuery
 // * @param	{Object}	pBsonObj	the query to record
 // **/
-proto.setQuery = function setQuery(query) {
-	this._query = query;
-};
-
+//proto.setQuery = function setQuery(pBsonObj) {};
+//
+//
 ///**
 // * Record the sort that was specified for the cursor this wraps, if any.
 // * This should be captured after any optimizations are applied to
@@ -142,7 +98,7 @@ proto.setQuery = function setQuery(query) {
  * @method	setProjection
  * @param	{Object}	projection
  **/
-proto.setProjection = function setProjection(projection, deps) {
+proto.setProjection = function setProjection(projection) {
 
 	if (this._projection){
 		throw new Error("projection is already set");
@@ -157,10 +113,42 @@ proto.setProjection = function setProjection(projection, deps) {
 //	this.cursor().fields = this._projection;
 
 	this._projection = projection;  //just for testing
-	this._dependencies = deps;
 };
 
 //----------------virtuals from DocumentSource--------------
+/**
+ * Is the source at EOF?
+ * @method	eof
+ **/
+proto.eof = function eof() {
+	if (!this.current) this.findNext(); // if we haven't gotten the first one yet, do so now
+	return (this.current === null);
+};
+
+/**
+ * Advance the state of the DocumentSource so that it will return the next Document.
+ * This override first calls the base implementation (the hook intended for interrupt checks),
+ * then calls findNext() to move on to the next document from the underlying cursor.
+ *
+ * @method	advance
+ * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed; false once findNext() leaves no current document.
+ **/
+proto.advance = function advance() {
+	base.prototype.advance.call(this); // check for interrupts
+	if (!this.current) this.findNext(); // if we haven't gotten the first one yet, do so now
+	this.findNext();
+	return (this.current !== null);
+};
+
+/**
+ * some implementations do the equivalent of verify(!eof()) so check eof() first
+ * @method	getCurrent
+ * @returns	{Document}	the current Document without advancing
+ **/
+proto.getCurrent = function getCurrent() {
+	if (!this.current) this.findNext(); // if we haven't gotten the first one yet, do so now
+	return this.current;
+};
 
 /**
  * Set the underlying source this source should use to get Documents
@@ -177,73 +165,74 @@ proto.setProjection = function setProjection(projection, deps) {
  * @param source   {DocumentSource}  the underlying source to use
  * @param callback  {Function}        a `mungedb-aggregate`-specific extension to the API to half-way support reading from async sources
  **/
-proto.setSource = function setSource(theSource) {
+proto.setSource = function setSource(theSource, callback) {
 	if (theSource) throw new Error("CursorDocumentSource doesn't take a source"); //TODO: This needs to put back without the if once async is fully and properly supported
+	if (callback) return setTimeout(callback, 0);
 };
 
-proto.serialize = function serialize(explain) {
-	if (!explain)
-		return null;
-
-	if (!this._cursorWithContext)
-		throw new Error("code 17135; Cursor deleted.");
-
-	// A stab at what mongo wants
-	return {
-		query: this._query,
-		sort: this._sort ? this._sort : null,
-		limit: this._limit ? this._limit : null,
-		fields: this._projection ? this._projection : null,
-		indexonly: false,
-		cursorType: this._cursorWithContext ? "cursor" : null
-	};
-};
-
-// LimitDocumentSource has the setLimit function which trickles down to any documentsource
-proto.getLimit = function getLimit() {
-	return this._limit;
+/**
+ * Create an object that represents the document source.  The object
+ * will have a single field whose name is the source's name.  This
+ * will be used by the default implementation of addToJsonArray()
+ * to add this object to a pipeline being represented in JSON.
+ *
+ * @method	sourceToJson
+ * @param	{Object} pBuilder	BSONObjBuilder: a blank object builder to write to
+ * @param	{Boolean}	explain	create explain output
+ **/
+proto.sourceToJson = function sourceToJson(pBuilder, explain) {
+	/* this has no analog in the BSON world, so only allow it for explain */
+	//if (explain){
+	////we are not currently supporting explain in mungedb-aggregate
+	//}
 };
 
 //----------------private--------------
 
-//proto.chunkMgr = function chunkMgr(){};
-
-//proto.canUseCoveredIndex = function canUseCoveredIndex(){};
+proto.findNext = function findNext(){
 
-//proto.yieldSometimes = function yieldSometimes(){};
+	if ( !this._cursorWithContext ) {
+		this.current = null;
+		return;
+	}
 
-proto.loadBatch = function loadBatch() {
-	var nDocs = 0,
-		cursor = this._cursorWithContext ? this._cursorWithContext._cursor : null;
+	for( ; this.cursor().ok(); this.cursor().advance() ) {
 
-	if (!cursor)
-		return this.dispose();
+		//yieldSometimes();
+//		if ( !this.cursor().ok() ) {
+//			// The cursor was exhausted during the yield.
+//			break;
+//		}
 
-	for(;cursor.ok(); cursor.advance()) {
-		if (!cursor.ok())
-			break;
+//		if ( !this.cursor().currentMatches() || this.cursor().currentIsDup() )
+//			continue;
 
-		// these methods do not exist
-		// if (!cursor.currentMatches() || cursor.currentIsDup())
-		// continue;
 
-		var next = cursor.current();
-		this._currentBatch.push(this._projection ? this.documentFromBsonDeps(next, this._dependencies) : next);
+		// grab the matching document
+		var documentObj;
+//		if (this.canUseCoveredIndex()) { ...  Dont need any of this, I think
 
-		if (this._limit) {
-			if (++this._docsAddedToBatches == this._limit.getLimit())
-				break;
+		documentObj = this.cursor().current();
+		this.current = documentObj;
+		this.cursor().advance();
+		return;
+	}
 
-			if (this._docsAddedToBatches >= this._limit.getLimit()) {
-				throw new Error("added documents to the batch over limit size");
-			}
-		}
+	// If we got here, there aren't any more documents.
+	// The CursorWithContext (and its read lock) must be released, see SERVER-6123.
+	this.dispose();
+	this.current = null;
+};
 
-		// Mongo uses number of bytes, but that doesn't make sense here. Yield when nDocs is over a threshold
-		if (nDocs > MAX_BATCH_DOCS) {
-			this._curIdx++; // advance the deque
-			nDocs++;
-			return;
-		}
+proto.cursor = function cursor(){
+	if( this._cursorWithContext && this._cursorWithContext._cursor){
+		return this._cursorWithContext._cursor;
 	}
+	throw new Error("cursor not defined");
 };
+
+//proto.chunkMgr = function chunkMgr(){};
+
+//proto.canUseCoveredIndex = function canUseCoveredIndex(){};
+
+//proto.yieldSometimes = function yieldSometimes(){};

+ 59 - 112
lib/pipeline/documentSources/DocumentSource.js

@@ -66,12 +66,33 @@ proto.getPipelineStep = function getPipelineStep() {
 	return this.step;
 };
 
+/**
+ * Is the source at EOF?
+ * @method	eof
+ **/
+proto.eof = function eof() {
+	throw new Error("not implemented");
+};
+
+/**
+ * Advance the state of the DocumentSource so that it will return the next Document.
+ * The default implementation returns false, after checking for interrupts.
+ * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
+ *
+ * @method	advance
+ * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed.  This default implementation always returns false.
+ **/
+proto.advance = function advance() {
+	//pExpCtx->checkForInterrupt(); // might not return
+	return false;
+};
+
 /**
  * some implementations do the equivalent of verify(!eof()) so check eof() first
- * @method	getNExt
+ * @method	getCurrent
  * @returns	{Document}	the current Document without advancing
  **/
-proto.getNext = function getNext(callback) {
+proto.getCurrent = function getCurrent() {
 	throw new Error("not implemented");
 };
 
@@ -115,9 +136,10 @@ proto.getSourceName = function getSourceName() {
  * @method	setSource
  * @param	{DocumentSource}	source	the underlying source to use
  **/
-proto.setSource = function setSource(theSource) {
+proto.setSource = function setSource(theSource, callback) {
 	if (this.source) throw new Error("It is an error to set the source more than once");
 	this.source = theSource;
+	if (callback) return setTimeout(callback, 0);
 };
 
 /**
@@ -196,117 +218,42 @@ klass.depsToProjection = function depsToProjection(deps) {
 	return bb;
 };
 
-proto._serialize = function _serialize(explain) {
-	throw new Error("not implemented");
-};
-
-proto.serializeToArray = function serializeToArray(array, explain) {
-	var entry = this.serialize(explain);
-	if (!entry) {
-		array.push(entry);
-	}
-};
-
-proto.depsToProjection = function depsToProjection(deps) {
-	var needId = false,
-		last,
-		bb = {};
-
-	for (var i = 0; i < deps.length; i++) {
-		var it = deps[i];
-		if (it.starsWith('_id') && (it.length === 3 || it[3] === '.')) {
-			needId = true;
-			continue;
-		} else {
-			if (!last && it.starsWith(last)) {
-                // we are including a parent of *it so we don't need to include this field
-                // explicitly. In fact, due to SERVER-6527 if we included this field, the parent
-                // wouldn't be fully included.  This logic relies on on set iterators going in
-                // lexicographic order so that a string is always directly before of all fields it
-                // prefixes.
-				continue;
-			}
-			last = it + '.';
-			bb[it] = 1;
-		}
-	}
-
-	if (needId) // we are explicit either way
-		bb._id = 1;
-	else
-		bb._id = 0;
-
-	return JSON.stringy(bb);
+/**
+ * Add the DocumentSource to the array builder.
+ * The default implementation calls sourceToJson() in order to
+ * convert the inner part of the object which will be added to the
+ * array being built here.
+ *
+ * @method	addToJsonArray
+ * @param	{Array} pBuilder	JSONArrayBuilder: the array builder to add the operation to.
+ * @param	{Boolean}	explain	create explain output
+ * @returns	{undefined}	nothing; the result of sourceToJson() is pushed onto pBuilder
+ **/
+proto.addToJsonArray = function addToJsonArray(pBuilder, explain) {
+	pBuilder.push(this.sourceToJson({}, explain));
 };
 
-// Taken as a whole, these three functions should produce the same output document given the
-// same deps set as mongo::Projection::transform would on the output of depsToProjection. The
-// only exceptions are that we correctly handle the case where no fields are needed and we don't
-// need to work around the above mentioned bug with subfields of _id (SERVER-7502). This is
-// tested in a DEV block in DocumentSourceCursor::findNext().
-//
-// Output from this function is input for the next two
-//
-// ParsedDeps is a simple recursive look-up table. For each field in a ParsedDeps:
-//      If the value has type==Bool, the whole field is needed
-//      If the value has type==Object, the fields in the subobject are needed
-//      All other fields should be missing which means not needed
-// DocumentSource::ParsedDeps DocumentSource::parseDeps(const set<string>& deps) {
-//  MutableDocument md;
-proto.parseDeps = function parseDeps(deps) {
-	var doc,
-		last;
-
-	for (var i = 0; i < deps.length; i++) {
-		var it = deps[i];
-		if (!last && it.startsWith(last)) {
-			// we are including a parent of *it so we don't need to include this field
-			// explicitly. In fact, if we included this field, the parent wouldn't be fully
-			// included.  This logic relies on on set iterators going in lexicographic order so
-			// that a string is always directly before of all fields it prefixes.
-			continue;
-		}
-		last = it + '.';
-		Object.setAtPath(doc, it, true);
-	}
-
-	return doc;
+/**
+ * Create an object that represents the document source.  The object
+ * will have a single field whose name is the source's name.  This
+ * will be used by the default implementation of addToJsonArray()
+ * to add this object to a pipeline being represented in JSON.
+ *
+ * @method	sourceToJson
+ * @param	{Object} pBuilder	JSONObjBuilder: a blank object builder to write to
+ * @param	{Boolean}	explain	create explain output
+ **/
+proto.sourceToJson = function sourceToJson(pBuilder, explain) {
+	throw new Error("not implemented");
 };
 
-proto.documentFromBsonWithDeps = function documentFromBsonWithDeps(obj, deps) {
-	var doc = {},
-		self = this;
-
-	var arrayHelper = function(field, isNeeded) {
-		return field.map(function(f) {
-			self.documentFromBsonWithDeps(f, isNeeded);
-		});
-	};
-
-	for (var i = 0; i < obj.keys().length; i++) {
-		var fieldName = obj.keys()[i],
-			field = obj[fieldName],
-			isNeeded = deps[fieldName];
-
-		if (!isNeeded)
-			continue;
-
-		if (typeof isNeeded === Boolean) {
-			Object.setAtPath(doc, fieldName, field);
-			continue;
-		}
-
-		if (!(isNeeded instanceof Object))
-			throw new Error("dependencies missing for object");
-
-		if (field instanceof Array)
-			Object.setAtPath(doc, fieldName, arrayHelper(field, isNeeded));
-
-		if (field instanceof Object) { // everything is...
-			var sub = this.documentFromBsonWithDeps(field, isNeeded);
-			Object.setAtPath(doc, fieldName, sub);
-		}
-	}
-
-	return doc;
+/**
+ * Convert the DocumentSource instance to its JSON Object representation; used by the standard JSON.stringify() function
+ * @method toJSON
+ * @return {Object} the Object that was passed to sourceToJson(), representing the DocumentSource
+ **/
+proto.toJSON = function toJSON(){
+	var obj = {};
+	this.sourceToJson(obj);
+	return obj;
 };

+ 54 - 47
test/lib/pipeline/documentSources/CursorDocumentSource.js

@@ -1,6 +1,5 @@
 "use strict";
 var assert = require("assert"),
-	async = require("async"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	Cursor = require("../../../../lib/Cursor");
 
@@ -18,71 +17,79 @@ module.exports = {
 			"should get a accept a CursorWithContext and set it internally": function(){
 				var cwc = new CursorDocumentSource.CursorWithContext();
 				cwc._cursor = new Cursor( [] );
-
+				
 				var cds = new CursorDocumentSource(cwc);
-
+				
 				assert.ok(cds._cursorWithContext);
 			}
 		},
 
-		"#getNext": {
-			"should return the current cursor value sync": function(){
+		"#eof": {
+			"should return true if the cursor is empty": function(){
 				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3,4] );
-
+				cwc._cursor = new Cursor( [] );
+				
 				var cds = new CursorDocumentSource(cwc);
-				assert.equal(cds.getNext(), 1);
-				assert.equal(cds.getNext(), 2);
-				assert.equal(cds.getNext(), 3);
-				assert.equal(cds.getNext(), 4);
-				assert.equal(cds.getNext(), undefined);
+				
+				assert.equal(cds.eof(), true);
 			},
-			"should return the current cursor value async": function(next){
+			"should return false if the cursor is non-empty": function(){
 				var cwc = new CursorDocumentSource.CursorWithContext();
-				cwc._cursor = new Cursor( [1,2,3,4] );
-
+				cwc._cursor = new Cursor( [1,2,3] );
+				
 				var cds = new CursorDocumentSource(cwc);
-				cds.getNext(function(val) {
-					assert.equal(val, 1);
-					cds.getNext(function(val) {
-						assert.equal(val, 2);
-						cds.getNext(function(val) {
-							assert.equal(val, 3);
-							cds.getNext(function(val) {
-								assert.equal(val, 4);
-								cds.getNext(function(val) {
-									assert.equal(val, undefined);
-									return next();
-								});
-							});
-						});
-					});
-				});
-			},
-			"should return values past the batch limit": function(){
-				var cwc = new CursorDocumentSource.CursorWithContext(),
-					n = 0,
-					arr = Array.apply(0, new Array(200)).map(function() { return n++; });
-				cwc._cursor = new Cursor( arr );
-
+				
+				assert.equal(cds.eof(), false);
+			}
+		},
+		"#advance": {
+			"should return true if the cursor was advanced": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3] );
+				
 				var cds = new CursorDocumentSource(cwc);
-				arr.forEach(function(v) {
-					assert.equal(cds.getNext(), v);
-				});
-				assert.equal(cds.getNext(), undefined);
+				
+				assert.equal(cds.advance(), true);
 			},
+			"should return false if the cursor is empty": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				cds.advance();cds.advance();cds.advance();
+				assert.equal(cds.advance(), false);
+			}
+		},
+		"#getCurrent": {
+			"should return the current cursor value": function(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [1,2,3,4] );
+				
+				var cds = new CursorDocumentSource(cwc);
+				assert.equal(cds.getCurrent(), 1);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 2);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 3);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 4);
+				cds.advance();
+				assert.equal(cds.getCurrent(), undefined);
+			}
 		},
 		"#dispose": {
 			"should empty the current cursor": function(){
 				var cwc = new CursorDocumentSource.CursorWithContext();
 				cwc._cursor = new Cursor( [1,2,3] );
-
+				
 				var cds = new CursorDocumentSource(cwc);
-				assert.equal(cds.getNext(), 1);
-				assert.equal(cds.getNext(), 2);
-
+				assert.equal(cds.getCurrent(), 1);
+				cds.advance();
+				assert.equal(cds.getCurrent(), 2);
+				
 				cds.dispose();
-				assert.equal(cds.getNext(), undefined);
+				assert.equal(cds.advance(), false);
+				assert.equal(cds.eof(), true);
 			}
 		}