Browse Source

EAGLESIX-2656: fix runner reset too early in CursorDocumentSource

Kyle P Davis 11 years ago
parent
commit
52586cbc5c
1 changed files with 32 additions and 28 deletions
  1. 32 28
      lib/pipeline/documentSources/CursorDocumentSource.js

+ 32 - 28
lib/pipeline/documentSources/CursorDocumentSource.js

@@ -83,55 +83,59 @@ proto._loadBatch = function _loadBatch(callback) {
 	this._runner.restoreState();
 
 	var self = this,
-		whileBreak = false,		// since we are in an async loop instead of a normal while loop, need to mimic the
-		whileReturn = false;	// functionality.  These flags are similar to saying 'break' or 'return' from inside the loop
+		whileShouldBreak = false, // mimic while loop break in async land
+		whileShouldReturn = false; // mimic while loop return in async land
 	return async.whilst(
 		function test(){
-			return !whileBreak && !whileReturn;
+			return !whileShouldBreak && !whileShouldReturn;
 		},
 		function(next) {
 			return self._runner.getNext(function(err, obj, state){
 				if (err) return next(err);
-				if (state === Runner.RunnerState.RUNNER_ADVANCED) {
-					if (self._dependencies) {
-						self._currentBatch.push(self._dependencies.extractFields(obj));
-					} else {
-						self._currentBatch.push(obj);
-					}
 
-					if (self._limit) {
-						if (++self._docsAddedToBatches === self._limit.getLimit()) {
-							whileBreak = true;
-							return next();
-						}
-						//this was originally a 'verify' in the mongo code
-						if (self._docsAddedToBatches > self._limit.getLimit()){
-							return next(new Error("documents collected past the end of the limit"));
-						}
-					}
+				//NOTE: DEVIATION FROM MONGO: they check state in the loop condition we check it inside (due to async stuff)
+				if (state !== Runner.RunnerState.RUNNER_ADVANCED) return whileShouldBreak = true, next();
 
-					if (self._currentBatch.length >= klass.MaxDocumentsToReturnToClientAtOnce) {
-						// End self batch and prepare Runner for yielding.
-						self._runner.saveState();
-						whileReturn = true;
-					}
+				if (self._dependencies) {
+					self._currentBatch.push(self._dependencies.extractFields(obj));
 				} else {
-					whileBreak = true;
+					self._currentBatch.push(obj);
+				}
+
+				if (self._limit) {
+					if (++self._docsAddedToBatches === self._limit.getLimit()) {
+						return whileShouldBreak = true, next();
+					}
+					if (self._docsAddedToBatches > self._limit.getLimit()) return next(new Error("Assertion failure: end of limit"));
 				}
+
+				var memUsageDocs = self._currentBatch.length;
+				if (memUsageDocs >= klass.MaxDocumentsToReturnToClientAtOnce) {
+					// End self batch and prepare Runner for yielding.
+					self._runner.saveState();
+					return whileShouldReturn = true, next();
+				}
+
 				return next();
 			});
 		},
 		function(err){
-			if (!whileReturn) {
-				self._runner = undefined;
+			if (whileShouldReturn){
+				return setImmediate(function() {
+					callback(err);
+				});
 			}
 
+			// If we got here, there won't be any more documents, so destroy the runner. Can't use
+			// dispose since we want to keep the _currentBatch.
+			self._runner = undefined;
+
 			//NOTE: DEVIATION FROM MONGO: to ensure that the callstack does not get too large if the Runner does things syncronously
 			if (self._firstRun || !self._currentBatch.length){
 				self._firstRun = false;
 				callback(err);
 			} else {
-				setTimeout(function(){
+				return setImmediate(function(){
 					callback(err);
 				});
 			}