Browse Source

EAGLESIX-3505 Add AggregationCursor

Chris Sexton 11 years ago
parent
commit
5119a43f50
3 changed files with 40 additions and 98 deletions
  1. 25 0
      lib/AggregationCursor.js
  2. 7 32
      lib/index.js
  3. 8 66
      test/lib/aggregate_test.js

+ 25 - 0
lib/AggregationCursor.js

@@ -0,0 +1,25 @@
+var AggregationCursor = module.exports = function(pipelineInst) {
+	this.pipelineInst = pipelineInst;
+}, klass = AggregationCursor, proto = klass.prototype;
+
+proto.toArray = function(callback) {
+	var batch = [];
+	this.pipelineInst.run(function(err, doc) {
+		if (err && callback) return callback(err), callback = undefined;
+		if (err && !callback) throw err;
+		if (doc === null && callback) return callback(null, batch), callback = undefined;
+		else if (doc !== null) batch.push(doc);
+	});
+	if (!callback) return batch;
+};
+
+proto.forEach = function(iterator, callback) {
+	this.pipelineInst.run(function(err, doc) {
+		if (err || doc === null) return callback(err);
+		iterator(doc);
+	});
+};
+
+proto.each = function(callback) {
+	this.pipelineInst.run(callback);
+};

+ 7 - 32
lib/index.js

@@ -1,4 +1,5 @@
 "use strict";
+var AggregationCursor = require("./AggregationCursor");
 /**
  * Used to aggregate `inputs` using a MongoDB-style `pipeline`
  *
@@ -57,12 +58,6 @@ exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback
 
 	var aggregator = function aggregator(ctx, inputs, callback) {
 			if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
-			var batchSize = pipelineObj.batchSize;
-
-			if (!callback) {
-				batchSize = Infinity;
-				callback = exports.SYNC_CALLBACK;
-			}
 			if (!inputs) return callback("arg `inputs` is required");
 
 			try {
@@ -77,37 +72,17 @@ exports = module.exports = function aggregate(pipelineObj, ctx, inputs, callback
 				return callback(err);
 			}
 
-			var batch = [];
-			pipelineInst.run(function aggregated(err, doc){
-				pipelineInst = null;
-				if (!callback) return;
-				if (err) return callback(err), callback = undefined;
-				if (doc === null) {
-					callback(null, batch);
-					if (batchSize !== Infinity) {
-						callback(null, null); //this is to tell the caller that that we are done aggregating
-					}
-					callback = undefined; //we are officially done. make sure the callback doesn't get called anymore
-					return;
-				}
-				batch.push(doc);
-				if (batch.length >= batchSize) {
-					callback(null, batch);
-					batch = [];
-					return;
-				}
-			});
-			return batch;
+			var tmpInst= pipelineInst;
+			pipelineInst = null;
+
+			if (!callback) return new AggregationCursor(tmpInst);
+			else return new AggregationCursor(tmpInst).toArray(callback);
 		};
 	if (inputs) return aggregator(ctx, inputs, callback);
 	return aggregator;
 };
 
-// sync callback for aggregate if none was provided
-exports.SYNC_CALLBACK = function(err, docs){
-	if (err) throw err;
-	return docs;
-};
+exports.AggregationCursor = AggregationCursor;
 
 exports.cmdDefaults = {
 	batchSize: 150,

+ 8 - 66
test/lib/aggregate_test.js

@@ -11,25 +11,25 @@ function testAggregate(opts){
 
 	if (!opts.asyncOnly){
 		// SYNC: test one-off usage
-		var results = aggregate(opts.pipeline, opts.inputs);
+		var results = aggregate(opts.pipeline, opts.inputs).toArray();
 		assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
 
 		// SYNC: test one-off usage with context
-		results = aggregate(opts.pipeline, {hi: "there"}, opts.inputs);
+		results = aggregate(opts.pipeline, {hi: "there"}, opts.inputs).toArray();
 		assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
 
 		// SYNC: test use with context
 		var aggregator = aggregate(opts.pipeline, {hi: "there"});
-		results = aggregator(opts.inputs);
+		results = aggregator(opts.inputs).toArray();
 		assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
 
 		// SYNC: test reusable aggregator functionality
 		aggregator = aggregate(opts.pipeline);
-		results = aggregator(opts.inputs);
+		results = aggregator(opts.inputs).toArray();
 		assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
 
 		// SYNC: test that it is actually reusable
-		results = aggregator(opts.inputs);
+		results = aggregator(opts.inputs).toArray();
 		assert.equal(JSON.stringify(results), JSON.stringify(opts.expected), "should allow sync aggregator reuse");
 	}
 	// ASYNC: test one-off usage
@@ -72,40 +72,6 @@ function testAggregate(opts){
 	});
 }
 
-function testBatches(opts){
-	var inputs = [],
-		actual = [],
-		eachExpected = [],
-		expected = [];
-
-	for(var i = 0; i < opts.documents; i++){
-		inputs.push({a:i});
-		eachExpected.push({foo:i});
-		if (eachExpected.length % opts.batchSize === 0){
-			expected.push(eachExpected);
-			eachExpected = [];
-		}
-	}
-	expected.push(eachExpected);
-	aggregate({
-			batchSize:opts.batchSize,
-			pipeline: [
-			{$project:{
-				foo: "$a"
-			}}
-		]},
-		inputs,
-		function(err, results){
-			assert.ifError(err);
-			if (results) {
-				actual.push(results);
-			} else {
-				assert.deepEqual(actual, expected);
-				opts.next();
-			}
-		});
-}
-
 exports.aggregate = {
 
 	"should be able to use an empty pipeline (no-op)": function(next){
@@ -420,30 +386,6 @@ exports.aggregate = {
 		});
 	},
 
-	"should be able to handle a small arrays in batches": function(next){
-		testBatches({
-			documents: 5,
-			batchSize: 100,
-			next: next
-		});
-	},
-
-	"should be able to handle an array equal to the batch size": function(next){
-		testBatches({
-			documents: 100,
-			batchSize: 100,
-			next: next
-		});
-	},
-
-	"should be able to handle a large array in batches": function(next){
-		testBatches({
-			documents: 10000,
-			batchSize: 100,
-			next: next
-		});
-	},
-
 	"should be able to explain an empty pipeline": function(){
 		var pipeline = [],
 			expected = [],
@@ -540,15 +482,15 @@ exports.aggregate = {
 
 	"should throw pipeline errors if called sync-ly": function(){
 		assert.throws(function(){
-			aggregate([{"$project":{"sum":{"$add":["$foo", "$bar"]}}}], [{"foo":1, "bar":"baz"}]);
+			aggregate([{"$project":{"sum":{"$add":["$foo", "$bar"]}}}], [{"foo":1, "bar":"baz"}]).toArray();
 		});
 
 		var agg = aggregate([{"$project":{"sum":{"$add":["$foo", "$bar"]}}}]);
 		assert.throws(function(){
-			agg([{"foo":1, "bar":"baz"}]);
+			agg([{"foo":1, "bar":"baz"}]).toArray();
 		});
 		assert.doesNotThrow(function(){
-			agg([{"foo":1, "bar":2}]);
+			agg([{"foo":1, "bar":2}]).toArray();
 		});
 	},