Browse Source

Merge branch 'feature/mongo_2.6.5_documentSource_PipelineD' of https://github.com/RiveraGroup/mungedb-aggregate into feature/mongo_2.6.5_documentSource_PipelineD

Jason Walton 11 năm trước cách đây
mục cha
commit
995b9d552c
2 tập tin đã thay đổi với 40 bổ sung82 xóa
  1. 22 60
      lib/pipeline/Pipeline.js
  2. 18 22
      test/lib/pipeline/Pipeline.js

+ 22 - 60
lib/pipeline/Pipeline.js

@@ -1,5 +1,5 @@
 "use strict";
-
+var async = require('async');
 /**
  * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command.  define a singleton object for it.
  * @class Pipeline
@@ -13,7 +13,6 @@ var Pipeline = module.exports = function Pipeline(theCtx){
 	this.explain = false;
 	this.splitMongodPipeline = false;
 	this.ctx = theCtx;
-	this.SYNC_MODE = false;
 }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
 var DocumentSource = require("./documentSources/DocumentSource"),
@@ -360,12 +359,6 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	return pipelineInst;
 };
 
-// sync callback for Pipeline#run if omitted
-klass.SYNC_CALLBACK = function(err, results){
-	if (err) throw err;
-	return results.result;
-};
-
 function ifError(err) {
 	if (err) throw err;
 }
@@ -434,20 +427,31 @@ proto.stitch = function stitch() {
 /**
  * Run the pipeline
  * @method run
- * @param callback {Function} Optional. Run the pipeline in async mode; callback(err, result)
- * @return result {Object} The result of executing the pipeline
+ * @param callback {Function} gets called once for each document result from the pipeline
  */
 proto.run = function run(callback) {
 	// should not get here in the explain case
 	if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
-
-	/* NOTE: DEVIATION FROM MONGO SOURCE. WE'RE SUPPORTING SYNC AND ASYNC */
-	if(this.SYNC_MODE) {
-		callback();
-		return this._runSync();
-	} else {
-		return this._runAsync(callback);
-	}
+	
+	var doc = null,
+		error = null,
+		finalSource = this._getFinalSource();
+	
+	async.doWhilst(
+		function iterator(next){
+			return finalSource.getNext(function (err, obj){
+				callback(err, obj);
+				doc = obj;
+				error = err;
+				next();
+			});
+		},
+		function test(){
+			return doc !== null && !error;
+		},
+		function done(err){
+			//nothing to do here
+		});
 };
 
 /**
@@ -460,48 +464,6 @@ proto._getFinalSource = function _getFinalSource() {
 	return this.sources[this.sources.length - 1];
 };
 
-/**
- * Run the pipeline synchronously
- * @method _runSync
- * @return {Object}		The results object {result:resultArray}
- * @private
- */
-proto._runSync = function _runSync(callback) {
-	var resultArray = [],
-		finalSource = this._getFinalSource(),
-		handleErr = function(err) {
-			if(err) throw err;
-		},
-		next;
-	while((next = finalSource.getNext(handleErr)) !== DocumentSource.EOF) {
-		resultArray.push(next);
-	}
-	return {result:resultArray};
-};
-
-/**
- * Run the pipeline asynchronously
- * @method _runAsync
- * @param callback {Function} callback(err, resultObject)
- * @private
- */
-proto._runAsync = function _runAsync(callback) {
-	var resultArray = [],
-		finalSource = this._getFinalSource(),
-		gotNext = function(err, doc) {
-			if(err) return callback(err);
-			if(doc !== DocumentSource.EOF) {
-				resultArray.push(doc);
-				return setImmediate(function() { //setImmediate to avoid callstack size issues
-					finalSource.getNext(gotNext);
-				});
-			} else {
-				return callback(null, {result:resultArray});
-			}
-		};
-	finalSource.getNext(gotNext);
-};
-
 /**
  * Get the pipeline explanation
  * @method writeExplainOps

+ 18 - 22
test/lib/pipeline/Pipeline.js

@@ -62,7 +62,7 @@ module.exports = {
 				};
 
 				proto.getNext = function(callback){
-					var answer = this.current > 0 ? {val:this.current--} : DocumentSource.EOF,
+					var answer = this.current > 0 ? {val:this.current--} : null,
 						err = null;
 
 					if (!this.works)
@@ -179,32 +179,28 @@ module.exports = {
 			}
 		},
 
-		"#_runSync": {
+		"#run": {
 
-			"should iterate through sources and return resultant array": function () {
+			"should iterate through sources and return resultant array": function (done) {
 				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
-					results = p.run(function(err, results) {
-						assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
+					results = [];
+				p.run(function(err, doc) {
+					if (err) throw err;
+					if (!doc){
+						assert.deepEqual(results, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
+						done();
+					} else {
+						results.push(doc);
+					}
 				});
 			},
-
-			"should catch parse errors": function () {
-				// The $foo part is invalid and causes a throw.
-				assert.throws(function () {
-					Pipeline.parseCommand({pipeline: [
-						{$foo: {bar: "baz"}}
-					]});
+			"should handle sources that return errors": function (done) {
+				var p = Pipeline.parseCommand({pipeline:[{$test:{works:false}}]}),
+					results = [];
+				p.run(function(err, doc) {
+					assert(err);
+					done();
 				});
-			},
-
-		},
-
-		"#_runAsync": {
-			"should iterate through sources and return resultant array asynchronously": function () {
-				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
-					results = p.run(function(err, results) {
-						assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
-					});
 			}
 		},