							- "use strict";
 
- var Pipeline = module.exports = (function(){
 
- 	// CONSTRUCTOR
 
- 	/**
 
- 	 * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command.  define a singleton object for it.
 
- 	 * @class Pipeline
 
- 	 * @namespace mungedb.aggregate.pipeline
 
- 	 * @module mungedb-aggregate
 
- 	 * @constructor
 
- 	 **/
 
- 	var klass = function Pipeline(/*theCtx*/){
 
- 		this.sourceVector = [];//should be provate?
 
- 	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
- 	
 
- 	var LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
 
- 		MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
 
- 		ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
 
- 		SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
 
- 		UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
 
- 		GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
 
- 		SortDocumentSource = require('./documentSources/SortDocumentSource'),
 
- 		SplitDocumentSource = require('./documentSources/SplitDocumentSource');
 
- 	
 
- 	klass.StageDesc = {};//attaching this to the class for test cases
 
- 	klass.StageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
 
- 	klass.StageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
 
- 	klass.StageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
 
- 	klass.StageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
 
- 	klass.StageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
 
- 	klass.StageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
 
- 	klass.StageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
 
- 	klass.StageDesc[SplitDocumentSource.splitName] = SplitDocumentSource.createFromJson;
 
- 	
 
- 	/**
 
- 	 * Create a pipeline from the command.
 
- 	 *
 
- 	 * @static
 
- 	 * @method parseCommand
 
- 	 * @param	{Object} cmdObj the command object sent from the client
 
- 	 * @returns	{Array}	the pipeline, if created, otherwise a NULL reference
 
- 	 **/
 
- 	klass.parseCommand = function parseCommand(cmdObj){
 
- 		var pipelineInstance = new Pipeline(),
 
- 			pipeline = cmdObj;//munge: skipping the command parsing since all we care about is the pipeline
 
- 		
 
- 		var sourceVector = pipelineInstance.sourceVector,
 
- 			nSteps = pipeline.length;
 
- 		for( var iStep = 0; iStep<nSteps; ++iStep){
 
- 			/* pull out the pipeline element as an object */
 
- 			var pipeElement = pipeline[iStep];
 
- 			if (!(pipeElement instanceof Object)){
 
- 				throw new Error("pipeline element " + iStep + " is not an object; code 15942" );
 
- 			}
 
- 			
 
- 			// Parse a pipeline stage from 'obj'.
 
- 			var obj = pipeElement;
 
- 			if (Object.keys(obj).length !== 1){
 
- 				throw new Error("A pipeline stage specification object must contain exactly one field; code 16435" );
 
- 			}
 
- 			// Create a DocumentSource pipeline stage from 'stageSpec'.
 
- 			var stageName = Object.keys(obj)[0],
 
- 				stageSpec = obj[stageName],
 
- 				desc = klass.StageDesc[stageName];
 
- 				
 
- 			if (!desc){
 
- 				throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435" );
 
- 			}
 
- 			
 
- 			var stage = desc(stageSpec);
 
- 			//verify(stage);
 
- 			stage.setPipelineStep(iStep);
 
- 			sourceVector.push(stage);
 
- 		}
 
- 		
 
- 		/* if there aren't any pipeline stages, there's nothing more to do */
 
- 		if (!sourceVector.length){
 
- 			return pipelineInstance;
 
- 		}
 
- 		
 
- 		/* Move filters up where possible.
 
- 		CW TODO -- move filter past projections where possible, and noting corresponding field renaming.
 
- 		*/
 
- 		/*
 
- 		Wherever there is a match immediately following a sort, swap them.
 
- 		This means we sort fewer items.  Neither changes the documents in the stream, so this transformation shouldn't affect the result.
 
- 		We do this first, because then when we coalesce operators below, any adjacent matches will be combined.
 
- 		*/
 
- 		for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
 
- 			var source = sourceVector[srci];
 
- 			if (source.constructor === MatchDocumentSource) {
 
- 				var previous = sourceVector[srci - 1];
 
- 				if (previous.constructor === klass.SortDocumentSource) { //TODO: remove 'sort.' once sort is implemented!!!
 
- 					/* swap this item with the previous */
 
- 					sourceVector[srci - 1] = source;
 
- 					sourceVector[srci] = previous;
 
- 				}
 
- 			}
 
- 		}
 
- 		
 
- 		/*
 
- 		Coalesce adjacent filters where possible.  Two adjacent filters are equivalent to one filter whose predicate is the conjunction of the two original filters' predicates.
 
- 		For now, capture this by giving any DocumentSource the option to absorb it's successor; this will also allow adjacent projections to coalesce when possible.
 
- 		Run through the DocumentSources, and give each one the opportunity to coalesce with its successor.  If successful, remove the successor.
 
- 		Move all document sources to a temporary list.
 
- 		*/
 
- 		var tempVector = sourceVector.slice(0);
 
- 		sourceVector.length = 0;
 
- 		/* move the first one to the final list */
 
- 		sourceVector.push(tempVector[0]);
 
- 		/* run through the sources, coalescing them or keeping them */
 
- 		for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
 
- 			/*
 
- 			If we can't coalesce the source with the last, then move it to the final list, and make it the new last.
 
- 			(If we succeeded, then we're still on the same last, and there's no need to move or do anything with the source -- the destruction of tempVector will take care of the rest.)
 
- 			*/
 
- 			var lastSource = sourceVector[sourceVector.length - 1];
 
- 			var temp = tempVector[tempi];
 
- 			if (!temp || !lastSource){
 
- 				throw new Error("null document sources found");
 
- 			}
 
- 			if (!lastSource.coalesce(temp)){
 
- 				sourceVector.push(temp);
 
- 			}
 
- 		}
 
- 		/* optimize the elements in the pipeline */
 
- 		for(var i = 0, l = sourceVector.length; i<l; i++) {
 
- 			var iter = sourceVector[i];
 
- 			if (!iter) {
 
- 				throw new Error("Pipeline received empty document as argument");
 
- 			}
 
- 			iter.optimize();
 
- 		}
 
- 		return pipelineInstance;
 
- 	};
 
- 	/**
 
- 	 * Run the pipeline
 
- 	 *
 
- 	 * @method run 
 
- 	 * @param	{Object}	result	the results of running the pipeline will be stored on this object
 
- 	 * @param	{CursorDocumentSource}	source	the primary document source of the data
 
- 	**/
 
- 	proto.run = function run(result, source){
 
- 		for(var i = 0, l = this.sourceVector.length; i<l; i++) {
 
- 			var temp = this.sourceVector[i];
 
- 			temp.setSource(source);
 
- 			source = temp;
 
- 		}
 
- 		/* source is left pointing at the last source in the chain */
 
- 		/*
 
- 		Iterate through the resulting documents, and add them to the result.
 
- 		We do this even if we're doing an explain, in order to capture the document counts and other stats.
 
- 		However, we don't capture the result documents for explain.
 
- 		*/
 
- 		// the array in which the aggregation results reside
 
- 		// cant use subArrayStart() due to error handling
 
- 		var resultArray = [];
 
- 		for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
 
- 			var document = source.getCurrent();
 
- 			/* add the document to the result set */
 
- 			resultArray.push(document);
 
- 			
 
- 			//Commenting out this assertion for munge.  MUHAHAHA!!!
 
- 			
 
- 			// object will be too large, assert. the extra 1KB is for headers
 
- //			uassert(16389,
 
- //					str::stream() << "aggregation result exceeds maximum document size (" << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
 
- //					resultArray.len() < BSONObjMaxUserSize - 1024);
 
- 		}
 
- 		result.result = resultArray;
 
- 		
 
- 		return true;
 
- 	};
 
- 	
 
- 	return klass;
 
- })();
 