Browse Source

Refs #2138 Updated pipeline to support taking a documentsource and an empty pipeline

Adam Bell 13 years ago
parent
commit
56da245131
2 changed files with 42 additions and 37 deletions
  1. 2 1
      lib/commands/PipelineCommand.js
  2. 40 36
      lib/pipeline/Pipeline.js

+ 2 - 1
lib/commands/PipelineCommand.js

@@ -19,6 +19,7 @@ var PipelineCommand = module.exports = (function(){
     //Put these on the class so they can be overriden.
 	klass.Pipeline = require('../pipeline/Pipeline');
 	klass.PipelineD = require('../pipeline/PipelineD');
+	var DocumentSource = require("../pipeline/documentSources/DocumentSource");
 
 	/**
 	 * Execute the pipeline.
@@ -33,7 +34,7 @@ var PipelineCommand = module.exports = (function(){
 	 * @method runExecute
 	 **/
 	proto.runExecute = function runExecute(db, callback){
-		var pSource = klass.PipelineD.prepareCursorSource(this.pPipeline, db);
+		var pSource = db instanceof DocumentSource ? db : klass.PipelineD.prepareCursorSource(this.pPipeline, db);
         return this.executePipeline(this.pPipeline, pSource, callback);
 	};
 	

+ 40 - 36
lib/pipeline/Pipeline.js

@@ -156,44 +156,48 @@ var Pipeline = module.exports = (function(){
 	proto.run = function run(source, callback){
 		if(!callback)
 			throw new Error("run requires callback");
+		var self = this;
+		source.setSource(undefined, function(){	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
+			async.eachSeries(self.sourceVector,
+				function(item, next){
+					console.debug("Set source on %s", source.constructor.name);
+					item.setSource(source, function(err){
+						if(err) return next(err);
+						source = item;
+						return next();
+					});
+				},
+				function(err){
+					if(err) return callback(err);
+					var result = {};
+					/* source is left pointing at the last source in the chain */
 
-		async.eachSeries(this.sourceVector, function(item, next){
-				item.setSource(source, function(err){
-					if(err) return next(err);
-					source = item;
-					return next();
-				});
-			},
-			function(err){
-				if(err) return callback(err);
-				var result = {};
-				/* source is left pointing at the last source in the chain */
-
-				/*
-				Iterate through the resulting documents, and add them to the result.
-				We do this even if we're doing an explain, in order to capture the document counts and other stats.
-				However, we don't capture the result documents for explain.
-				*/
-				// the array in which the aggregation results reside
-				// cant use subArrayStart() due to error handling
-				var resultArray = [];
-				for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
-					var document = source.getCurrent();
-					/* add the document to the result set */
-					resultArray.push(document);
-					
-					//Commenting out this assertion for munge.  MUHAHAHA!!!
-					
-					// object will be too large, assert. the extra 1KB is for headers
-		//			uassert(16389,
-		//					str::stream() << "aggregation result exceeds maximum document size (" << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
-		//					resultArray.len() < BSONObjMaxUserSize - 1024);
+					/*
+					Iterate through the resulting documents, and add them to the result.
+					We do this even if we're doing an explain, in order to capture the document counts and other stats.
+					However, we don't capture the result documents for explain.
+					*/
+					// the array in which the aggregation results reside
+					// cant use subArrayStart() due to error handling
+					var resultArray = [];
+					for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
+						var document = source.getCurrent();
+						/* add the document to the result set */
+						resultArray.push(document);
+						
+						//Commenting out this assertion for munge.  MUHAHAHA!!!
+						
+						// object will be too large, assert. the extra 1KB is for headers
+			//			uassert(16389,
+			//					str::stream() << "aggregation result exceeds maximum document size (" << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
+			//					resultArray.len() < BSONObjMaxUserSize - 1024);
+					}
+					result.result = resultArray;
+					result.ok = true;
+					return callback(null, result);
 				}
-				result.result = resultArray;
-				result.ok = true;
-				return callback(null, result);
-			}
-		);
+			);
+		});
 	};
 	
 	return klass;