浏览代码

EAGLESIX-4483: rm async doc src stuff for testing 2

Kyle P Davis 10 年之前
父节点
当前提交
dc68d8f8af

+ 25 - 26
lib/pipeline/documentSources/ProjectDocumentSource.js

@@ -32,32 +32,31 @@ proto.getSourceName = function getSourceName() {
 	return klass.projectName;
 	return klass.projectName;
 };
 };
 
 
-proto.getNext = function getNext(callback) {
-	if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback");
-
-	var self = this;
-
-	return this.source.getNext(function(err, input) {
-		if (err) return callback(err);
-		if (input === null) return callback(null, null);
-
-		var out = {};
-		/**
-		 * Use the ExpressionObject to create the base result.
-		 *
-		 * If we're excluding fields at the top level, leave out the _id if
-		 * it is found, because we took care of it above.
-		 */
-		try {
-			self._variables.setRoot(input);
-			self.OE.addToDocument(out, input, self._variables);
-			self._variables.clearRoot();
-		} catch (ex){
-			return callback(ex);
-		}
-
-		return callback(null, out);
-	});
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
+
+	var input = this.source.getNext();
+	if (!input)
+		return null;
+
+	// create the result document
+	//var sizeHint = 0
+	var out = {};
+	//Document.copyMetaDataFrom(out);
+
+	/**
+	 * Use the ExpressionObject to create the base result.
+	 *
+	 * If we're excluding fields at the top level, leave out the _id if
+	 * it is found, because we took care of it above.
+	 */
+	this._variables.setRoot(input);
+	this.OE.addToDocument(out, input, this._variables);
+	this._variables.clearRoot();
+
+	//NOTE: DEVIATION: skipped debug bit here
+
+	return out;
 };
 };
 
 
 /**
 /**

+ 12 - 34
lib/pipeline/documentSources/RedactDocumentSource.js

@@ -31,40 +31,18 @@ var DESCEND_VAL = "descend",
 	PRUNE_VAL = "prune",
 	PRUNE_VAL = "prune",
 	KEEP_VAL = "keep";
 	KEEP_VAL = "keep";
 
 
-proto.getNext = function getNext(callback) {
-	var self = this,
-		doc;
-	async.whilst(
-		function() {
-			return doc !== null;
-		},
-		function(cb) {
-			self.source.getNext(function(err, input) {
-				doc = input;
-				if (input === null)
-					return cb();
-				var result;
-				try {
-					self._variables.setRoot(input);
-					self._variables.setValue(self._currentId, input);
-					result = self.redactObject();
-				} catch (ex) {
-					return cb(ex);
-				}
-				if (result !== null)
-					return cb(result); //Using the err argument to pass the result document; this lets us break out without having EOF
-				return cb();
-			});
-		},
-		function(doc) {
-			if (doc){
-				if (doc instanceof Error) return callback(doc);
-				else return callback(null, doc);
-			}
-			return callback(null, null);
-		}
-	);
-	return doc;
+proto.getNext = function getNext() {
+	// if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
+
+	var input, result;
+	while ((input = this.source.getNext())) {
+		this._variables.setRoot(input);
+		this._variables.setValue(this._currentId, input);
+		if ((result = this.redactObject()))
+			return result;
+	}
+
+	return null;
 };
 };
 
 
 proto.redactValue = function redactValue(input) {
 proto.redactValue = function redactValue(input) {

+ 10 - 36
lib/pipeline/documentSources/SkipDocumentSource.js

@@ -57,47 +57,21 @@ proto.coalesce = function coalesce(nextSource) {
 };
 };
 
 
 /**
 /**
- * Get next source.
- *
- * @param callback
- * @returns {*}
+ * Returns the next Documnet if there is one or null if at EOF
+ * @method getNext
  */
  */
-proto.getNext = function getNext(callback) {
-	if (!callback) {
-		throw new Error(this.getSourceName() + " #getNext() requires callback.");
-	}
-
-	if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false) {
-		return callback(new Error("Interrupted"));
-	}
-
-	var self = this,
-		next;
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
 
 
-	if (this.needToSkip) { // May be unnecessary.
+	if (this.needToSkip) {
 		this.needToSkip = false;
 		this.needToSkip = false;
-
-		async.doWhilst(
-			function (cb) {
-				self.source.getNext(function (err, val) {
-					if (err) { return cb(err); }
-
-					++self.count;
-					next = val;
-
-					return cb();
-				});
-			},
-			function() {
-				return self.count < self.skip || next === null;
-			},
-			function (err) {
-				if (err) { return callback(err); }
-			}
-		);
+		for (var i = 0; i < this._skip; i++) {
+			if (!this.source.getNext())
+				return null;
+		}
 	}
 	}
 
 
-	return this.source.getNext(callback);
+	return this.source.getNext();
 };
 };
 
 
 /**
 /**

+ 21 - 79
lib/pipeline/documentSources/SortDocumentSource.js

@@ -72,48 +72,17 @@ proto.coalesce = function coalesce(nextSource) {
 	}
 	}
 };
 };
 
 
-proto.getNext = function getNext(callback) {
-	if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback");
-
-	if (this.expCtx instanceof Object && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false)
-		return callback(new Error("Interrupted"));
-
-	var self = this,
-		out;
-	async.series(
-		[
-			function(next) {
-				if (!self.populated)
-				{
-					self.populate(function(err) {
-						return next(err);
-					});
-				} else {
-					return next();
-				}
-			},
-			function(next) {
-				if (self.docIterator >= self.documents.length) {
-					out = null;
-					return next(null, null);
-				}
-
-				var output = self.documents[self.docIterator++];
-				if (!output || output === null) {
-					out = null;
-					return next(null, null);
-				}
-
-				out = output;
-				return next(null, output);
-			}
-		],
-		function(err, results) {
-			return callback(err, out);
-		}
-	);
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
+
+	if (!this.populated)
+		this.populate();
+
+	var output = this.documents[this.docIterator++];
+	if (!output)
+		return null;
 
 
-	return out;
+	return output;
 };
 };
 
 
 /**
 /**
@@ -184,48 +153,21 @@ proto.makeSortOptions = function makeSortOptions(){
 };
 };
 
 
 
 
-proto.populate = function populate(callback) {
-	if ( this._mergePresorted ){
-		// Skipping stuff about mergeCursors and commandShards
+proto.populate = function populate() {
+	if (this._mergePresorted) {
+		//NOTE: DEVIATION: skipping stuff about mergeCursors and commandShards
 		throw new Error("Merge presorted not implemented.");
 		throw new Error("Merge presorted not implemented.");
 	} else {
 	} else {
-		/* pull everything from the underlying source */
-		var self = this,
-			next;
-
-		async.doWhilst(
-			function (cb) {
-				self.source.getNext(function(err, doc) {
-					next = doc;
-
-					// Don't add EOF; it doesn't sort well.
-					if (doc !== null)
-						self.documents.push(doc);
-
-					return cb();
-				});
-			},
-			function() {
-				return next !== null;
-			},
-			function(err) {
-				try {
-					/* sort the list */
-					self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
-				} catch (ex) {
-					return callback(ex);
-				}
-				/* start the sort iterator */
-				self.docIterator = 0;
-
-				self.populated = true;
-				//self._output.reset(true);
-				return callback();
+		var doc;
+		while((doc = this.source.getNext())) {
+			// Don't add EOF; it doesn't sort well.
+			if (doc !== null)
+				this.documents.push(doc);
 		}
 		}
-		);
-
-
+		// sort the list
+		this.documents.sort(SortDocumentSource.prototype.compare.bind(this));
 	}
 	}
+	this.docIterator = 0;
 	this.populated = true;
 	this.populated = true;
 };
 };
 
 

+ 13 - 53
lib/pipeline/documentSources/UnwindDocumentSource.js

@@ -99,62 +99,22 @@ proto.getSourceName = function getSourceName() {
  * @param callback
  * @param callback
  * @returns {*}
  * @returns {*}
  */
  */
-proto.getNext = function getNext(callback) {
-	if (!callback) {
-		throw new Error(this.getSourceName() + " #getNext() requires callback.");
-	}
-
-	if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false) {
-		return callback(new Error("Interrupted"));
-	}
-
-	var self = this,
-		out,
-		exhausted = false;
-
-	try {
+proto.getNext = function getNext() {
+	if (this.expCtx && this.expCtx.checkForInterrupt) this.expCtx.checkForInterrupt();
+
+	var out = this._unwinder.getNext();
+	while (!out) {
+		// No more elements in array currently being unwound. This will loop if the input
+		// document is missing the unwind field or has an empty array.
+		var input = this.source.getNext();
+		if (!input)
+			return null; // input exhausted
+
+		// Try to extract an output document from the new input document.
+		this._unwinder.resetDocument(input);
 		out = this._unwinder.getNext();
 		out = this._unwinder.getNext();
-	} catch (ex) {
-		return callback(ex);
 	}
 	}
 
 
-	async.until(
-		function () {
-			if (out !== null || exhausted) {
-				return true;
-			}
-
-			return false;
-		},
-		function (cb) {
-			self.source.getNext(function (err, doc) {
-				if (err) {
-					return cb(err);
-				}
-
-				try {
-					if (doc === null) {
-						exhausted = true;
-					} else {
-						self._unwinder.resetDocument(doc);
-						out = self._unwinder.getNext();
-					}
-				} catch (ex) {
-					return cb(ex);
-				}
-
-				return cb();
-			});
-		},
-		function(err) {
-			if (err) {
-				return callback(err);
-			}
-
-			return callback(null, out);
-		}
-	);
-
 	return out;
 	return out;
 };
 };
 
 

+ 2 - 2
lib/query/ArrayRunner.js

@@ -25,7 +25,7 @@ var klass = module.exports = function ArrayRunner(array){
  * @param callback {Function}
  * @param callback {Function}
  */
  */
 proto.getNext = function getNext(callback) {
 proto.getNext = function getNext(callback) {
-	var obj, err;
+	var obj;
 	if (this._state === Runner.RunnerState.RUNNER_ADVANCED) {
 	if (this._state === Runner.RunnerState.RUNNER_ADVANCED) {
 		if (this._position < this._array.length){
 		if (this._position < this._array.length){
 			obj = this._array[this._position++];
 			obj = this._array[this._position++];
@@ -33,7 +33,7 @@ proto.getNext = function getNext(callback) {
 			this._state = Runner.RunnerState.RUNNER_EOF;
 			this._state = Runner.RunnerState.RUNNER_EOF;
 		}
 		}
 	}
 	}
-	return callback(err, obj, this._state);
+	return obj;
 };
 };
 
 
 /**
 /**