浏览代码

EAGLESIX-5075: fix stack overflow in async due to some unintentionally deeply recursive sync code

tl;dr -- the async lib does not make you async, it allows you to be... stop putting your sync code in my async library ;-)
Kyle P Davis 10 年之前
父节点
当前提交
526ddf29fa
共有 3 个文件被更改,包括 31 次插入14 次删除
  1. 3 2
      lib/AggregationCursor.js
  2. 15 4
      lib/pipeline/Pipeline.js
  3. 13 8
      test/lib/aggregate_test.js

+ 3 - 2
lib/AggregationCursor.js

@@ -19,8 +19,9 @@ var AggregationCursor = module.exports = function(pipelineInst) {
  * @return {Array} documents (when no callback is provided)
  * @return {Array} documents (when no callback is provided)
  */
  */
 proto.toArray = function(callback) {
 proto.toArray = function(callback) {
-	var batch = [];
-	this.pipelineInst.run(function(err, doc) {
+	var batch = [],
+		isAsync = typeof callback === "function";
+	this.pipelineInst.run(isAsync, function(err, doc) {
 		if (err && callback) return callback(err), callback = undefined;
 		if (err && callback) return callback(err), callback = undefined;
 		if (err && !callback) throw err;
 		if (err && !callback) throw err;
 		if (doc === null && callback) return callback(null, batch), callback = undefined;
 		if (doc === null && callback) return callback(null, batch), callback = undefined;

+ 15 - 4
lib/pipeline/Pipeline.js

@@ -407,9 +407,12 @@ proto.stitch = function stitch() {
 /**
 /**
  * Run the pipeline
  * Run the pipeline
  * @method run
  * @method run
+ * @param [isAsync] {Boolean} whether or not to use setImmediate to force async calls (to avoid stack overflows)
  * @param callback {Function} gets called once for each document result from the pipeline
  * @param callback {Function} gets called once for each document result from the pipeline
  */
  */
-proto.run = function run(callback) {
+proto.run = function run(isAsync, callback) {
+	if (typeof isAsync === "function") callback = isAsync, isAsync = true;
+
 	// should not get here in the explain case
 	// should not get here in the explain case
 	if (this.explain) return callback(new Error("Assertion error: don't run pipeline in explain mode"));
 	if (this.explain) return callback(new Error("Assertion error: don't run pipeline in explain mode"));
 
 
@@ -419,9 +422,17 @@ proto.run = function run(callback) {
 	async.doWhilst(
 	async.doWhilst(
 		function iterator(next) {
 		function iterator(next) {
 			return finalSource.getNext(function(err, doc) {
 			return finalSource.getNext(function(err, doc) {
-				nextDoc = doc;
-				callback(err, nextDoc);
-				next(err);
+				if (isAsync) {
+					async.setImmediate(function() {
+						nextDoc = doc;
+						callback(err, nextDoc);
+						next(err);
+					});
+				} else { // sync mode; only for small sets, stack overflow on large sets
+					nextDoc = doc;
+					callback(err, nextDoc);
+					next(err);
+				}
 			});
 			});
 		},
 		},
 		function test() {
 		function test() {

+ 13 - 8
test/lib/aggregate_test.js

@@ -4,8 +4,6 @@ if (!module.parent) return require.cache[__filename] = 0, (new(require("mocha"))
 var assert = require("assert"),
 var assert = require("assert"),
 	aggregate = require("../../");
 	aggregate = require("../../");
 
 
-aggregate.cmdDefaults.batchSize = Infinity;
-
 // Utility to test the various use cases of `aggregate`
 // Utility to test the various use cases of `aggregate`
 function testAggregate(opts){
 function testAggregate(opts){
 
 
@@ -366,20 +364,27 @@ exports.aggregate = {
 		});
 		});
 	},
 	},
 
 
-	"should be able to handle a large array of inputs": function(next){
+	"should be able to handle a large array of inputs (async only, sync will crash)": function(next) {
+		this.timeout(10000);
 		var inputs = [],
 		var inputs = [],
 			expected = [];
 			expected = [];
 		for(var i = 0; i < 10000; i++){
 		for(var i = 0; i < 10000; i++){
-			inputs.push({a:i});
-			expected.push({foo:i});
+			inputs.push({a:i, b:[i,i,i,i,i]});
+			expected.push({foo:i, bar:i});
+			expected.push({foo:i, bar:i});
+			expected.push({foo:i, bar:i});
+			expected.push({foo:i, bar:i});
+			expected.push({foo:i, bar:i});
 		}
 		}
 		testAggregate({
 		testAggregate({
 			asyncOnly: true,
 			asyncOnly: true,
 			inputs: inputs,
 			inputs: inputs,
 			pipeline: [
 			pipeline: [
+				{$unwind:"$b"},
 				{$project:{
 				{$project:{
-					foo: "$a"
-				}}
+					foo: "$a",
+					bar: "$b",
+				}},
 			],
 			],
 			expected: expected,
 			expected: expected,
 			next: next
 			next: next
@@ -533,6 +538,6 @@ exports.aggregate = {
 				done();
 				done();
 			};
 			};
 		aggregate([{$limit:2}], docs).forEach(iterator, callback);
 		aggregate([{$limit:2}], docs).forEach(iterator, callback);
-	}
+	},
 
 
 };
 };