Browse Source

EAGLESIX-4483: rm async doc src stuff for testing

Kyle P Davis 10 years ago
parent
commit
23a4fe2c62

+ 4 - 4
lib/AggregationCursor.js

@@ -21,11 +21,11 @@ var AggregationCursor = module.exports = function(pipelineInst) {
 proto.toArray = function(callback) {
 	var batch = [],
 		isAsync = typeof callback === "function";
+	if(!isAsync) return this.pipelineInst.run().result;
 	this.pipelineInst.run(isAsync, function(err, doc) {
-		if (err && callback) return callback(err), callback = undefined;
-		if (err && !callback) throw err;
-		if (doc === null && callback) return callback(null, batch), callback = undefined;
-		else if (doc !== null) batch.push(doc);
+		if (err) if (callback) return callback(err); else throw err;
+		if (doc !== null) return batch.push(doc);
+		if (callback) return callback(null, batch);
 	});
 	if (!callback) return batch;
 };

+ 11 - 31
lib/pipeline/Pipeline.js

@@ -407,41 +407,21 @@ proto.stitch = function stitch() {
 /**
  * Run the pipeline
  * @method run
- * @param [isAsync] {Boolean} whether or not to use setImmediate to force async calls (to avoid stack overflows)
- * @param callback {Function} gets called once for each document result from the pipeline
  */
-proto.run = function run(isAsync, callback) {
-	if (typeof isAsync === "function") callback = isAsync, isAsync = true;
-
+proto.run = function run() {
 	// should not get here in the explain case
 	if (this.explain) return callback(new Error("Assertion error: don't run pipeline in explain mode"));
 
-	var nextDoc = null,
-		finalSource = this.sources[this.sources.length - 1];
-
-	async.doWhilst(
-		function iterator(next) {
-			return finalSource.getNext(function(err, doc) {
-				if (isAsync) {
-					async.setImmediate(function() {
-						nextDoc = doc;
-						callback(err, nextDoc);
-						next(err);
-					});
-				} else { // sync mode; only for small sets, stack overflow on large sets
-					nextDoc = doc;
-					callback(err, nextDoc);
-					next(err);
-				}
-			});
-		},
-		function test() {
-			return nextDoc !== null;
-		},
-		function done(err) {
-			//nothing to do here
-		}
-	);
+	var resultArray = [],
+		finalSource = this.sources[this.sources.length - 1],
+		next;
+
+	while ((next = finalSource.getNext())) {
+		resultArray.push(next);
+	}
+	return {
+		result: resultArray
+	};
 };
 
 /**

+ 50 - 75
lib/pipeline/documentSources/CursorDocumentSource.js

@@ -44,27 +44,22 @@ proto.getSourceName = function getSourceName() {
 	return "$cursor";
 };
 
-proto.getNext = function getNext(callback) {
-	if (this.expCtx && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt()) {
-		return callback(new Error("Interrupted"));
-	}
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
 
-	var self = this;
-	if (self._currentBatchIndex >= self._currentBatch.length) {
-		self._currentBatchIndex = 0;
-		self._currentBatch = [];
-		return self._loadBatch(function(err){
-			if (err) return callback(err);
+	if (this._currentBatchIndex >= this._currentBatch.length) {
+		this._currentBatchIndex = 0;
+		this._currentBatch = [];
 
-			if (self._currentBatch.length === 0) // exhausted the cursor
-				return callback(null, null);
+		this._loadBatch();
 
-			var out = self._currentBatch[self._currentBatchIndex];
-			self._currentBatchIndex++;
-			return callback(null, out);
-		});
+		if (this._currentBatch.length === 0)
+			return null;
 	}
-	return callback(null, self._currentBatch[self._currentBatchIndex++]);
+
+	var out = this._currentBatch[this._currentBatchIndex];
+	this._currentBatchIndex++;
+	return out;
 };
 
 proto.dispose = function dispose() {
@@ -74,73 +69,53 @@ proto.dispose = function dispose() {
 	this._currentBatch = [];
 };
 
-proto._loadBatch = function _loadBatch(callback) {
+proto._loadBatch = function _loadBatch() {
 	if (!this._runner) {
 		this.dispose();
-		return callback();
+		return;
 	}
 
 	this._runner.restoreState();
 
-	var self = this,
-		whileShouldBreak = false, // mimic while loop break in async land
-		whileShouldReturn = false; // mimic while loop return in async land
-	return async.whilst(
-		function test() {
-			return !whileShouldBreak && !whileShouldReturn;
-		},
-		function(next) {
-			return self._runner.getNext(function(err, obj, state){
-				if (err) return next(err);
-
-				//NOTE: DEVIATION FROM MONGO: they check state in the loop condition we check it inside (due to async stuff)
-				if (state !== Runner.RunnerState.RUNNER_ADVANCED) return whileShouldBreak = true, next();
-
-				if (self._dependencies) {
-					self._currentBatch.push(self._dependencies.extractFields(obj));
-				} else {
-					self._currentBatch.push(obj);
-				}
-
-				if (self._limit) {
-					if (++self._docsAddedToBatches === self._limit.getLimit()) {
-						return whileShouldBreak = true, next();
-					}
-					if (self._docsAddedToBatches > self._limit.getLimit()) return next(new Error("Assertion failure: end of limit"));
-				}
-
-				var memUsageDocs = self._currentBatch.length;
-				if (memUsageDocs >= klass.MaxDocumentsToReturnToClientAtOnce) {
-					// End self batch and prepare Runner for yielding.
-					self._runner.saveState();
-					return whileShouldReturn = true, next();
-				}
-
-				return next();
-			});
-		},
-		function(err) {
-			if (whileShouldReturn) {
-				return async.nextTick(function() {
-					callback(err);
-				});
-			}
+	var obj;
+	while ((obj = this._runner.getNext()) && this._runner._state === Runner.RunnerState.RUNNER_ADVANCED) {
+
+		if (this._dependencies) {
+			this._currentBatch.push(this._dependencies.extractFields(obj));
+		} else {
+			this._currentBatch.push(obj);
+		}
 
-			// If we got here, there won't be any more documents, so destroy the runner. Can't use
-			// dispose since we want to keep the _currentBatch.
-			self._runner = undefined;
-
-			//NOTE: DEVIATION FROM MONGO: to ensure that the callstack does not get too large if the Runner does things syncronously
-			if (self._firstRun || !self._currentBatch.length) {
-				self._firstRun = false;
-				callback(err);
-			} else {
-				return async.nextTick(function() {
-					callback(err);
-				});
+		if (this._limit) {
+			if (++this._docsAddedToBatches === this._limit.getLimit()) {
+				break;
 			}
+			if (this._docsAddedToBatches > this._limit.getLimit()) return new Error("Assertion failure: end of limit");
+		}
+
+		var memUsageDocs = this._currentBatch.length;
+
+		if (memUsageDocs >= klass.MaxDocumentsToReturnToClientAtOnce) {
+			// End self batch and prepare Runner for yielding.
+			this._runner.saveState();
+			return;
 		}
-	);
+	}
+	var state = this._runner._state;
+
+	// If we got here, there won't be any more documents, so destroy the runner. Can't use
+	// dispose since we want to keep the _currentBatch.
+	this._runner = undefined;
+
+	if (state === Runner.RunnerState.RUNNER_DEAD)
+		throw new Error("collection or index disappeared when cursor yielded; uassert code 16028");
+
+	if (state === Runner.RunnerState.RUNNER_ERROR)
+		throw new Error("cursor encountered an error; uassert code 17285");
+
+	if (state !== Runner.RunnerState.RUNNER_EOF && state !== Runner.RunnerState.RUNNER_ADVANCED){
+		throw new Error("Unexpected return from Runner::getNext " + JSON.stringify(state) + "; massert code 17286");
+	}
 };
 
 proto.setSource = function setSource(theSource) {

+ 1 - 22
lib/pipeline/documentSources/DocumentSource.js

@@ -73,7 +73,7 @@ proto.getPipelineStep = function getPipelineStep() {
  * @method	getNext
  * @returns	{Document}	the current Document without advancing
  **/
-proto.getNext = function getNext(callback) {
+proto.getNext = function getNext() {
 	throw new Error("not implemented");
 };
 
@@ -183,24 +183,3 @@ proto.serializeToArray = function serializeToArray(array, explain) {
 		array.push(entry);
 	}
 };
-
-/**
- * A function compatible as a getNext for document sources.
- * Does nothing except pass the documents through. To use,
- * Attach this function on a DocumentSource prototype.
- *
- * @method GET_NEXT_PASS_THROUGH
- * @param callback {Function}
- * @param callback.err {Error} An error or falsey
- * @param callback.doc {Object} The source's next object or null
- **/
-klass.GET_NEXT_PASS_THROUGH = function GET_NEXT_PASS_THROUGH(callback) {
-	if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback");
-
-	var out;
-	this.source.getNext(function(err, doc) {
-		out = doc;
-		return callback(err, doc);
-	});
-	return out; // For the sync people in da house
-};

+ 3 - 1
lib/pipeline/documentSources/GeoNearDocumentSource.js

@@ -32,7 +32,9 @@ proto.getSourceName = function() {
 	return klass.geoNearName;
 };
 
-proto.getNext = DocumentSource.GET_NEXT_PASS_THROUGH;
+proto.getNext = function getNext() {
+	throw new Error("Not implemented yet");
+};
 
 proto.setSource = function(docSource) {
 	throw new Error("code 16602; $geoNear is only allowed as the first pipeline stage");

+ 68 - 103
lib/pipeline/documentSources/GroupDocumentSource.js

@@ -83,51 +83,29 @@ proto.getSourceName = function getSourceName() {
  * @method getNext
  * @return {Object}
  **/
-proto.getNext = function getNext(callback) {
-	if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback.");
-	if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false)
-		return callback(new Error("Interrupted"));
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
 
-	var self = this;
-	async.series([
-		function(next) {
-			if (!self.populated)
-				self.populate(function(err) {
-					return next(err);
-				});
-			else
-				return next();
-		},
-		function(next) {
-			// NOTE: Skipped the spilled functionality
-			if (self.spilled) {
-				throw new Error("Spilled is not implemented.");
-			} else {
-				if(self.currentGroupsKeysIndex === self.groupsKeys.length) {
-					return next(null, null);
-				}
+	if (!this.populated)
+		this.populate();
 
-				var out;
-				try {
-					var id = self.originalGroupsKeys[self.currentGroupsKeysIndex],
-						stringifiedId = self.groupsKeys[self.currentGroupsKeysIndex],
-						accumulators = self.groups[stringifiedId];
+	// NOTE: Skipped the spilled functionality
+	if (this.spilled) {
+		throw new Error("Spilled is not implemented.");
+	} else {
+		if (this.currentGroupsKeysIndex === this.groupsKeys.length)
+			return null;
 
-					out = self.makeDocument(id, accumulators, self.expCtx.inShard);
+		var id = this.originalGroupsKeys[this.currentGroupsKeysIndex],
+			stringifiedId = this.groupsKeys[this.currentGroupsKeysIndex],
+			accumulators = this.groups[stringifiedId],
+			out = this.makeDocument(id, accumulators, this.expCtx.inShard);
 
-					if(++self.currentGroupsKeysIndex === self.groupsKeys.length) {
-						self.dispose();
-					}
-				} catch (ex) {
-					return next(ex);
-				}
+		if (++this.currentGroupsKeysIndex === this.groupsKeys.length)
+			this.dispose();
 
-				return next(null, out);
-			}
-		}
-	], function(err, results) {
-		callback(err, results[1]);
-	});
+		return out;
+	}
 };
 
 /**
@@ -289,80 +267,67 @@ klass.createFromJson = function createFromJson(elem, expCtx) { //jshint maxcompl
  * Populates the GroupDocumentSource by grouping all of the input documents at once.
  *
  * @method populate
- * @param callback {Function} Required. callback(err) when done populating.
  * @async
  **/
-proto.populate = function populate(callback) {
+proto.populate = function populate() {
 	var numAccumulators = this.accumulatorFactories.length;
-	// NOTE: this is not in mongo, does it belong here?
-	if(numAccumulators !== this.expressions.length) {
-		callback(new Error("Must have equal number of accumulators and expressions"));
-	}
+	if (numAccumulators !== this.expressions.length) throw new Error("dassert: Must have equal number of accumulators and expressions");
 
-	var input,
-		self = this;
-	async.whilst(
-		function() {
-			return input !== null;
-		},
-		function(cb) {
-			self.source.getNext(function(err, doc) {
-				if(err) return cb(err);
-				if(doc === null) {
-					input = doc;
-					return cb(); //Need to stop now, no new input
-				}
-				try {
-					input = doc;
-					self.variables.setRoot(input);
-
-					/* get the _id value */
-					var id = self.computeId(self.variables);
-
-					if(undefined === id) id = null;
-
-					var groupKey = JSON.stringify(id),
-						group = self.groups[groupKey];
-
-					if(!group) {
-						self.originalGroupsKeys.push(id);
-						self.groupsKeys.push(groupKey);
-						group = [];
-						self.groups[groupKey] = group;
-						// Add the accumulators
-						for(var afi = 0; afi<self.accumulatorFactories.length; afi++) {
-							group.push(new self.accumulatorFactories[afi]());
-						}
-					}
-					//NOTE: Skipped memory usage stuff for case when group already existed
+	// SKIPPED: DEVIATION FROM MONGO: spill and mem usage stuff
+	// pushed to on spill()
+	//var sortedFiles;
+	//int memoryUsageBytes = 0;
 
-					if(numAccumulators !== group.length) {
-						throw new Error("Group must have one of each accumulator");
-					}
+	// This loop consumes all input from pSource and buckets it based on pIdExpression.
+	var input;
+	while ((input = this.source.getNext())) {
+		// SKIPPED: DEVIATION FROM MONGO: mem usage stuff
 
-					//NOTE: passing the input to each accumulator
-					for(var gi=0; gi<group.length; gi++) {
-						group[gi].process(self.expressions[gi].evaluate(self.variables, self.doingMerge));
-					}
+		this.variables.setRoot(input);
 
-					// We are done with the ROOT document so release it.
-					self.variables.clearRoot();
+		// get the _id value
+		var id = this.computeId(this.variables);
 
-					//NOTE: Skipped the part about sorted files
-				} catch (ex) {
-					return cb(ex);
-				}
-				return cb();
-			});
-		},
-		function(err) {
-			if(err) return callback(err);
+		// treat missing values the same as NULL SERVER-4674
+		if (id === undefined)
+			id = null;
 
-			self.populated = true;
+		// Look for the _id value in the map; if it's not there, add a
+		// new entry with a blank accumulator.
+		var groupKey = JSON.stringify(id),
+			group = this.groups[groupKey];
 
-			return callback();
+		if (!group) {
+			this.originalGroupsKeys.push(id);
+			this.groupsKeys.push(groupKey);
+			group = [];
+			this.groups[groupKey] = group;
+
+			// Add the accumulators
+			for (var afi = 0, afl = this.accumulatorFactories.length; afi < afl; afi++) {
+				group.push(new this.accumulatorFactories[afi]());
+			}
+		}
+		// else {
+		// NOTE: Skipped memory usage stuff for case when group already existed
+		// }
+
+		// tickle all the accumulators for the group we found
+		if (numAccumulators !== group.length) throw new Error("Group must have one of each accumulator");
+		for (var gi = 0, gl = group.length; gi < gl; gi++) {
+			group[gi].process(this.expressions[gi].evaluate(this.variables, this.doingMerge));
 		}
-	);
+
+		// We are done with the ROOT document so release it.
+		this.variables.clearRoot();
+
+		//NOTE: DEVIATION: skipped dev stuff
+
+	}
+
+	//NOTE: DEVIATION: skipped the part about sorted files
+
+	this.populated = true;
 };
 
 /**

+ 7 - 12
lib/pipeline/documentSources/LimitDocumentSource.js

@@ -44,24 +44,19 @@ proto.coalesce = function coalesce(nextSource) {
 	return true;
 };
 
-/* Returns the execution of the callback against
-* the next documentSource
-* @param {function} callback
-* @return {bool} indicating end of document reached
-*/
-proto.getNext = function getNext(callback) {
-	if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback");
-
-	if (this.expCtx instanceof Object && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false)
-		return callback(new Error("Interrupted"));
+/**
+ * Returns the next Documnet if there is one or null if at EOF
+ * @method getNext
+ */
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
 
 	if (++this.count > this.limit) {
 		this.source.dispose();
-		callback(null, null);
 		return null;
 	}
 
-	return this.source.getNext(callback);
+	return this.source.getNext();
 };
 
 /**

+ 14 - 41
lib/pipeline/documentSources/MatchDocumentSource.js

@@ -36,56 +36,29 @@ proto.getSourceName = function getSourceName(){
 	return klass.matchName;
 };
 
-proto.getNext = function getNext(callback) {
-	if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback");
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
 
-	if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false) {
-		return callback(new Error("Interrupted"));
+	// The user facing error should have been generated earlier.
+	if (this._isTextQuery) throw new Error("Should never call getNext on a $match stage with $text clause; massert code 17309");
+
+	var next;
+	while((next = this.source.getNext())) {
+		if (this.matcher.matches(next))
+			return next;
 	}
 
-	var self = this,
-		next,
-		test = function test(doc) {
-			return self.matcher.matches(doc);
-		},
-		makeReturn = function makeReturn(doc) {
-			if(doc !== null && test(doc)) { // Passes the match criteria
-				return doc;
-			} else if(doc === null){ // Got EOF
-				return doc;
-			}
-			return undefined; // Didn't match, but not EOF
-		};
-	async.doUntil(
-		function(cb) {
-			self.source.getNext(function(err, doc) {
-				if(err) return cb(err);
-				try {
-					if (makeReturn(doc) !== undefined) {
-						next = doc;
-					}
-				} catch (ex) {
-					return cb(ex);
-				}
-				return cb();
-			});
-		},
-		function() {
-			var foundDoc = (next === null || next !== undefined);
-			return foundDoc; //keep going until doc is found
-		},
-		function(err) {
-			return callback(err, next);
-		}
-	);
-	return next;
+	// Nothing matched
+	return null;
 };
 
 proto.coalesce = function coalesce(nextSource) {
 	if (!(nextSource instanceof MatchDocumentSource))
 		return false;
 
-	this.matcher = new matcher({"$and": [this.getQuery(), nextSource.getQuery()]});
+	this.matcher = new matcher({
+		$and: [this.getQuery(), nextSource.getQuery()]
+	});
 
 	return true;
 };

+ 3 - 1
lib/pipeline/documentSources/OutDocumentSource.js

@@ -24,7 +24,9 @@ proto.getSourceName = function() {
 	return klass.outName;
 };
 
-proto.getNext = DocumentSource.GET_NEXT_PASS_THROUGH;
+proto.getNext = function() {
+	throw new Error("not implemented yet");
+};
 
 proto.serialize = function(explain) {
 	var doc = {},