Просмотр исходного кода

EAGLESIX-812: initial stab at updating top level API

Phil Murray 11 лет назад
Родитель
Сommit
bd73dd30ed
3 измененных файлов с 83 добавлено и 42 удалено
  1. 78 42
      lib/index.js
  2. 2 0
      lib/pipeline/Pipeline.js
  3. 3 0
      test/lib/aggregate.js

+ 78 - 42
lib/index.js

@@ -1,5 +1,4 @@
 "use strict";
 "use strict";
-
 /**
 /**
  * Used to aggregate `inputs` using a MongoDB-style `pipeline`
  * Used to aggregate `inputs` using a MongoDB-style `pipeline`
  *
  *
@@ -18,52 +17,85 @@
  * @param   callback.err           {Error}                                    The Error if one occurred
  * @param   callback.err           {Error}                                    The Error if one occurred
  * @param   callback.docs          {Array}                                    The resulting documents
  * @param   callback.docs          {Array}                                    The resulting documents
  **/
  **/
-exports = module.exports = function aggregate(pipeline, ctx, inputs, callback) {	// function-style interface; i.e., return the utility function directly as the require
+exports = module.exports = function aggregate(cmdObject, ctx, inputs, callback) {	// function-style interface; i.e., return the utility function directly as the require
 	var DocumentSource = exports.pipeline.documentSources.DocumentSource;
 	var DocumentSource = exports.pipeline.documentSources.DocumentSource;
 	if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
 	if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
-	var pipelineInst = exports.pipeline.Pipeline.parseCommand({
-			pipeline: pipeline
-		}, ctx),
-		aggregator = function aggregator(ctx, inputs, callback) {
+	var pipelineInst;
+	
+	try {
+		//Set up the command Object
+		cmdObject = (cmdObject instanceof Array) ? {pipeline: cmdObject} : cmdObject;
+		if (!cmdObject instanceof Object) throw new Error("cmdObject must be either an Object or an Array");
+		for (var key in exports.cmdDefaults){
+			if (exports.cmdDefaults.hasOwnProperty(key) && cmdObject[key] === undefined){
+				cmdObject[key] = exports.cmdDefaults[key];
+			}
+		}
+		pipelineInst = exports.pipeline.Pipeline.parseCommand(cmdObject, ctx);
+	} catch(ex) {
+		// Error handling is funky since this can be used multiple different ways
+		if (callback){
+			if (inputs) return callback(ex);
+			else {
+				return function aggregator(ctx, inputs, callback) {
+					if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
+					return callback(ex);
+				};
+			}
+		} else {
+			throw ex;
+		}
+	}
+	
+	if (cmdObject.explain){
+		return pipelineInst.writeExplainOps();
+	}
+	
+	var aggregator = function aggregator(ctx, inputs, callback) {
 			if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
 			if (ctx instanceof Array || ctx instanceof DocumentSource) callback = inputs, inputs = ctx, ctx = {};
-			if (!callback) callback = exports.SYNC_CALLBACK;
-			if (!inputs) return callback("arg `inputs` is required");
-
-			// rebuild the pipeline on subsequent calls
-			if (!pipelineInst) {
-				pipelineInst = exports.pipeline.Pipeline.parseCommand({
-					pipeline: pipeline
-				}, ctx);
+			var batchSize = cmdObject.batchSize;
+			if (!callback) {
+				batchSize = Infinity;
+				callback = exports.SYNC_CALLBACK;
 			}
 			}
-
-			// use or build input src
-			var src;
-			if(inputs instanceof DocumentSource){
-				src = inputs;
-			}else{
-				try{
-					ctx.ns = inputs;	//NOTE: use the given `inputs` directly; hacking so that the cursor source will be our inputs instead of the context namespace
-					src = exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
-				}catch(err){
-					return callback(err);
+			if (!inputs) return callback("arg `inputs` is required");
+			
+			try {
+				// rebuild the pipeline on subsequent calls
+				if (!pipelineInst) {
+					pipelineInst = exports.pipeline.Pipeline.parseCommand(cmdObject, ctx);
 				}
 				}
-			}
+				ctx.ns = inputs;	//NOTE: use the given `inputs` directly; hacking so that the cursor source will be our inputs instead of the context namespace
+				exports.pipeline.PipelineD.prepareCursorSource(pipelineInst, ctx);
 
 
-			var runCallback;
-			if (!callback) {
-				runCallback = exports.SYNC_CALLBACK;
-				pipelineInst.SYNC_MODE = true;
-			} else {
-				runCallback = function aggregated(err, results){
-				if(err) return callback(err);
-				return callback(null, results.result);
-		};
+				// run the pipeline against
+				pipelineInst.stitch();
+			} catch(err) {
+				return callback(err);
 			}
 			}
 
 
-			// run the pipeline against
-			pipelineInst.stitch();
-			var results = pipelineInst.run(runCallback);
-			return results ? results.result : undefined;
+			var batch = [];
+			var runCallback = function aggregated(err, document){
+				if (!callback) return;//make sure the callback doesn't get called anymore after an error
+				if(err) {
+					pipelineInst = null;
+					callback(err);
+					callback = undefined;
+					return;
+				}
+				if (document === null){
+					pipelineInst = null;
+					callback(null, batch);
+					callback = undefined;
+					return;
+				}
+				batch.push(document);
+				if (batch.length >= batchSize){
+					return callback(null, batch);
+				}
+			};
+			pipelineInst.run(runCallback);
+			return batch;
 		};
 		};
 	if(inputs) return aggregator(ctx, inputs, callback);
 	if(inputs) return aggregator(ctx, inputs, callback);
 	return aggregator;
 	return aggregator;
@@ -75,17 +107,21 @@ exports.SYNC_CALLBACK = function(err, docs){
 	return docs;
 	return docs;
 };
 };
 
 
+exports.cmdDefaults = {
+	batchSize: 150,
+	explain: false
+};
+
 // package-style interface; i.e., return a function underneath of the require
 // package-style interface; i.e., return a function underneath of the require
 exports.aggregate = exports;
 exports.aggregate = exports;
 
 
 //Expose these so that mungedb-aggregate can be extended.
 //Expose these so that mungedb-aggregate can be extended.
-exports.Cursor = require("./Cursor");
 exports.pipeline = require("./pipeline/");
 exports.pipeline = require("./pipeline/");
 exports.query = require("./query/");
 exports.query = require("./query/");
 
 
 // version info
 // version info
-exports.version = "r2.5.4";
-exports.gitVersion = "ffd52e5f46cf2ba74ba931c78da62d4a7f480d8e";
+exports.version = "r2.6.5";
+exports.gitVersion = "e99d4fcb4279c0279796f237aa92fe3b64560bf6";
 
 
 // error code constants
 // error code constants
 exports.ERRORS = require('./Errors.js');
 exports.ERRORS = require('./Errors.js');

+ 2 - 0
lib/pipeline/Pipeline.js

@@ -34,6 +34,7 @@ klass.EXPLAIN_NAME = "explain";
 klass.FROM_ROUTER_NAME = "fromRouter";
 klass.FROM_ROUTER_NAME = "fromRouter";
 klass.SERVER_PIPELINE_NAME = "serverPipeline";
 klass.SERVER_PIPELINE_NAME = "serverPipeline";
 klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
 klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
+klass.BATCH_SIZE_NAME = "batchSize";
 
 
 klass.stageDesc = {};//attaching this to the class for test cases
 klass.stageDesc = {};//attaching this to the class for test cases
 klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
 klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
@@ -335,6 +336,7 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 		if(fieldName[0] == "$")									continue;
 		if(fieldName[0] == "$")									continue;
 		else if(fieldName == "cursor")							continue;
 		else if(fieldName == "cursor")							continue;
 		else if(fieldName == klass.COMMAND_NAME)				continue;										//look for the aggregation command
 		else if(fieldName == klass.COMMAND_NAME)				continue;										//look for the aggregation command
+		else if(fieldName == klass.BATCH_SIZE_NAME)				continue;
 		else if(fieldName == klass.PIPELINE_NAME)				pipeline = cmdElement;							//check for the pipeline of JSON doc srcs
 		else if(fieldName == klass.PIPELINE_NAME)				pipeline = cmdElement;							//check for the pipeline of JSON doc srcs
 		else if(fieldName == klass.EXPLAIN_NAME)				pipelineInst.explain = cmdElement;				//check for explain option
 		else if(fieldName == klass.EXPLAIN_NAME)				pipelineInst.explain = cmdElement;				//check for explain option
 		else if(fieldName == klass.FROM_ROUTER_NAME)			ctx.inShard = cmdElement;						//if the request came from the router, we're in a shard
 		else if(fieldName == klass.FROM_ROUTER_NAME)			ctx.inShard = cmdElement;						//if the request came from the router, we're in a shard

+ 3 - 0
test/lib/aggregate.js

@@ -74,6 +74,7 @@ module.exports = {
 	"aggregate": {
 	"aggregate": {
 
 
 		"should be able to use an empty pipeline (no-op)": function(next){
 		"should be able to use an empty pipeline (no-op)": function(next){
+			debugger;
 			testAggregate({
 			testAggregate({
 				inputs: [1, 2, 3],
 				inputs: [1, 2, 3],
 				pipeline: [],
 				pipeline: [],
@@ -209,6 +210,8 @@ module.exports = {
 
 
 		"should be able to construct an instance with $group operators properly": function(next){
 		"should be able to construct an instance with $group operators properly": function(next){
 			// NOTE: Test case broken until expression is fixed
 			// NOTE: Test case broken until expression is fixed
+			
+			debugger;
 			testAggregate({
 			testAggregate({
 				inputs: [
 				inputs: [
 					{_id:0, a:1},
 					{_id:0, a:1},