| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 | 
							- var Pipeline = module.exports = (function(){
 
- 	// CONSTRUCTOR
 
- 	var klass = function Pipeline(){
 
- 		
 
- 		this.sourceVector = [];//should be provate?
 
- 		
 
- 	}, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
 
- 	
 
- 	var LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
 
- 		MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
 
- 		ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
 
- 		SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
 
- 		UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
 
- 		GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
 
- 		SortDocumentSource = require('./documentSources/SortDocumentSource');
 
- 	
 
- 	klass.StageDesc = {};//attaching this to the class for test cases
 
- 	klass.StageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
 
- 	klass.StageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
 
- 	klass.StageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
 
- 	klass.StageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
 
- 	klass.StageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
 
- 	klass.StageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
 
- 	klass.StageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
 
- 	
 
-     /**
 
-      * Create a pipeline from the command.
 
- 	 *
 
-      * @param	{Object} cmdObj the command object sent from the client
 
-      * @returns	{Array}	the pipeline, if created, otherwise a NULL reference
 
-      **/
 
- 	klass.parseCommand = function parseCommand(cmdObj){
 
- 		var pipelineInstance = new Pipeline(),
 
- 			pipeline = cmdObj;//munge: skipping the command parsing since all we care about is the pipeline
 
- 		
 
- 		var sourceVector = pipelineInstance.sourceVector,
 
- 			nSteps = pipeline.length;
 
- 		for( var iStep = 0; iStep<nSteps; ++iStep){
 
-             /* pull out the pipeline element as an object */
 
- 			var pipeElement = pipeline[iStep];
 
- 			if (!(pipeElement instanceof Object)){
 
- 				throw new Error("pipeline element " + iStep + " is not an object; code 15942" );
 
- 			}
 
- 			
 
-             // Parse a pipeline stage from 'obj'.
 
- 			var obj = pipeElement;
 
- 			if (Object.keys(obj).length !== 1){
 
- 				throw new Error("A pipeline stage specification object must contain exactly one field; code 16435" );
 
- 			}
 
-             // Create a DocumentSource pipeline stage from 'stageSpec'.
 
-             var stageName = Object.keys(obj)[0],
 
- 				stageSpec = obj[stageName],
 
- 				desc = klass.StageDesc[stageName];
 
- 				
 
- 			if (!desc){
 
- 				throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435" );
 
- 			}
 
- 			
 
-             var stage = desc(stageSpec);
 
-             //verify(stage);
 
-             stage.setPipelineStep(iStep);
 
-             sourceVector.push(stage);
 
- 		}
 
- 		
 
-         /* if there aren't any pipeline stages, there's nothing more to do */
 
- 		if (!sourceVector.length){
 
- 			return pipelineInstance;
 
- 		}
 
- 		
 
- 		/*
 
-           Move filters up where possible.
 
-           CW TODO -- move filter past projections where possible, and noting
 
-           corresponding field renaming.
 
-         */
 
-         /*
 
-           Wherever there is a match immediately following a sort, swap them.
 
-           This means we sort fewer items.  Neither changes the documents in
 
-           the stream, so this transformation shouldn't affect the result.
 
-           We do this first, because then when we coalesce operators below,
 
-           any adjacent matches will be combined.
 
-          */
 
-         for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
 
-             var source = sourceVector[srci];
 
-             if (source.constructor === MatchDocumentSource) {
 
-                 var previous = sourceVector[srci - 1];
 
-                 if (previous.constructor === klass.SortDocumentSource) { //TODO: remove 'sort.' once sort is implemented!!!
 
-                     /* swap this item with the previous */
 
-                     sourceVector[srci - 1] = source;
 
-                     sourceVector[srci] = previous;
 
-                 }
 
-             }
 
-         }
 
-         
 
- 		/*
 
-           Coalesce adjacent filters where possible.  Two adjacent filters
 
-           are equivalent to one filter whose predicate is the conjunction of
 
-           the two original filters' predicates.  For now, capture this by
 
-           giving any DocumentSource the option to absorb it's successor; this
 
-           will also allow adjacent projections to coalesce when possible.
 
-           Run through the DocumentSources, and give each one the opportunity
 
-           to coalesce with its successor.  If successful, remove the
 
-           successor.
 
-           Move all document sources to a temporary list.
 
-         */
 
-         var tempVector = sourceVector.slice(0);
 
-         sourceVector.length = 0;
 
-         /* move the first one to the final list */
 
-         sourceVector.push(tempVector[0]);
 
-         /* run through the sources, coalescing them or keeping them */
 
-         for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
 
-             /*
 
-               If we can't coalesce the source with the last, then move it
 
-               to the final list, and make it the new last.  (If we succeeded,
 
-               then we're still on the same last, and there's no need to move
 
-               or do anything with the source -- the destruction of tempVector
 
-               will take care of the rest.)
 
-             */
 
-             var lastSource = sourceVector[sourceVector.length - 1];
 
-             var temp = tempVector[tempi];
 
-             if (!temp || !lastSource){
 
- 				throw new Error("null document sources found");
 
-             }
 
-             if (!lastSource.coalesce(temp)){
 
-                 sourceVector.push(temp);
 
-             }
 
-         }
 
-         /* optimize the elements in the pipeline */
 
-         for(var i = 0, l = sourceVector.length; i<l; i++) {
 
- 			var iter = sourceVector[i];
 
-             if (!iter) {
 
-                 throw new Error("Pipeline received empty document as argument");
 
-             }
 
-             iter.optimize();
 
-         }
 
-         return pipelineInstance;
 
- 	};
 
- 	/**
 
- 	 * Run the pipeline
 
- 	 * 
 
- 	 * @param	{Object}	result	the results of running the pipeline will be stored on this object
 
- 	 * @param	{CursorDocumentSource}	source	the primary document source of the data
 
- 	**/
 
- 	proto.run = function run(result, source){
 
-         for(var i = 0, l = this.sourceVector.length; i<l; i++) {
 
- 			var temp = this.sourceVector[i];
 
-             temp.setSource(source);
 
-             source = temp;
 
-         }
 
-         /* source is left pointing at the last source in the chain */
 
-         /*
 
-           Iterate through the resulting documents, and add them to the result.
 
-           We do this even if we're doing an explain, in order to capture
 
-           the document counts and other stats.  However, we don't capture
 
-           the result documents for explain.
 
-           */
 
-         // the array in which the aggregation results reside
 
-         // cant use subArrayStart() due to error handling
 
-         var resultArray = [];
 
-         for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
 
-             var document = source.getCurrent();
 
-             /* add the document to the result set */
 
-             resultArray.push(document);
 
-             
 
-             //Commenting out this assertion for munge.  MUHAHAHA!!!
 
-             
 
-             // object will be too large, assert. the extra 1KB is for headers
 
- //            uassert(16389,
 
- //                    str::stream() << "aggregation result exceeds maximum document size ("
 
- //                                  << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
 
- //                    resultArray.len() < BSONObjMaxUserSize - 1024);
 
-         }
 
-         result.result = resultArray;
 
-         
 
- 		return true;
 
- 	};
 
- 	
 
- 	return klass;
 
- })();
 
 
  |