							- "use strict";
 
- var async = require("async");
 
- /**
 
-  * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command.  define a singleton object for it.
 
-  * @class Pipeline
 
-  * @namespace mungedb-aggregate.pipeline
 
-  * @module mungedb-aggregate
 
-  * @constructor
 
-  **/
 
- // CONSTRUCTOR
 
- var Pipeline = module.exports = function Pipeline(theCtx){
 
- 	this.collectionName = null;
 
- 	this.sourceVector = null;
 
- 	this.explain = false;
 
- 	this.splitMongodPipeline = false;
 
- 	this.ctx = theCtx;
 
- }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
var DocumentSource = require("./documentSources/DocumentSource"),
	LimitDocumentSource = require("./documentSources/LimitDocumentSource"),
	MatchDocumentSource = require("./documentSources/MatchDocumentSource"),
	ProjectDocumentSource = require("./documentSources/ProjectDocumentSource"),
	SkipDocumentSource = require("./documentSources/SkipDocumentSource"),
	UnwindDocumentSource = require("./documentSources/UnwindDocumentSource"),
	GroupDocumentSource = require("./documentSources/GroupDocumentSource"),
	SortDocumentSource = require("./documentSources/SortDocumentSource");
 
klass.COMMAND_NAME = "aggregate";
klass.PIPELINE_NAME = "pipeline";
klass.EXPLAIN_NAME = "explain";
klass.FROM_ROUTER_NAME = "fromRouter";
klass.SPLIT_MONGOD_PIPELINE_NAME = "splitMongodPipeline";
klass.SERVER_PIPELINE_NAME = "serverPipeline";
klass.MONGOS_PIPELINE_NAME = "mongosPipeline";

klass.stageDesc = {};	//attaching this to the class for test cases
klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
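
// Example (sketch): `stageDesc` maps a stage name (e.g. "$limit") to the factory
// that builds its DocumentSource, so an extension can register a custom stage the
// same way the built-ins do; `$myStage` and `MyDocumentSource` are hypothetical:
//	klass.stageDesc["$myStage"] = MyDocumentSource.createFromJson;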
 
/**
 * Create an `Array` of `DocumentSource`s from the given JSON pipeline
 * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
 * @static
 * @method parseDocumentSources
 * @param pipeline  {Array}   The JSON pipeline
 * @param ctx       {Object}  The context; not used yet in mungedb-aggregate
 * @returns {Array}  The parsed `DocumentSource`s
 **/
klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
	var sourceVector = [];
	for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
		// pull out the pipeline element as an object
		var pipeElement = pipeline[iStep];
		if (!(pipeElement instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
		var obj = pipeElement;
		// Parse a pipeline stage from 'obj'.
		if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
		var stageName = Object.keys(obj)[0],
			stageSpec = obj[stageName];
		// Create a DocumentSource pipeline stage from 'stageSpec'.
		var desc = klass.stageDesc[stageName];
		if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16436");
		// Parse the stage
		var stage = desc(stageSpec, ctx);
		if (!stage) throw new Error("Stage must not be undefined!");
		stage.setPipelineStep(iStep);
		sourceVector.push(stage);
	}
	return sourceVector;
};
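
// Example (sketch): parsing a JSON pipeline into DocumentSource instances,
// using the built-in stages registered in `stageDesc` above (`ctx` may be
// null since it is not used yet in mungedb-aggregate):
//	var sources = Pipeline.parseDocumentSources([
//		{$match: {status: "A"}},
//		{$sort: {age: -1}}
//	], null);
//	// sources[0] is a MatchDocumentSource, sources[1] a SortDocumentSource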
 
/**
 * Create a pipeline from the command.
 * @static
 * @method parseCommand
 * @param cmdObj  {Object}  The command object sent from the client
 * @param   cmdObj.aggregate           {Array}    The thing to aggregate against	// NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
 * @param   cmdObj.pipeline            {Object}   The JSON pipeline of `DocumentSource` specs
 * @param   cmdObj.explain             {Boolean}  Should explain?
 * @param   cmdObj.fromRouter          {Boolean}  Is from router?
 * @param   cmdObj.splitMongodPipeline {Boolean}  Should split?
 * @param ctx     {Object}  The context; not used yet in mungedb-aggregate
 * @returns {Pipeline}  The pipeline, if created, otherwise null
 **/
klass.parseCommand = function parseCommand(cmdObj, ctx){
	var pipelineNamespace = require("./"),
		Pipeline = pipelineNamespace.Pipeline,	// using require in case Pipeline gets replaced with an extension
		pipelineInst = new Pipeline(ctx);
	// gather the specification for the aggregation
	var pipeline;
	for(var fieldName in cmdObj){
		var cmdElement = cmdObj[fieldName];
		if(fieldName == klass.COMMAND_NAME)						pipelineInst.collectionName = cmdElement;		//look for the aggregation command
		else if(fieldName == klass.PIPELINE_NAME)				pipeline = cmdElement;							//check for the pipeline of JSON doc srcs
		else if(fieldName == klass.EXPLAIN_NAME)				pipelineInst.explain = cmdElement;				//check for explain option
		else if(fieldName == klass.FROM_ROUTER_NAME)			pipelineInst.fromRouter = cmdElement;			//if the request came from the router, we're in a shard
		else if(fieldName == klass.SPLIT_MONGOD_PIPELINE_NAME)	pipelineInst.splitMongodPipeline = cmdElement;	//check for debug options
		// NOTE: DEVIATION FROM MONGO: Not implementing: "Ignore $auth information sent along with the command. The authentication system will use it, it's not a part of the pipeline."
		else throw new Error("unrecognized field " + JSON.stringify(fieldName));
	}
 
	/*
	If we get here, we've harvested the fields we expect for a pipeline.
	Set up the specified document source pipeline.
	*/
	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
	var sourceVector = pipelineInst.sourceVector = Pipeline.parseDocumentSources(pipeline, ctx);
	// if there aren't any pipeline stages, there's nothing more to do
	if (!sourceVector.length) return pipelineInst;
	/*
	Move filters up where possible.
	CW TODO -- move filters past projections where possible, noting the corresponding field renamings.
	*/
	/*
	Wherever there is a match immediately following a sort, swap them.
	This means we sort fewer items.  Neither changes the documents in the stream, so this transformation shouldn't affect the result.
	We do this first, because then when we coalesce operators below, any adjacent matches will be combined.
	*/
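	// For example, [{$sort: {age: -1}}, {$match: {status: "A"}}] becomes
	// [{$match: {status: "A"}}, {$sort: {age: -1}}], so only matching documents get sorted.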
 
	for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
		var source = sourceVector[srci];
		if (source instanceof MatchDocumentSource) {
			var previous = sourceVector[srci - 1];
			if (previous instanceof SortDocumentSource) {
				// swap this item with the previous
				sourceVector[srci - 1] = source;
				sourceVector[srci] = previous;
			}
		}
	}
 
	/*
	Coalesce adjacent filters where possible.  Two adjacent filters are equivalent to one filter whose predicate is the conjunction of the two original filters' predicates.
	For now, capture this by giving any DocumentSource the option to absorb its successor; this will also allow adjacent projections to coalesce when possible.
	Run through the DocumentSources, and give each one the opportunity to coalesce with its successor.  If successful, remove the successor.
	Move all document sources to a temporary list.
	*/
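	// For example, adjacent $match stages can merge into one filter whose predicate
	// is their conjunction, and a $limit may absorb a following $limit; this is a
	// sketch of the intent -- the exact behavior lives in each stage's coalesce().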
 
	var tempVector = sourceVector.slice(0);
	sourceVector.length = 0;
	// move the first one to the final list
	sourceVector.push(tempVector[0]);
	// run through the sources, coalescing them or keeping them
	for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
		/*
		If we can't coalesce the source with the last, then move it to the final list, and make it the new last.
		(If we succeeded, then we're still on the same last, and there's no need to move or do anything with the source -- the destruction of tempVector will take care of the rest.)
		*/
		var lastSource = sourceVector[sourceVector.length - 1],
			temp = tempVector[tempi];
		if (!temp || !lastSource) throw new Error("null document sources found");
		if (!lastSource.coalesce(temp)){
			sourceVector.push(temp);
		}
	}
 
	// optimize the elements in the pipeline
	for(var i = 0, l = sourceVector.length; i < l; i++) {
		var iter = sourceVector[i];
		if (!iter) throw new Error("found null document source in pipeline");
		iter.optimize();
	}

	return pipelineInst;
};
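
// Example (sketch): building a Pipeline from a client-style command object;
// the field names mirror the klass.*_NAME constants above, and `ctx` may be null:
//	var docs = [{status: "A", amount: 5}, {status: "B", amount: 7}];
//	var pipelineInst = Pipeline.parseCommand({
//		aggregate: docs,	// an Array of inputs (deviation from mongo; see the docs above)
//		pipeline: [{$match: {status: "A"}}]
//	}, null);
//	// pipelineInst.sourceVector now holds the parsed, optimized DocumentSources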
 
// default synchronous callback for Pipeline#run, used when no callback is given
klass.SYNC_CALLBACK = function(err, results){
	if (err) throw err;
	return results.result;
};

function ifError(err) {
	if (err) throw err;
}
 
/**
 * Run the pipeline
 * @method run
 * @param	inputSource	{DocumentSource}	The input document source for the pipeline
 * @param	[callback]	{Function}	Optional callback function if using async extensions
 **/
proto.run = function run(inputSource, callback){
	if (inputSource && !(inputSource instanceof DocumentSource)) throw new Error("arg `inputSource` must be an instance of DocumentSource");
	if (!callback) callback = klass.SYNC_CALLBACK;
	var self = this;
	if (callback === klass.SYNC_CALLBACK) {	// SYNCHRONOUS MODE
		inputSource.setSource(undefined, ifError);	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
		var source = inputSource;
		for(var i = 0, l = self.sourceVector.length; i < l; i++){
			var temp = self.sourceVector[i];
			temp.setSource(source, ifError);
			source = temp;
		}
		/*
		Iterate through the resulting documents, and add them to the result.
		We do this even if we're doing an explain, in order to capture the document counts and other stats.
		However, we don't capture the result documents for explain.
		*/
		var resultArray = [];
		try {
			for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
				var document = source.getCurrent();
				resultArray.push(document);	// add the document to the result set
				//Commenting out this assertion for munge.  MUHAHAHA!!!
				// if the object will be too large, assert; the extra 1KB is for headers
				//if(resultArray.len() > BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
			}
		} catch (err) {
			return callback(err);
		}
		var result = {
			result: resultArray
//			,ok: true	//not actually in here... where does this come from?
		};
		return callback(null, result);
	} else {	// ASYNCHRONOUS MODE	//TODO: move this up to a higher level package?
		return inputSource.setSource(undefined, function(err){	//TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
			if (err) return callback(err);
			// chain together the sources we found
			var source = inputSource;
			async.eachSeries(
				self.sourceVector,
				function eachSrc(temp, next){
					temp.setSource(source, function(err){
						if (err) return next(err);
						source = temp;
						return next();
					});
				},
				function doneSrcs(err){	//source is left pointing at the last source in the chain
					if (err) return callback(err);
					/*
					Iterate through the resulting documents, and add them to the result.
					We do this even if we're doing an explain, in order to capture the document counts and other stats.
					However, we don't capture the result documents for explain.
					*/
					// the array in which the aggregation results reside
					var resultArray = [];
					try {
						for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
							var document = source.getCurrent();
							resultArray.push(document);	// add the document to the result set
							//Commenting out this assertion for munge.  MUHAHAHA!!!
							// if the object will be too large, assert; the extra 1KB is for headers
							//if(resultArray.len() > BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
						}
					} catch (err) {
						return callback(err);
					}
					var result = {
						result: resultArray
//						,ok: true	//not actually in here... where does this come from?
					};
					return callback(null, result);
				}
			);
		});
	}
};
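
// Example (sketch): running a parsed pipeline. `inputSource` must be a concrete
// DocumentSource over the input documents; `ArrayDocumentSource` below is a
// hypothetical stand-in for whatever source the caller actually has:
//	var pipelineInst = Pipeline.parseCommand({aggregate: docs, pipeline: [{$match: {status: "A"}}]}, null),
//		inputSource = new ArrayDocumentSource(docs);	// hypothetical
//	// synchronous mode (no callback): returns the result array, or throws on error
//	var results = pipelineInst.run(inputSource);
//	// asynchronous mode: results are delivered to the callback
//	pipelineInst.run(inputSource, function(err, obj){
//		if (err) throw err;
//		console.log(obj.result);
//	});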
 
 