| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 | 
							- "use strict";
 
- var async = require("async"),
 
- 	DepsTracker = require("./DepsTracker"),
 
- 	documentSources = require("./documentSources/"),
 
- 	DocumentSource = documentSources.DocumentSource,
 
- 	LimitDocumentSource = documentSources.LimitDocumentSource,
 
- 	MatchDocumentSource = documentSources.MatchDocumentSource,
 
- 	ProjectDocumentSource = documentSources.ProjectDocumentSource,
 
- 	SkipDocumentSource = documentSources.SkipDocumentSource,
 
- 	UnwindDocumentSource = documentSources.UnwindDocumentSource,
 
- 	GroupDocumentSource = documentSources.GroupDocumentSource,
 
- 	OutDocumentSource = documentSources.OutDocumentSource,
 
- 	GeoNearDocumentSource = documentSources.GeoNearDocumentSource,
 
- 	RedactDocumentSource = documentSources.RedactDocumentSource,
 
- 	SortDocumentSource = documentSources.SortDocumentSource;
 
/**
 * An aggregation pipeline: holds the list of document sources together with the
 * options that accompany it (explain flag, split flag and the caller's context).
 * @class Pipeline
 * @namespace mungedb-aggregate.pipeline
 * @module mungedb-aggregate
 * @constructor
 * @param theCtx  context object kept on the instance as `ctx`
 **/
function Pipeline(theCtx) {
	// no sources until someone assigns them (see `parseCommand`)
	this.sources = null;
	this.explain = false;
	this.splitMongodPipeline = false;
	this.ctx = theCtx;
}
module.exports = Pipeline;

// short aliases used throughout the rest of this file
var klass = Pipeline,
	proto = klass.prototype;
 
// Field names recognised in (or emitted into) an aggregate command object
klass.COMMAND_NAME = "aggregate";
klass.PIPELINE_NAME = "pipeline";
klass.EXPLAIN_NAME = "explain";
klass.FROM_ROUTER_NAME = "fromRouter";
klass.SERVER_PIPELINE_NAME = "serverPipeline";
klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
klass.BATCH_SIZE_NAME = "batchSize";

// Maps a stage name to the factory that builds its DocumentSource from a JSON spec.
// Each factory is invoked as `factory(stageSpec, ctx)` by `parseDocumentSources`.
klass.stageDesc = {}; //NOTE: DEVIATION FROM MONGO: attaching to the class to make it easier to test and extend
klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
//SKIPPED: klass.stageDesc[MergeCursorsDocumentSource.name] = MergeCursorsDocumentSource.createFromJson;
klass.stageDesc[OutDocumentSource.outName] = OutDocumentSource.createFromJson;
klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
// The redact stage must be built by its own factory (it was previously wired to the
// project factory, so redact specs were parsed as projections).
klass.stageDesc[RedactDocumentSource.redactName] = RedactDocumentSource.createFromJson;
klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
// klass.nStageDesc = Object.keys(klass.stageDesc).length; //NOTE: DEVIATION FROM MONGO: not using this to bsearch these in `.parseCommand`

// Containers for the optimization passes defined below
klass.optimizations = {};
klass.optimizations.local = {};
klass.optimizations.sharded = {};
 
/**
 * Swaps a $match with the $sort immediately in front of it, so the match runs first.
 * Makes a single left-to-right pass over the pipeline's sources, mutating the list in place.
 * @static
 * @method moveMatchBeforeSort
 * @param pipelineInst An instance of a Pipeline
 */
klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
	var stages = pipelineInst.sources,
		total = stages.length;
	for (var idx = 1; idx < total; ++idx) {
		var current = stages[idx];
		if (current.constructor !== MatchDocumentSource) continue;

		var before = stages[idx - 1];
		if (!before || before.constructor !== SortDocumentSource) continue;

		// exchange the two neighbours: match first, then sort
		stages[idx - 1] = current;
		stages[idx] = before;
	}
};
 
/**
 * Moves a $limit in front of the $skip that directly precedes it.
 * The limit is enlarged by the skip amount before the two stages trade places.
 * @static
 * @method moveLimitBeforeSkip
 * @param pipelineInst An instance of a Pipeline
 */
klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
	var stages = pipelineInst.sources;
	if (stages.length === 0) return;

	// Walk from the back towards index 1 (index 0 has no predecessor to compare with)
	var pos = stages.length - 1;
	while (pos >= 1) {
		var later = stages[pos],
			earlier = stages[pos - 1],
			laterIsLimit = later.constructor === LimitDocumentSource,
			earlierIsSkip = earlier.constructor === SkipDocumentSource;

		if (laterIsLimit && earlierIsSkip) {
			later.setLimit(later.getLimit() + earlier.getSkip());
			stages[pos - 1] = later;
			stages[pos] = earlier;
			// Restart from the back after every swap. A single back-to-front pass is
			// enough for one limit (S = skip, L = limit):
			//   SL   -> LS
			//   SSL  -> LSS
			// but pipelines with several limits need the rescan:
			//   SLL  -> LLS
			//   SSLL -> LLSS
			//   SLSL -> LLSS
			pos = stages.length - 1;
		} else {
			pos -= 1;
		}
	}
};
 
/**
 * Tries to fold each stage into the stage kept just before it, via `coalesce`.
 * Stages that cannot be folded are kept; the pipeline's source list is rebuilt in place.
 * @static
 * @method coalesceAdjacent
 * @param pipelineInst An instance of a Pipeline
 */
klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
	var stages = pipelineInst.sources;
	if (stages.length === 0) return;

	// Drain the live list into a scratch list, keeping the original order
	var pending = [],
		popped;
	while ((popped = stages.pop())) {
		pending.push(popped);
	}
	pending.reverse();

	// The first stage is always kept; the rest either merge into the kept tail or follow it
	stages.push(pending[0]);
	for (var idx = 1; idx < pending.length; idx++) {
		var tail = stages[stages.length - 1],
			candidate = pending[idx];
		if (!tail || !candidate) {
			throw new Error("Must have a last and current source"); // verify(lastSource && tempSrc);
		}
		// A truthy result means the tail absorbed the candidate, so it is not re-added
		var absorbed = tail.coalesce(candidate);
		if (!absorbed) stages.push(candidate);
	}
};
 
/**
 * Calls `optimize()` on every source of the pipeline, in order.
 * @static
 * @method optimizeEachDocumentSource
 * @param pipelineInst An instance of a Pipeline
 */
klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
	var stages = pipelineInst.sources,
		total = stages.length, // length is captured once, before any stage is optimized
		idx = 0;
	while (idx < total) {
		stages[idx].optimize();
		idx += 1;
	}
};
 
/**
 * When the pipeline starts with a $redact followed by a $match, inserts an extra
 * $match built from the redact-safe portion of that match in front of the $redact.
 * Nothing is inserted when the redact-safe portion is empty.
 * @static
 * @method duplicateMatchBeforeInitalRedact
 * @param pipelineInst An instance of a Pipeline
 */
klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
	var sources = pipelineInst.sources;
	if (sources.length < 2) return;
	if (sources[0].constructor !== RedactDocumentSource) return;
	if (sources[1].constructor !== MatchDocumentSource) return;

	var redactSafePortion = sources[1].redactSafePortion();
	if (Object.keys(redactSafePortion).length === 0) return;

	// Prepend the new match. (`shift` was used here before, which ignores its argument
	// and removed the leading $redact instead of adding the $match.)
	sources.unshift(MatchDocumentSource.createFromJson(redactSafePortion, pipelineInst.ctx));
};
 
- //SKIPPED: addRequiredPrivileges
 
/**
 * Splits this pipeline for sharded execution.
 * A new pipeline is created for the shard side and the sharded optimizations move
 * stages into it; this pipeline keeps whatever remains for merging.
 * @method splitForSharded
 * @return {Pipeline} the newly created shard-side pipeline
 */
proto.splitForSharded = function splitForSharded() {
	var sharded = klass.optimizations.sharded,
		shardSide = new Pipeline({});
	shardSide.explain = this.explain;

	sharded.findSplitPoint(shardSide, this);
	sharded.moveFinalUnwindFromShardsToMerger(shardSide, this);
	//sharded.limitFieldsSentFromShardsToMerger(shardSide, this);

	return shardSide;
};
 
/**
 * Moves stages from the front of the merge pipeline to the shard pipeline until the
 * first splittable stage is found. That stage is replaced by its shard half (appended
 * to the shard pipeline) and its merge half (put back at the front of the merge pipeline).
 * @static
 * @method findSplitPoint
 * @param shardPipe Shard sources; `sources` may be null and is created on demand
 * @param mergePipe Merge sources
 */
klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe, mergePipe) {
	while (mergePipe.sources.length > 0) {
		var current = mergePipe.sources.shift();

		// A freshly constructed Pipeline has `sources === null`; make sure there is a
		// list before either branch pushes onto it (the splittable branch used to throw).
		if (!shardPipe.sources) { shardPipe.sources = []; }

		if (current.isSplittable && current.isSplittable()) {
			var shardSource = current.getShardSource(),
				mergeSource = current.getMergeSource();
			if (shardSource) { shardPipe.sources.push(shardSource); }		//push_back
			if (mergeSource) { mergePipe.sources.unshift(mergeSource); }	//push_front
			break; // everything after the split point stays in the merge pipeline
		}

		shardPipe.sources.push(current);
	}
};
 
/**
 * Moves trailing $unwind stages off the end of the shard pipeline and onto the
 * front of the merge pipeline, one at a time, until the shard pipeline no longer
 * ends with an $unwind.
 * @static
 * @method moveFinalUnwindFromShardsToMerger
 * @param shardPipe shard sources
 * @param mergePipe merge sources
 */
klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
	for (;;) {
		var shardStages = shardPipe.sources;
		if (shardStages === null) break;
		if (!(shardStages.length > 0)) break;

		var lastStage = shardStages[shardStages.length - 1];
		if (!(lastStage instanceof UnwindDocumentSource)) break;

		mergePipe.sources.unshift(shardStages.pop());
	}
};
 
- //SKIPPED: optimizations.sharded.limitFieldsSentFromShardsToMerger.  Somehow what this produces is not handled by Expression.js (err 16404)
 
/**
 * Builds the list of `DocumentSource`s described by a JSON pipeline.
 * Every element must be an object with exactly one field: the stage name, whose value
 * is the stage spec. The factory registered under that name in `klass.stageDesc`
 * creates the stage.
 * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
 * @static
 * @method parseDocumentSources
 * @param pipeline  {Array}  The JSON pipeline
 * @param ctx  passed through to each stage factory
 * @returns {Array}  The parsed `DocumentSource`s
 * @throws {Error} for a non-object element, an element without exactly one field,
 *                 an unknown stage name, a factory that yields nothing, or an $out
 *                 stage that is not last
 */
klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
	var parsed = [],
		total = pipeline.length;
	for (var idx = 0; idx < total; ++idx) {
		var element = pipeline[idx];
		if (!(element instanceof Object)) {
			throw new Error("pipeline element " + idx + " is not an object; code 15942");
		}

		// the single key is the stage name, its value the stage spec
		var fieldNames = Object.keys(element);
		if (fieldNames.length !== 1) {
			throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
		}
		var stageName = fieldNames[0],
			stageSpec = element[stageName];

		// look up the factory registered for this stage
		var factory = klass.stageDesc[stageName];
		if (!factory) {
			throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; uassert code 16436");
		}

		var stage = factory(stageSpec, ctx);
		if (!stage) {
			throw new Error("Stage must not be undefined!"); // verify(stage)
		}
		parsed.push(stage);

		var isLast = idx === total - 1;
		if (stage.constructor === OutDocumentSource && !isLast) {
			throw new Error("$out can only be the final stage in the pipeline; code 16991");
		}
	}
	return parsed;
};
 
/**
 * Create a pipeline from the command.
 * Reads the recognised fields of `cmdObj`, parses the JSON pipeline into document
 * sources and then applies the local optimizations in a fixed order.
 * @static
 * @method parseCommand
 * @param cmdObj  {Object}  The command object sent from the client
 * @param   cmdObj.aggregate            {Array}    the thing to aggregate against // NOTE: DEVIATION FROM MONGO: not a collection name
 * @param   cmdObj.pipeline             {Array}    the JSON pipeline of `DocumentSource` specs
 * @param   cmdObj.explain              {Boolean}  should explain?
 * @param   cmdObj.fromRouter           {Boolean}  is from router? (stored as `ctx.inShard`)
 * @param   cmdObj.allowDiskUsage       {Boolean}  only type-checked here
 * @param ctx     {Object}  context handed to the new pipeline and to every stage factory
 * @returns {Pipeline}  the new, optimized pipeline instance
 * @throws {Error} on an unrecognized field or a non-boolean `allowDiskUsage`
 */
klass.parseCommand = function parseCommand(cmdObj, ctx){
	var pipelineNamespace = require("./"),
		Pipeline = pipelineNamespace.Pipeline,	// using require in case Pipeline gets replaced with an extension
		pipelineInst = new Pipeline(ctx);

	//gather the specification for the aggregation
	var pipeline;
	for (var fieldName in cmdObj) { //jshint ignore:line
		var cmdElement = cmdObj[fieldName];

		// fields that are accepted but carry nothing we need here
		if (fieldName[0] === "$") continue;
		if (fieldName === "cursor") continue;
		if (fieldName === klass.COMMAND_NAME) continue; //look for the aggregation command
		if (fieldName === klass.BATCH_SIZE_NAME) continue;

		if (fieldName === klass.PIPELINE_NAME) {
			pipeline = cmdElement; //check for the pipeline of JSON doc srcs
		} else if (fieldName === klass.EXPLAIN_NAME) {
			pipelineInst.explain = cmdElement; //check for explain option
		} else if (fieldName === klass.FROM_ROUTER_NAME) {
			ctx.inShard = cmdElement; //if the request came from the router, we're in a shard
		} else if (fieldName === "allowDiskUsage") {
			if (typeof cmdElement !== "boolean") {
				// report the type of the offending value (this used to name an undeclared
				// identifier and therefore always said "undefined")
				throw new Error("allowDiskUsage must be a bool, not a " + typeof cmdElement + "; uassert code 16949");
			}
		} else {
			throw new Error("unrecognized field " + JSON.stringify(fieldName));
		}
	}

	/**
	 * If we get here, we've harvested the fields we expect for a pipeline
	 * Set up the specified document source pipeline.
	 */
	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify facilitate extensions (now in parseDocumentSources)
	pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);

	// The order in which optimizations are applied can have significant impact on the
	// efficiency of the final pipeline. Be Careful!
	klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
	klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
	klass.optimizations.local.coalesceAdjacent(pipelineInst);
	klass.optimizations.local.optimizeEachDocumentSource(pipelineInst);
	klass.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);

	return pipelineInst;
};
 
/**
 * Gets the query of the initial $match when a $match is the first pipeline stage
 * @method getInitialQuery
 * @return {Object}	The first stage's match query, or an empty object when the
 *                  pipeline is empty or does not start with a $match
 */
proto.getInitialQuery = function getInitialQuery() {
	var stages = this.sources;
	if (stages.length !== 0 && stages[0] instanceof MatchDocumentSource) {
		return stages[0].getQuery();
	}
	return {};
};
 
/**
 * Creates the JSON (command object) representation of the pipeline
 * @method serialize
 * @return {Object}	An object holding the collection name taken from `ctx.ns.coll`
 *                  (or "" when unavailable), the serialized stages and, when set,
 *                  the explain flag
 */
proto.serialize = function serialize() {
	var command = {},
		stageSpecs = [];

	// let every stage append its own serialized form
	if (this.sources) {
		var idx = 0;
		while (idx < this.sources.length) {
			this.sources[idx].serializeToArray(stageSpecs);
			idx++;
		}
	}

	var collection = "";
	if (this.ctx && this.ctx.ns && this.ctx.ns.coll) {
		collection = this.ctx.ns.coll;
	}

	command[klass.COMMAND_NAME] = collection;
	command[klass.PIPELINE_NAME] = stageSpecs;
	if (this.explain) {
		command[klass.EXPLAIN_NAME] = this.explain;
	}
	return command;
};
 
/**
 * Chains the sources together: every source after the first gets the source
 * before it as its input (via `setSource`).
 * @method stitch
 * @throws {Error} when the pipeline has no sources
 */
proto.stitch = function stitch() {
	var stages = this.sources;
	if (stages.length <= 0) {
		throw new Error("should not have an empty pipeline; massert code 16600");
	}
	for (var idx = 1, total = stages.length; idx < total; idx++) {
		stages[idx].setSource(stages[idx - 1]);
	}
};
 
/**
 * Run the pipeline by repeatedly pulling documents from its last source.
 * The loop ends once a `null` document has been delivered or an error is reported.
 * @method run
 * @param [isAsync] {Boolean} whether or not to use setImmediate to force async calls (to avoid stack overflows); defaults to true when omitted
 * @param callback {Function} gets called once for each document result from the pipeline
 */
proto.run = function run(isAsync, callback) {
	// allow run(callback): the single argument is the callback and async mode is on
	if (typeof isAsync === "function") {
		callback = isAsync;
		isAsync = true;
	}

	// should not get here in the explain case
	if (this.explain) return callback(new Error("Assertion error: don't run pipeline in explain mode"));

	var lastDoc = null,
		tail = this.sources[this.sources.length - 1];

	// pulls one document from the final source and hands it to the callback
	function pullOne(next) {
		return tail.getNext(function onDoc(err, doc) {
			function deliver() {
				lastDoc = doc;
				callback(err, lastDoc);
				next(err);
			}
			if (isAsync) {
				async.setImmediate(deliver);
			} else {
				// sync mode; only for small sets, stack overflow on large sets
				deliver();
			}
		});
	}

	// keep going until a null document has been seen
	function hasMore() {
		return lastDoc !== null;
	}

	function finished(err) {
		//nothing to do here
	}

	async.doWhilst(pullOne, hasMore, finished);
};
 
/**
 * Get the pipeline explanation: every source serializes itself, in explain mode,
 * into one shared array.
 * @method writeExplainOps
 * @return {Array}	An array of source explanations
 */
proto.writeExplainOps = function writeExplainOps() {
	return this.sources.reduce(function collect(explanations, stage) {
		stage.serializeToArray(explanations, /*explain=*/true);
		return explanations;
	}, []);
};
 
/**
 * Set the source of documents for the pipeline by placing it at the front of
 * the source list.
 * @method addInitialSource
 * @param source {DocumentSource}
 */
proto.addInitialSource = function addInitialSource(source) {
	// insert at index 0 without removing anything
	this.sources.splice(0, 0, source);
};
 
- //SKIPPED: canRunInMongos
 
/**
 * Collects the field dependencies of the pipeline, stage by stage, until a stage
 * reports that its field list is exhaustive or that it cannot report dependencies.
 * When no stage was exhaustive the result asks for the whole document.
 *
 * Note: Deviation from Mongo: Mongo 2.6.5 passes a param to getDependencies
 *	to calculate TextScore.  mungedb-aggregate doesn't do this, so no param is needed.
 * @method getDependencies
 * @return {DepsTracker} the merged dependencies
 */
proto.getDependencies = function getDependencies () {
	var merged = new DepsTracker(),
		exhaustive = false,
		stages = this.sources;

	//NOTE: Deviation from Mongo -- We aren't using Meta and textscore
	for (var idx = 0; idx < stages.length && !exhaustive; idx++) {
		var stageDeps = new DepsTracker(),
			status = stages[idx].getDependencies(stageDeps);

		if (status === DocumentSource.GetDepsReturn.NOT_SUPPORTED) {
			// Assume this stage needs everything. We may still know something about our
			// dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
			// EXHAUSTIVE_META.
			break;
		}

		// fold this stage's findings into the overall result
		for (var field in stageDeps.fields) { //jshint ignore:line
			merged.fields[field] = stageDeps.fields[field];
		}
		if (stageDeps.needWholeDocument) {
			merged.needWholeDocument = true;
		}

		// bit test; a non-zero result ends the loop via its condition
		exhaustive = status & DocumentSource.GetDepsReturn.EXHAUSTIVE_FIELDS;
	}

	if (!exhaustive) {
		merged.needWholeDocument = true; // don't know all fields we need
	}
	return merged;
};
 
 
  |