浏览代码

Fixes #2085 Added async to DocumentSource.setSource and Pipline.run. Modified a test case for Aggregate.js that should have never worked in the first place

Adam Bell 12 年之前
父节点
当前提交
08866b7d7d

+ 6 - 5
lib/Aggregator.js

@@ -15,11 +15,12 @@ var Aggregator = module.exports = (function(){
 	proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
 	// PROTOTYPE MEMBERS
-	proto.execute = function execute(inputs){
-		var result = {};
-		result.ok = this.pipeline.run(inputs, result);
-		//return result;	//TODO: figure out if we want mongo style result or a simpler one.
-		return result.result;
+	proto.execute = function execute(inputs, callback){
+		this.pipeline.run(inputs,function(err, result){
+			//return result;	//TODO: figuere out if we want mongo style result or a simpler one.
+			return callback(err, result);
+		});
+
 	};
 
 	//Expose these so that mungedb-aggregate can be extended.

+ 7 - 7
lib/commands/PipelineCommand.js

@@ -23,28 +23,28 @@ var PipelineCommand = module.exports = (function(){
 	 * Execute the pipeline.
 	 * @method executePipeline
 	 **/
-	proto.executePipeline = function executePipeline(result, pPipeline, pSource){
-		return pPipeline.run(result, pSource);
+	proto.executePipeline = function executePipeline(pPipeline, pSource, callback){
+		return pPipeline.run(pSource, callback);
 	};
 	
 	/**
 	 * Documents are retrieved until the cursor is exhausted (or another termination condition occurs).
 	 * @method runExecute
 	 **/
-	proto.runExecute = function runExecute(result, db){
+	proto.runExecute = function runExecute(db, callback){
 		var pSource = PipelineD.prepareCursorSource(this.pPipeline, db);
-        return this.executePipeline(result, this.pPipeline, pSource);
+        return this.executePipeline(this.pPipeline, pSource, callback);
 	};
 	
 	/**
 	 * run the command
 	 * @method run
 	 **/
-	proto.run = function run(db, result){
+	proto.run = function run(db, callback){
 		if (!this.pPipeline){
-			return false;
+			return callback(new Error('Pipeline not defined'));
 		}
-        return this.runExecute(result, db);
+        this.runExecute(db, callback);
 	};
 
 	return klass;

+ 3 - 3
lib/index.js

@@ -3,10 +3,10 @@ var Aggregator = require("./Aggregator");
 
 module.exports = (function(){
 	// functional-style interface
-	function aggregate(ops, inputs) {
+	function aggregate(ops, inputs, callback) {
 		var aggregator = new Aggregator(ops);
-		if(inputs)
-			return aggregator.execute(inputs);
+		if(inputs && callback)
+			return aggregator.execute(inputs, callback);
 		return aggregator.execute.bind(aggregator);
 	}
 	// package-style interface

+ 42 - 31
lib/pipeline/Pipeline.js

@@ -1,4 +1,5 @@
 "use strict";
+var async = require("async");
 var Pipeline = module.exports = (function(){
 	// CONSTRUCTOR
 	/**
@@ -37,7 +38,7 @@ var Pipeline = module.exports = (function(){
 	 * @param	{Object} cmdObj the command object sent from the client
 	 * @returns	{Array}	the pipeline, if created, otherwise a NULL reference
 	 **/
-	klass.parseCommand = function parseCommand(cmdObj){
+	klass.parseCommand = function parseCommand(cmdObj, callback){
 		var pipelineInstance = new Pipeline(),
 			pipeline = cmdObj;//munge: skipping the command parsing since all we care about is the pipeline
 		
@@ -137,6 +138,7 @@ var Pipeline = module.exports = (function(){
 			iter.optimize();
 		}
 
+
 		return pipelineInstance;
 	};
 
@@ -147,38 +149,47 @@ var Pipeline = module.exports = (function(){
 	 * @param	{Object}	result	the results of running the pipeline will be stored on this object
 	 * @param	{CursorDocumentSource}	source	the primary document source of the data
 	**/
-	proto.run = function run(result, source){
-		for(var i = 0, l = this.sourceVector.length; i<l; i++) {
-			var temp = this.sourceVector[i];
-			temp.setSource(source);
-			source = temp;
-		}
-		/* source is left pointing at the last source in the chain */
+	proto.run = function run(source, callback){
+		if(!callback)
+			throw new Error("run requires callback");
 
-		/*
-		Iterate through the resulting documents, and add them to the result.
-		We do this even if we're doing an explain, in order to capture the document counts and other stats.
-		However, we don't capture the result documents for explain.
-		*/
-		// the array in which the aggregation results reside
-		// cant use subArrayStart() due to error handling
-		var resultArray = [];
-		for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
-			var document = source.getCurrent();
-			/* add the document to the result set */
-			resultArray.push(document);
-			
-			//Commenting out this assertion for munge.  MUHAHAHA!!!
-			
-			// object will be too large, assert. the extra 1KB is for headers
-//			uassert(16389,
-//					str::stream() << "aggregation result exceeds maximum document size (" << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
-//					resultArray.len() < BSONObjMaxUserSize - 1024);
-		}
+		async.eachSeries(this.sourceVector, function(item, callback){
+				item.setSource(source, function(err, pSource){
+					if(err) return callback(err);
+					source = item;
+					return callback();
+				});
+			},
+			function(err){
+				if(err) return callback(err);
+				var result = {};
+				/* source is left pointing at the last source in the chain */
 
-		result.result = resultArray;
-		
-		return true;
+				/*
+				Iterate through the resulting documents, and add them to the result.
+				We do this even if we're doing an explain, in order to capture the document counts and other stats.
+				However, we don't capture the result documents for explain.
+				*/
+				// the array in which the aggregation results reside
+				// cant use subArrayStart() due to error handling
+				var resultArray = [];
+				for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
+					var document = source.getCurrent();
+					/* add the document to the result set */
+					resultArray.push(document);
+					
+					//Commenting out this assertion for munge.  MUHAHAHA!!!
+					
+					// object will be too large, assert. the extra 1KB is for headers
+		//			uassert(16389,
+		//					str::stream() << "aggregation result exceeds maximum document size (" << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
+		//					resultArray.len() < BSONObjMaxUserSize - 1024);
+				}
+				result.result = resultArray;
+				result.ok = true;
+				return callback(null, result);
+			}
+		);
 	};
 	
 	return klass;

+ 3 - 1
lib/pipeline/documentSources/DocumentSource.js

@@ -145,11 +145,13 @@ public:
      * @method	setSource
      * @param	{DocumentSource}	pSource	the underlying source to use
     **/
-    proto.setSource = function setSource(pTheSource) {
+    proto.setSource = function setSource(pTheSource, callback) {
 		if(this.pSource){
 			throw new Error("It is an error to set the source more than once");
 		}
         this.pSource = pTheSource;
+        if(callback)
+            return callback(null, this.pSource);
     };
     
     

+ 2 - 1
package.json

@@ -24,7 +24,8 @@
   "dependencies": {
     "stream-utils": "*",
     "es6-shim": "*",
-    "sift": "*"
+    "sift": "*",
+    "async":"*"
   },
   "devDependencies": {
     "mocha": "*",

+ 171 - 69
test/lib/aggregate.js

@@ -6,67 +6,117 @@ module.exports = {
 
 	"aggregate": {
 
-		"should be able to use an empty pipeline (no-op)": function(){
+		"should be able to use an empty pipeline (no-op)": function(next){
 			var i = [1, 2, 3],
 				p = [],
 				e = [1, 2, 3],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
 
 
-		"should be able to use a $limit operator": function(){
+		"should be able to use a $limit operator": function(next){
 			var i = [{_id:0}, {_id:1}, {_id:2}, {_id:3}, {_id:4}, {_id:5}],
 				p = [{$limit:2}],
 				e = [{_id:0}, {_id:1}],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
 
-		"should be able to use a $match operator": function(){
+		"should be able to use a $match operator": function(next){
 			var i = [{_id:0, e:1}, {_id:1, e:0}, {_id:2, e:1}, {_id:3, e:0}, {_id:4, e:1}, {_id:5, e:0}],
 				p = [{$match:{e:1}}],
 				e = [{_id:0, e:1}, {_id:2, e:1}, {_id:4, e:1}],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
 		
-		"should be able to use a $skip operator": function(){
+		"should be able to use a $skip operator": function(next){
 			var i = [{_id:0}, {_id:1}, {_id:2}, {_id:3}, {_id:4}, {_id:5}],
 				p = [{$skip:2}, {$skip:1}],	//testing w/ 2 ensures independent state variables
 				e = [{_id:3}, {_id:4}, {_id:5}],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
-		"should be able to use a $skip and then a $limit operator together in the same pipeline": function(){
+		"should be able to use a $skip and then a $limit operator together in the same pipeline": function(next){
 			var i = [{_id:0, e:1}, {_id:1, e:0}, {_id:2, e:1}, {_id:3, e:0}, {_id:4, e:1}, {_id:5, e:0}],
 				p = [{$skip:2}, {$limit:1}],
 				e = [{_id:2, e:1}],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
 
-		"should be able to construct an instance with $unwind operators properly": function(){
+		"should be able to construct an instance with $unwind operators properly": function(next){
 			var i = [
 					{_id:0, nodes:[
 						{one:[11], two:[2,2]},
@@ -84,16 +134,26 @@ module.exports = {
 					{_id:0,nodes:{one:[1,1],two:22}},
 					{_id:1,nodes:{two:22,three:[333]}}
 				],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
 
 
-		"should be able to use a $project operator": function(){
+		"should be able to use a $project operator": function(next){
 			var i = [{_id:0, e:1, f:23}, {_id:2, e:2, g:34}, {_id:4, e:3}],
 				p = [{$project:{
 						e:1, 
@@ -101,16 +161,28 @@ module.exports = {
 						b:{$cond:[{$eq:["$e", 2]}, "two", "not two"]}
 						//TODO: high level test of all other expression operators
 					}}],
-				e = [{_id:0, e:1, b:"not two", a:2}, {_id:2, e:2, b:"two", a:4}, {_id:4, e:3, b:"not two", a:6}],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.deepEqual(aggregater(i), e, "Reuse of aggregater should yield the same results!");
-			assert.deepEqual(aggregate(p, i), e, "Alternate use of aggregate should yield the same results!");
+				e = [{_id:0, e:1, a:2, b:"not two"}, {_id:2, e:2, a:4, b:"two"}, {_id:4, e:3, a:6, b:"not two"}],
+				aggregater = aggregate(p);	
+
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
 		
 		
-		"should be able to use a $project operator to exclude the _id field": function(){
+		"should be able to use a $project operator to exclude the _id field": function(next){
 			var i = [{_id:0, e:1, f:23}, {_id:2, e:2, g:34}, {_id:4, e:3}],
 				p = [{$project:{
 						_id:0,
@@ -118,14 +190,25 @@ module.exports = {
 						//TODO: high level test of all other expression operators
 					}}],
 				e = [{e:1}, {e:2}, {e:3}],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
-			assert.deepEqual(aggregater(i), e, "Reuse of aggregater should yield the same results!");
-			assert.deepEqual(aggregate(p, i), e, "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
 
-		"should be able to construct an instance with $sort operators properly (ascending)": function(){
+		"should be able to construct an instance with $sort operators properly (ascending)": function(next){
 			var i = [
 						{_id:3.14159}, {_id:-273.15},
 						{_id:42}, {_id:11}, {_id:1},
@@ -136,14 +219,23 @@ module.exports = {
 						{_id:null}, {_id:NaN},
 						{_id:-273.15}, {_id:1}, {_id:3.14159}, {_id:11}, {_id:42}
 					],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			//assert.deepEqual(a, e); //does not work with NaN
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		},
-		"should be able to construct an instance with $group operators properly": function(){
+		"should be able to construct an instance with $group operators properly": function(next){
 			var i = [
 						{_id:0, a:1},
 						{_id:0, a:2},
@@ -191,12 +283,22 @@ module.exports = {
 							push_b:["a", "b", "b", "c"]
 						}
 					],
-				aggregater = aggregate(p),
-				a = aggregater(i);
-			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
-			//assert.deepEqual(a, e); //does not work with NaN
-			assert.equal(JSON.stringify(aggregater(i)), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
-			assert.equal(JSON.stringify(aggregate(p, i)), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+				aggregater = aggregate(p);	
+
+			aggregater(i, function(err, results){
+				var a = results.result;
+				assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+				assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+
+				aggregater(i, function(err, results){
+					assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Reuse of aggregater should yield the same results!");
+
+					aggregate(p, i, function(err, results){
+						assert.equal(JSON.stringify(results.result), JSON.stringify(e), "Alternate use of aggregate should yield the same results!");
+						next();
+					});
+				});
+			});
 		}
 
 	}

+ 10 - 5
test/lib/pipeline/Pipeline.js

@@ -102,15 +102,20 @@ module.exports = {
 		"#run": {
 			"should set the parent source for all sources in the pipeline except the first one":function(){
 				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]);
-				p.run({}, []);
-				assert.equal(p.sourceVector[1].pSource, p.sourceVector[0]);
-				assert.equal(p.sourceVector[2].pSource, p.sourceVector[1]);
+				p.run({}, [], function(err, result){
+					assert.equal(p.sourceVector[1].pSource, p.sourceVector[0]);
+					assert.equal(p.sourceVector[2].pSource, p.sourceVector[1]);
+				});
+
 			},
 			"should iterate through sources and return resultant array":function(){
 				var p = Pipeline.parseCommand([{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]),
 					result = {};
-				p.run(result, []);
-				assert.deepEqual(result.result, [5,4,3,2,1,0]);//see the test source for why this should be so
+				p.run(result, [], function(err, result){
+
+					assert.deepEqual(result.result, [5,4,3,2,1,0]);//see the test source for why this should be so
+				});
+
 			}
 		}