| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- "use strict";
- /**
- * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
- * @class Pipeline
- * @namespace mungedb-aggregate.pipeline
- * @module mungedb-aggregate
- * @constructor
- **/
- // CONSTRUCTOR
- var Pipeline = module.exports = function Pipeline(theCtx){
- this.sources = null;
- this.explain = false;
- this.splitMongodPipeline = false;
- this.ctx = theCtx;
- this.SYNC_MODE = false;
- }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
- var DocumentSource = require("./documentSources/DocumentSource"),
- LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
- MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
- ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
- SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
- UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
- GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
- OutDocumentSource = require('./documentSources/OutDocumentSource'),
- GeoNearDocumentSource = require('./documentSources/GeoNearDocumentSource'),
- RedactDocumentSource = require('./documentSources/RedactDocumentSource'),
- SortDocumentSource = require('./documentSources/SortDocumentSource');
- klass.COMMAND_NAME = "aggregate";
- klass.PIPELINE_NAME = "pipeline";
- klass.EXPLAIN_NAME = "explain";
- klass.FROM_ROUTER_NAME = "fromRouter";
- klass.SERVER_PIPELINE_NAME = "serverPipeline";
- klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
- klass.stageDesc = {};//attaching this to the class for test cases
- klass.stageDesc[GeoNearDocumentSource.geoNearName] = GeoNearDocumentSource.createFromJson;
- klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
- klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
- klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
- klass.stageDesc[OutDocumentSource.outName] = OutDocumentSource.createFromJson;
- klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
- klass.stageDesc[RedactDocumentSource.redactName] = ProjectDocumentSource.createFromJson;
- klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
- klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
- klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
- klass.nStageDesc = Object.keys(klass.stageDesc).length;
- klass.optimizations = {};
- klass.optimizations.local = {};
- /**
- * Moves $match before $sort when they are placed next to one another
- * @static
- * @method moveMatchBeforeSort
- * @param pipelineInst An instance of a Pipeline
- **/
- klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
- var sources = pipelineInst.sources;
- for(var srcn = sources.length, srci = 1; srci < srcn; ++srci) {
- var source = sources[srci];
- if(source.constructor === MatchDocumentSource) {
- var previous = sources[srci - 1];
- if(previous && previous.constructor === SortDocumentSource) { //Added check that previous exists
- /* swap this item with the previous */
- sources[srci] = previous;
- sources[srci-1] = source;
- }
- }
- }
- };
- /**
- * Moves $limit before $skip when they are placed next to one another
- * @static
- * @method moveLimitBeforeSkip
- * @param pipelineInst An instance of a Pipeline
- **/
- klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
- var sources = pipelineInst.sources;
- if(sources.length === 0) return;
- for(var i = sources.length - 1; i >= 1 /* not looking at 0 */; i--) {
- var limit = sources[i].constructor === LimitDocumentSource ? sources[i] : undefined,
- skip = sources[i-1].constructor === SkipDocumentSource ? sources[i-1] : undefined;
- if(limit && skip) {
- limit.setLimit(limit.getLimit() + skip.getSkip());
- sources[i-1] = limit;
- sources[i] = skip;
- // Start at back again. This is needed to handle cases with more than 1 $limit
- // (S means skip, L means limit)
- //
- // These two would work without second pass (assuming back to front ordering)
- // SL -> LS
- // SSL -> LSS
- //
- // The following cases need a second pass to handle the second limit
- // SLL -> LLS
- // SSLL -> LLSS
- // SLSL -> LLSS
- i = sources.length; // decremented before next pass
- }
- }
- };
- /**
- * Attempts to coalesce every pipeline stage into the previous pipeline stage, starting after the first
- * @static
- * @method coalesceAdjacent
- * @param pipelineInst An instance of a Pipeline
- **/
- klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
- var sources = pipelineInst.sources;
- if(sources.length === 0) return;
- // move all sources to a temporary list
- var moveSrc = sources.pop(),
- tempSources = [];
- while(moveSrc) {
- tempSources.unshift(moveSrc);
- moveSrc = sources.pop();
- }
- // move the first one to the final list
- sources.push(tempSources[0]);
- // run through the sources, coalescing them or keeping them
- for(var tempn = tempSources.length, tempi = 1; tempi < tempn; ++tempi) {
- // If we can't coalesce the source with the last, then move it
- // to the final list, and make it the new last. (If we succeeded,
- // then we're still on the same last, and there's no need to move
- // or do anything with the source -- the destruction of tempSources
- // will take care of the rest.)
- var lastSource = sources[sources.length-1],
- tempSrc = tempSources[tempi];
- if(!(lastSource && tempSrc)) {
- throw new Error('Must have a last and current source'); // verify(lastSource && tempSrc);
- }
- if(!lastSource.coalesce(tempSrc)) sources.push(tempSrc);
- }
- };
- /**
- * Iterates over sources in the pipelineInst, optimizing each
- * @static
- * @method optimizeEachDocumentSource
- * @param pipelineInst An instance of a Pipeline
- **/
- klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
- var sources = pipelineInst.sources;
- for(var srci = 0, srcn = sources.length; srci < srcn; ++srci) {
- sources[srci].optimize();
- }
- };
- /**
- * Auto-places a $match before a $redact when the $redact is the first item in a pipeline
- * @static
- * @method duplicateMatchBeforeInitalRedact
- * @param pipelineInst An instance of a Pipeline
- **/
- klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
- var sources = pipelineInst.sources;
- if(sources.length >= 2 && sources[0].constructor === RedactDocumentSource) {
- if(sources[1].constructor === MatchDocumentSource) {
- var match = sources[1],
- redactSafePortion = match.redactSafePortion();
- if(Object.keys(redactSafePortion).length > 0) {
- sources.shift(MatchDocumentSource.createFromJson(redactSafePortion, pipelineInst.ctx));
- }
- }
- }
- };
- /**
- * Create an `Array` of `DocumentSource`s from the given JSON pipeline
- * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
- * @static
- * @method parseDocumentSources
- * @param pipeline {Array} The JSON pipeline
- * @returns {Array} The parsed `DocumentSource`s
- **/
- klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
- var sources = [];
- for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
- // pull out the pipeline element as an object
- var pipeElement = pipeline[iStep];
- if (!(pipeElement instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
- var obj = pipeElement;
- // Parse a pipeline stage from 'obj'.
- if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
- var stageName = Object.keys(obj)[0],
- stageSpec = obj[stageName];
- // Create a DocumentSource pipeline stage from 'stageSpec'.
- var desc = klass.stageDesc[stageName];
- if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435");
- // Parse the stage
- var stage = desc(stageSpec, ctx);
- if (!stage) throw new Error("Stage must not be undefined!"); // verify(stage)
- sources.push(stage);
- if(stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
- throw new Error("$out can only be the final stage in the pipeline; code 16435");
- }
- }
- return sources;
- };
- /**
- * Create a pipeline from the command.
- * @static
- * @method parseCommand
- * @param cmdObj {Object} The command object sent from the client
- * @param cmdObj.aggregate {Array} the thing to aggregate against; // NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
- * @param cmdObj.pipeline {Object} the JSON pipeline of `DocumentSource` specs
- * @param cmdObj.explain {Boolean} should explain?
- * @param cmdObj.fromRouter {Boolean} is from router?
- * @param cmdObj.splitMongodPipeline {Boolean} should split?
- * @param ctx {Object} Not used yet in mungedb-aggregate
- * @returns {Array} the pipeline, if created, otherwise a NULL reference
- **/
- klass.parseCommand = function parseCommand(cmdObj, ctx){
- var pipelineNamespace = require("./"),
- Pipeline = pipelineNamespace.Pipeline, // using require in case Pipeline gets replaced with an extension
- pipelineInst = new Pipeline(ctx);
- //gather the specification for the aggregation
- var pipeline;
- for(var fieldName in cmdObj){
- var cmdElement = cmdObj[fieldName];
- if(fieldName[0] == "$") continue;
- else if(fieldName == "cursor") continue;
- else if(fieldName == klass.COMMAND_NAME) continue; //look for the aggregation command
- else if(fieldName == klass.PIPELINE_NAME) pipeline = cmdElement; //check for the pipeline of JSON doc srcs
- else if(fieldName == klass.EXPLAIN_NAME) pipelineInst.explain = cmdElement; //check for explain option
- else if(fieldName == klass.FROM_ROUTER_NAME) ctx.inShard = cmdElement; //if the request came from the router, we're in a shard
- else if(fieldName == "allowDiskUsage") {
- if(typeof cmdElement !== 'boolean') throw new Error("allowDiskUsage must be a bool, not a " + typeof allowDiskUsage+ "; uassert code 16949");
- }
- else throw new Error("unrecognized field " + JSON.stringify(fieldName));
- }
- /**
- * If we get here, we've harvested the fields we expect for a pipeline
- * Set up the specified document source pipeline.
- **/
- // NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
- var sources = pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
- klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
- klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
- klass.optimizations.local.coalesceAdjacent(pipelineInst);
- klass.optimizations.local.optimizeEachDocumentSource(pipelineInst);
- klass.optimizations.local.duplicateMatchBeforeInitalRedact(pipelineInst);
- return pipelineInst;
- };
- // sync callback for Pipeline#run if omitted
- klass.SYNC_CALLBACK = function(err, results){
- if (err) throw err;
- return results.result;
- };
- function ifError(err) {
- if (err) throw err;
- }
- /**
- * Gets the initial $match query when $match is the first pipeline stage
- * @method run
- * @param inputSource {DocumentSource} The input document source for the pipeline
- * @param [callback] {Function} Optional callback function if using async extensions
- * @return {Object} An empty object or the match spec
- **/
- proto.getInitialQuery = function getInitialQuery() {
- var sources = this.sources;
- if(sources.length === 0) {
- return {};
- }
- /* look for an initial $match */
- var match = sources[0].constructor === MatchDocumentSource ? sources[0] : undefined;
- if(!match) return {};
- return match.getQuery();
- };
- /**
- * Creates the JSON representation of the pipeline
- * @method run
- * @param inputSource {DocumentSource} The input document source for the pipeline
- * @param [callback] {Function} Optional callback function if using async extensions
- * @return {Object} An empty object or the match spec
- **/
- proto.serialize = function serialize() {
- var serialized = {},
- array = [];
- // create an array out of the pipeline operations
- this.sources.forEach(function(source) {
- source.serializeToArray(array);
- });
- serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
- serialized[klass.PIPELINE_NAME] = array;
- if(this.explain) serialized[klass.EXPLAIN_NAME] = this.explain;
- return serialized;
- };
- /**
- * Points each source at its previous source
- * @method stitch
- **/
- proto.stitch = function stitch() {
- if(this.sources.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
- /* chain together the sources we found */
- var prevSource = this.sources[0];
- for(var srci = 1, srcn = this.sources.length; srci < srcn; srci++) {
- var tempSource = this.sources[srci];
- tempSource.setSource(prevSource);
- prevSource = tempSource;
- }
- };
- /**
- * Run the pipeline
- * @method run
- * @param callback {Function} Optional. Run the pipeline in async mode; callback(err, result)
- * @return result {Object} The result of executing the pipeline
- **/
- proto.run = function run(callback) {
- // should not get here in the explain case
- if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
- /* NOTE: DEVIATION FROM MONGO SOURCE. WE'RE SUPPORTING SYNC AND ASYNC */
- if(this.SYNC_MODE) {
- callback();
- return this._runSync();
- } else {
- return this._runAsync(callback);
- }
- };
- /**
- * Get the last document source in the pipeline
- * @method _getFinalSource
- * @return {Object} The DocumentSource at the end of the pipeline
- * @private
- **/
- proto._getFinalSource = function _getFinalSource() {
- return this.sources[this.sources.length - 1];
- };
- /**
- * Run the pipeline synchronously
- * @method _runSync
- * @return {Object} The results object {result:resultArray}
- * @private
- **/
- proto._runSync = function _runSync(callback) {
- var resultArray = [],
- finalSource = this._getFinalSource(),
- handleErr = function(err) {
- if(err) throw err;
- },
- next;
- while((next = finalSource.getNext(handleErr)) !== DocumentSource.EOF) {
- resultArray.push(next);
- }
- return {result:resultArray};
- };
- /**
- * Run the pipeline asynchronously
- * @method _runAsync
- * @param callback {Function} callback(err, resultObject)
- * @private
- **/
- proto._runAsync = function _runAsync(callback) {
- var resultArray = [],
- finalSource = this._getFinalSource(),
- gotNext = function(err, doc) {
- if(err) return callback(err);
- if(doc !== DocumentSource.EOF) {
- resultArray.push(doc);
- return setImmediate(function() { //setImmediate to avoid callstack size issues
- finalSource.getNext(gotNext);
- });
- } else {
- return callback(null, {result:resultArray});
- }
- };
- finalSource.getNext(gotNext);
- };
- /**
- * Get the pipeline explanation
- * @method writeExplainOps
- * @return {Array} An array of source explanations
- **/
- proto.writeExplainOps = function writeExplainOps() {
- var array = [];
- this.sources.forEach(function(source) {
- source.serializeToArray(array, /*explain=*/true);
- });
- return array;
- };
- /**
- * Set the source of documents for the pipeline
- * @method addInitialSource
- * @param source {DocumentSource}
- **/
- proto.addInitialSource = function addInitialSource(source) {
- this.sources.unshift(source);
- };
|