فهرست منبع

Refs #2449: sync mode; callback is now optional

Kyle P Davis 12 سال پیش
والد
کامیت
8578b8e89e
4 فایل‌های تغییر یافته به همراه 113 افزوده شده و 52 حذف شده
  1. 4 5
      README.md
  2. 11 4
      lib/index.js
  3. 82 40
      lib/pipeline/Pipeline.js
  4. 16 3
      test/lib/aggregate.js

+ 4 - 5
README.md

@@ -108,9 +108,8 @@ Here is a list of global items that I know about that may need to be done in the
   * Make sure that all of the pure `virtual`s (i.e., `/virtual .* = 0;$/`) are implemented as a proto with a throw new Error("NOT IMPLEMENTED BY INHERITOR") or similar
   * Go through uses of `throw` and make them actually use `UserException` vs `SystemException` (or whatever they're called)
   * Currently using the `sift` package to fake the `MatchDocumentSource` class but need to actually port the real code
-  * Async support has been partially implemented but this needs to go deeper into the APIs; all layers need async capabilities (but not requirements?), and layers that require it but receive no callback should throw an Error()
-  * Remove `reset` stuff if we're not going to use it
+  * Async support has been partially implemented but this needs to go deeper into the APIs; all layers need async capabilities (but not requirements), and layers that require it but receive no callback should throw an Error()
   * Consider ditching `PipelineD` entirely here; might be more confusing than helpful and can still achieve the same results with ease
-  * UNWRAP ALL CLASSES SINCE THIS IS NODE AFTERALL (A BUILD STEP COULD REWRAP THEM FOR BROWSER-LAND IF NEEDED)
-  * MAKE THIS WORK IN A BROWSER -- THAT'D BE NICE
-  * $group and $group.$addToSet both use JSON.stringify for key checks but really need a deepEqual (via Document.compare) or maybe use jsonPlus (faster?) ... fix me now!
+  * Setup a browserify build step to create a browser version of this or something
+  * $group and $group.$addToSet both use JSON.stringify for key checks but really need a deepEqual (via Document.compare) or maybe use jsonplus (faster?) ... fix me now!
+  * Consider moving async stuff out of here and up to a higher level package if possible just to keep things clean and as close to the MongoDB implementations as possible

+ 11 - 4
lib/index.js

@@ -13,7 +13,7 @@
  * @module mungedb-aggregate
  * @param pipeline  {Array}  The list of pipeline document sources in JSON format
  * @param [inputs]  {Array}  Optional inputs to pass through the `docSrcs` pipeline
- * @param callback               {Function}                                 Called when done
+ * @param [callback]               {Function}                                 Optional callback if using async extensions, called when done
  * @param   callback.err           {Error}                                    The Error if one occurred
  * @param   callback.docs          {Array}                                    The resulting documents
  **/
@@ -23,7 +23,7 @@ exports = module.exports = function aggregate(pipeline, inputs, callback) {	// f
 			pipeline: pipeline
 		}, ctx),
 		aggregator = function aggregator(inputs, callback) {
-			if (!callback) throw new Error("arg `callback` is required");
+			if (!callback) callback = exports.SYNC_CALLBACK;
 			if (!inputs) return callback("arg `inputs` is required");
 
 			// rebuild the pipeline on subsequent calls
@@ -47,16 +47,23 @@ exports = module.exports = function aggregate(pipeline, inputs, callback) {	// f
 			}
 
 			// run the pipeline against the input src
-			return pipelineInst.run(src, function aggregated(err, results){
-				pipelineInst = null; // unset so that subsequent calls can rebuild the pipeline
+			var results = pipelineInst.run(src, callback === exports.SYNC_CALLBACK ? undefined : function aggregated(err, results){
 				if(err) return callback(err);
 				return callback(null, results.result);
 			});
+			pipelineInst = null; // unset so that subsequent calls can rebuild the pipeline
+			return results;
 		};
 	if(inputs) return aggregator(inputs, callback);
 	return aggregator;
 };
 
+// sync callback for aggregate if none was provided
+exports.SYNC_CALLBACK = function(err, docs){
+	if (err) throw err;
+	return docs;
+};
+
 // package-style interface; i.e., return a function underneath of the require
 exports.aggregate = exports;
 

+ 82 - 40
lib/pipeline/Pipeline.js

@@ -175,55 +175,97 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	return pipelineInst;
 };
 
+// sync callback for Pipeline#run if omitted
+klass.SYNC_CALLBACK = function(err, results){
+	if (err) throw err;
+	return results.result;
+};
+
+function ifError(err) {
+	if (err) throw err;
+}
+
 /**
  * Run the pipeline
  * @method run
  * @param	inputSource		{DocumentSource}	The input document source for the pipeline
- * @param	callback		{Function}			The callback function
+ * @param	[callback]		{Function}			Optional callback function if using async extensions
 **/
 proto.run = function run(inputSource, callback){
 	if (inputSource && !(inputSource instanceof DocumentSource)) throw new Error("arg `inputSource` must be an instance of DocumentSource");
-	if (!callback) throw new Error("arg `callback` required");
+	if (!callback) callback = klass.SYNC_CALLBACK;
 	var self = this;
-	inputSource.setSource(undefined, function(err){	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
-		if (err) return callback(err);
-		// chain together the sources we found
+	if (callback === klass.SYNC_CALLBACK) { // SYNCHRONOUS MODE
+		inputSource.setSource(undefined, ifError);	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
 		var source = inputSource;
-		async.eachSeries(
-			self.sourceVector,
-			function eachSrc(temp, next){
-				temp.setSource(source, function(err){
-					if (err) return next(err);
-					source = temp;
-					return next();
-				});
-			},
-			function doneSrcs(err){ //source is left pointing at the last source in the chain
-				if (err) return callback(err);
-				/*
-				Iterate through the resulting documents, and add them to the result.
-				We do this even if we're doing an explain, in order to capture the document counts and other stats.
-				However, we don't capture the result documents for explain.
-				*/
-				// the array in which the aggregation results reside
-				var resultArray = [];
-				try{
-					for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
-						var document = source.getCurrent();
-						resultArray.push(document);	// add the document to the result set
-						//Commenting out this assertion for munge.  MUHAHAHA!!!
-						// object will be too large, assert. the extra 1KB is for headers
-						//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
+		for(var i = 0, l = self.sourceVector.length; i < l; i++){
+			var temp = self.sourceVector[i];
+			temp.setSource(source, ifError);
+			source = temp;
+		}
+		/*
+		Iterate through the resulting documents, and add them to the result.
+		We do this even if we're doing an explain, in order to capture the document counts and other stats.
+		However, we don't capture the result documents for explain.
+		*/
+		var resultArray = [];
+		try{
+			for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
+				var document = source.getCurrent();
+				resultArray.push(document);	// add the document to the result set
+				//Commenting out this assertion for munge.  MUHAHAHA!!!
+				// object will be too large, assert. the extra 1KB is for headers
+				//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
+			}
+		} catch (err) {
+			return callback(err);
+		}
+		var result = {
+			result: resultArray
+//			,ok: true;	//not actually in here... where does this come from?
+		};
+		return callback(null, result);
+	} else {	// ASYNCHRONOUS MODE	//TODO: move this up to a higher level package?
+		return inputSource.setSource(undefined, function(err){	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
+			if (err) return callback(err);
+			// chain together the sources we found
+			var source = inputSource;
+			async.eachSeries(
+				self.sourceVector,
+				function eachSrc(temp, next){
+					temp.setSource(source, function(err){
+						if (err) return next(err);
+						source = temp;
+						return next();
+					});
+				},
+				function doneSrcs(err){ //source is left pointing at the last source in the chain
+					if (err) return callback(err);
+					/*
+					Iterate through the resulting documents, and add them to the result.
+					We do this even if we're doing an explain, in order to capture the document counts and other stats.
+					However, we don't capture the result documents for explain.
+					*/
+					// the array in which the aggregation results reside
+					var resultArray = [];
+					try{
+						for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
+							var document = source.getCurrent();
+							resultArray.push(document);	// add the document to the result set
+							//Commenting out this assertion for munge.  MUHAHAHA!!!
+							// object will be too large, assert. the extra 1KB is for headers
+							//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
+						}
+					} catch (err) {
+						return callback(err);
 					}
-				} catch (err) {
-					return callback(err);
+					var result = {
+						result: resultArray
+	//					,ok: true;	//not actually in here... where does this come from?
+					};
+					return callback(null, result);
 				}
-				var result = {
-					result: resultArray
-//					,ok: true;	//not actually in here... where does this come from?
-				};
-				return callback(null, result);
-			}
-		);
-	});
+			);
+		});
+	}
 };

+ 16 - 3
test/lib/aggregate.js

@@ -6,18 +6,31 @@ var assert = require("assert"),
 // Utility to test the various use cases of `aggregate`
 function testAggregate(opts){
 
-	// test one-off usage
+	// SYNC: test one-off usage
+	var results = aggregate(opts.pipeline, opts.inputs);
+	assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
+
+	// SYNC: test reusable aggregator functionality
+	var aggregator = aggregate(opts.pipeline);
+	results = aggregator(opts.inputs);
+	assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
+
+	// SYNC: test that it is actually reusable
+	results = aggregator(opts.inputs); 
+	assert.equal(JSON.stringify(results), JSON.stringify(opts.expected), "Reuse of aggregator should yield the same results!");
+
+	// ASYNC: test one-off usage
 	aggregate(opts.pipeline, opts.inputs, function(err, results){
 		assert.ifError(err);
 		assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
 
-		// test reusable aggregator functionality
+		// ASYNC: test reusable aggregator functionality
 		var aggregator = aggregate(opts.pipeline);
 		aggregator(opts.inputs, function(err, results){
 			assert.ifError(err);
 			assert.equal(JSON.stringify(results), JSON.stringify(opts.expected));
 
-			// test that it is actually reusable
+			// ASYNC: test that it is actually reusable
 			aggregator(opts.inputs, function(err, results){
 				assert.ifError(err);
 				assert.equal(JSON.stringify(results), JSON.stringify(opts.expected), "Reuse of aggregator should yield the same results!");