- "use strict";
- var async = require("async");
- /**
- * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
- * @class Pipeline
- * @namespace mungedb-aggregate.pipeline
- * @module mungedb-aggregate
- * @constructor
- **/
- // CONSTRUCTOR
- var Pipeline = module.exports = function Pipeline(theCtx){
- this.collectionName = null;
- this.sourceVector = null;
- this.explain = false;
- this.splitMongodPipeline = false;
- this.ctx = theCtx;
- }, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
var DocumentSource = require("./documentSources/DocumentSource"),
	LimitDocumentSource = require("./documentSources/LimitDocumentSource"),
	MatchDocumentSource = require("./documentSources/MatchDocumentSource"),
	ProjectDocumentSource = require("./documentSources/ProjectDocumentSource"),
	SkipDocumentSource = require("./documentSources/SkipDocumentSource"),
	UnwindDocumentSource = require("./documentSources/UnwindDocumentSource"),
	GroupDocumentSource = require("./documentSources/GroupDocumentSource"),
	SortDocumentSource = require("./documentSources/SortDocumentSource");
klass.COMMAND_NAME = "aggregate";
klass.PIPELINE_NAME = "pipeline";
klass.EXPLAIN_NAME = "explain";
klass.FROM_ROUTER_NAME = "fromRouter";
klass.SPLIT_MONGOD_PIPELINE_NAME = "splitMongodPipeline";
klass.SERVER_PIPELINE_NAME = "serverPipeline";
klass.MONGOS_PIPELINE_NAME = "mongosPipeline";
klass.stageDesc = {}; //attaching this to the class for test cases
klass.stageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
klass.stageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
klass.stageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
klass.stageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
klass.stageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
klass.stageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
klass.stageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
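// For example (a minimal sketch; this assumes each stage's `*Name` property is the
// "$"-prefixed operator name used by mongodb's aggregation framework, e.g. "$match"):
//   var factory = klass.stageDesc["$match"]; // -> MatchDocumentSource.createFromJson
//   var stage = factory({a: 1}, ctx);        // builds the DocumentSource for {$match: {a: 1}}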
/**
 * Create an `Array` of `DocumentSource`s from the given JSON pipeline.
 * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
 * @static
 * @method parseDocumentSources
 * @param pipeline {Array} The JSON pipeline
 * @param ctx {Object} The context object, passed through to each stage's factory
 * @returns {Array} The parsed `DocumentSource`s
 **/
klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
	var sourceVector = [];
	for (var nSteps = pipeline.length, iStep = 0; iStep < nSteps; ++iStep) {
		// pull out the pipeline element as an object
		var pipeElement = pipeline[iStep];
		if (!(pipeElement instanceof Object)) throw new Error("pipeline element " + iStep + " is not an object; code 15942");
		var obj = pipeElement;
		// Parse a pipeline stage from 'obj'.
		if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
		var stageName = Object.keys(obj)[0],
			stageSpec = obj[stageName];
		// Create a DocumentSource pipeline stage from 'stageSpec'.
		var desc = klass.stageDesc[stageName];
		if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16436");
		// Parse the stage
		var stage = desc(stageSpec, ctx);
		if (!stage) throw new Error("Stage must not be undefined!");
		stage.setPipelineStep(iStep);
		sourceVector.push(stage);
	}
	return sourceVector;
};
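// Usage sketch (hedged; assumes the stage names registered above are the usual
// "$"-prefixed operators, and that `ctx` may be an empty object when no stage needs it):
//   var sources = Pipeline.parseDocumentSources([
//   	{$match: {status: "A"}},
//   	{$limit: 5}
//   ], {});
//   // sources is [MatchDocumentSource, LimitDocumentSource], in pipeline order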
/**
 * Create a pipeline from the command.
 * @static
 * @method parseCommand
 * @param cmdObj {Object} The command object sent from the client
 * @param cmdObj.aggregate {Array} the thing to aggregate against; // NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
 * @param cmdObj.pipeline {Array} the JSON pipeline of `DocumentSource` specs
 * @param cmdObj.explain {Boolean} should explain?
 * @param cmdObj.fromRouter {Boolean} is from router?
 * @param cmdObj.splitMongodPipeline {Boolean} should split?
 * @param ctx {Object} Not used yet in mungedb-aggregate
 * @returns {Pipeline} the `Pipeline` instance, if created, otherwise null
 **/
klass.parseCommand = function parseCommand(cmdObj, ctx){
	var pipelineNamespace = require("./"),
		Pipeline = pipelineNamespace.Pipeline, // using require in case Pipeline gets replaced with an extension
		pipelineInst = new Pipeline(ctx);
	//gather the specification for the aggregation
	var pipeline;
	for(var fieldName in cmdObj){
		var cmdElement = cmdObj[fieldName];
		if(fieldName == klass.COMMAND_NAME) pipelineInst.collectionName = cmdElement; //look for the aggregation command
		else if(fieldName == klass.PIPELINE_NAME) pipeline = cmdElement; //check for the pipeline of JSON doc srcs
		else if(fieldName == klass.EXPLAIN_NAME) pipelineInst.explain = cmdElement; //check for explain option
		else if(fieldName == klass.FROM_ROUTER_NAME) pipelineInst.fromRouter = cmdElement; //if the request came from the router, we're in a shard
		else if(fieldName == klass.SPLIT_MONGOD_PIPELINE_NAME) pipelineInst.splitMongodPipeline = cmdElement; //check for debug options
		// NOTE: DEVIATION FROM MONGO: Not implementing: "Ignore $auth information sent along with the command. The authentication system will use it; it's not a part of the pipeline."
		else throw new Error("unrecognized field " + JSON.stringify(fieldName));
	}
	/*
	 * If we get here, we've harvested the fields we expect for a pipeline.
	 * Set up the specified document source pipeline.
	 */
	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
	var sourceVector = pipelineInst.sourceVector = Pipeline.parseDocumentSources(pipeline, ctx);
	/* if there aren't any pipeline stages, there's nothing more to do */
	if (!sourceVector.length) return pipelineInst;
	/*
	 * Move filters up where possible.
	 * CW TODO -- move filters past projections where possible, noting the corresponding field renamings.
	 */
	/*
	 * Wherever there is a match immediately following a sort, swap them.
	 * This means we sort fewer items. Neither changes the documents in the stream, so this transformation shouldn't affect the result.
	 * We do this first, because then when we coalesce operators below, any adjacent matches will be combined.
	 */
	for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
		var source = sourceVector[srci];
		if (source instanceof MatchDocumentSource) {
			var previous = sourceVector[srci - 1];
			if (previous instanceof SortDocumentSource) {
				/* swap this item with the previous */
				sourceVector[srci - 1] = source;
				sourceVector[srci] = previous;
			}
		}
	}
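	// For example (a sketch; stage specs abbreviated), the swap turns:
	//   [{$sort: {a: 1}}, {$match: {a: {$gt: 5}}}]
	// into:
	//   [{$match: {a: {$gt: 5}}}, {$sort: {a: 1}}]
	// so only the documents that pass the filter ever reach the sorter.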
	/*
	 * Coalesce adjacent filters where possible. Two adjacent filters are equivalent to one filter whose predicate is the conjunction of the two original filters' predicates.
	 * For now, capture this by giving any DocumentSource the option to absorb its successor; this will also allow adjacent projections to coalesce when possible.
	 * Run through the DocumentSources, and give each one the opportunity to coalesce with its successor. If successful, remove the successor.
	 * Move all document sources to a temporary list.
	 */
	var tempVector = sourceVector.slice(0);
	sourceVector.length = 0;
	// move the first one to the final list
	sourceVector.push(tempVector[0]);
	// run through the sources, coalescing them or keeping them
	for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
		/*
		 * If we can't coalesce the source with the last, then move it to the final list, and make it the new last.
		 * (If we succeeded, then we're still on the same last, and there's no need to move or do anything with the source -- the destruction of tempVector will take care of the rest.)
		 */
		var lastSource = sourceVector[sourceVector.length - 1],
			temp = tempVector[tempi];
		if (!temp || !lastSource) throw new Error("null document sources found");
		if (!lastSource.coalesce(temp)){
			sourceVector.push(temp);
		}
	}
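	// For example (a sketch; this assumes LimitDocumentSource#coalesce keeps the smaller
	// of two adjacent limits, as in mongodb's aggregation framework):
	//   [{$limit: 10}, {$limit: 5}]  coalesces into  [{$limit: 5}]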
	// optimize the elements in the pipeline
	for(var i = 0, l = sourceVector.length; i < l; i++) {
		var iter = sourceVector[i];
		if (!iter) throw new Error("Pipeline contains a null document source");
		iter.optimize();
	}
	return pipelineInst;
};
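// Usage sketch (hedged; field values here are illustrative only):
//   var pipelineInst = Pipeline.parseCommand({
//   	aggregate: "myInputs", // NOTE: DEVIATION FROM MONGO: may be an Array of inputs
//   	pipeline: [{$match: {x: {$gt: 0}}}, {$sort: {x: 1}}]
//   }, {});
//   // pipelineInst.sourceVector now holds the swapped/coalesced/optimized DocumentSource chain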
// sync callback for Pipeline#run if omitted
klass.SYNC_CALLBACK = function(err, results){
	if (err) throw err;
	return results.result;
};
// node-style callback that simply rethrows; used for the synchronous setSource calls below
function ifError(err) {
	if (err) throw err;
}
/**
 * Run the pipeline
 * @method run
 * @param inputSource {DocumentSource} The input document source for the pipeline
 * @param [callback] {Function} Optional callback function if using async extensions
 * @returns {Array} When run synchronously (no callback given), the resulting documents; otherwise the results are delivered to `callback`
 **/
proto.run = function run(inputSource, callback){
	if (inputSource && !(inputSource instanceof DocumentSource)) throw new Error("arg `inputSource` must be an instance of DocumentSource");
	if (!callback) callback = klass.SYNC_CALLBACK;
	var self = this;
	if (callback === klass.SYNC_CALLBACK) { // SYNCHRONOUS MODE
		inputSource.setSource(undefined, ifError); //TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
		// chain together the sources
		var source = inputSource;
		for(var i = 0, l = self.sourceVector.length; i < l; i++){
			var temp = self.sourceVector[i];
			temp.setSource(source, ifError);
			source = temp;
		}
		/*
		 * Iterate through the resulting documents, and add them to the result.
		 * We do this even if we're doing an explain, in order to capture the document counts and other stats.
		 * However, we don't capture the result documents for explain.
		 */
		var resultArray = [];
		try{
			for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
				var document = source.getCurrent();
				resultArray.push(document); // add the document to the result set
				// NOTE: DEVIATION FROM MONGO: the size assertion is disabled for munge; mongo asserts
				// when the result object grows too large (the extra 1KB is for headers; mongo measures
				// the BSON byte size, not the element count):
				//if(resultArray.length >= BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
			}
		} catch (err) {
			return callback(err);
		}
		var result = {
			result: resultArray
			// ,ok: true //not actually in here... where does this come from?
		};
		return callback(null, result);
	} else { // ASYNCHRONOUS MODE //TODO: move this up to a higher level package?
		return inputSource.setSource(undefined, function(err){ //TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
			if (err) return callback(err);
			// chain together the sources we found
			var source = inputSource;
			async.eachSeries(
				self.sourceVector,
				function eachSrc(temp, next){
					temp.setSource(source, function(err){
						if (err) return next(err);
						source = temp;
						return next();
					});
				},
				function doneSrcs(err){ //source is left pointing at the last source in the chain
					if (err) return callback(err);
					/*
					 * Iterate through the resulting documents, and add them to the result.
					 * We do this even if we're doing an explain, in order to capture the document counts and other stats.
					 * However, we don't capture the result documents for explain.
					 */
					// the array in which the aggregation results reside
					var resultArray = [];
					try{
						for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
							var document = source.getCurrent();
							resultArray.push(document); // add the document to the result set
							// NOTE: DEVIATION FROM MONGO: the size assertion is disabled for munge; mongo asserts
							// when the result object grows too large (the extra 1KB is for headers; mongo measures
							// the BSON byte size, not the element count):
							//if(resultArray.length >= BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
						}
					} catch (err) {
						return callback(err);
					}
					var result = {
						result: resultArray
						// ,ok: true //not actually in here... where does this come from?
					};
					return callback(null, result);
				}
			);
		});
	}
};
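// Usage sketch (hedged; `makeInputSource` stands in for however the caller builds a
// DocumentSource over its input documents -- it is hypothetical, not part of this module):
//   var pipelineInst = Pipeline.parseCommand({aggregate: "inputs", pipeline: [{$match: {x: 1}}]}, {});
//   // synchronous: omit the callback; the results are returned and errors are thrown
//   var docs = pipelineInst.run(makeInputSource([{x: 1}, {x: 2}]));
//   // asynchronous: pass a node-style callback; `results.result` holds the documents
//   pipelineInst.run(makeInputSource([{x: 1}, {x: 2}]), function(err, results){
//   	if (err) throw err;
//   	console.log(results.result);
//   });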