|
|
@@ -1,5 +1,20 @@
|
|
|
"use strict";
|
|
|
-var async = require('async');
|
|
|
+var async = require("async"),
|
|
|
+ DepsTracker = require("./DepsTracker"),
|
|
|
+ documentSources = require("./documentSources/"),
|
|
|
+ DocumentSource = documentSources.DocumentSource,
|
|
|
+ LimitDocumentSource = documentSources.LimitDocumentSource,
|
|
|
+ MatchDocumentSource = documentSources.MatchDocumentSource,
|
|
|
+ ProjectDocumentSource = documentSources.ProjectDocumentSource,
|
|
|
+ SkipDocumentSource = documentSources.SkipDocumentSource,
|
|
|
+ UnwindDocumentSource = documentSources.UnwindDocumentSource,
|
|
|
+ GroupDocumentSource = documentSources.GroupDocumentSource,
|
|
|
+ OutDocumentSource = documentSources.OutDocumentSource,
|
|
|
+ GeoNearDocumentSource = documentSources.GeoNearDocumentSource,
|
|
|
+ RedactDocumentSource = documentSources.RedactDocumentSource,
|
|
|
+ SortDocumentSource = documentSources.SortDocumentSource;
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
|
|
|
* @class Pipeline
|
|
|
@@ -7,26 +22,12 @@ var async = require('async');
|
|
|
* @module mungedb-aggregate
|
|
|
* @constructor
|
|
|
**/
|
|
|
-// CONSTRUCTOR
|
|
|
var Pipeline = module.exports = function Pipeline(theCtx){
|
|
|
this.sources = null;
|
|
|
this.explain = false;
|
|
|
this.splitMongodPipeline = false;
|
|
|
this.ctx = theCtx;
|
|
|
-}, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
|
|
|
-
|
|
|
-var DocumentSource = require("./documentSources/DocumentSource"),
|
|
|
- LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
|
|
|
- MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
|
|
|
- ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
|
|
|
- SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
|
|
|
- UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
|
|
|
- GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
|
|
|
- OutDocumentSource = require('./documentSources/OutDocumentSource'),
|
|
|
- GeoNearDocumentSource = require('./documentSources/GeoNearDocumentSource'),
|
|
|
- RedactDocumentSource = require('./documentSources/RedactDocumentSource'),
|
|
|
- SortDocumentSource = require('./documentSources/SortDocumentSource'),
|
|
|
- DepsTracker = require('./DepsTracker');
|
|
|
+}, klass = Pipeline, proto = klass.prototype;
|
|
|
|
|
|
klass.COMMAND_NAME = "aggregate";
|
|
|
klass.PIPELINE_NAME = "pipeline";
|
|
|
@@ -138,7 +139,7 @@ klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineI
|
|
|
var lastSource = sources[sources.length-1],
|
|
|
tempSrc = tempSources[tempi];
|
|
|
if(!(lastSource && tempSrc)) {
|
|
|
- throw new Error('Must have a last and current source'); // verify(lastSource && tempSrc);
|
|
|
+ throw new Error("Must have a last and current source"); // verify(lastSource && tempSrc);
|
|
|
}
|
|
|
if(!lastSource.coalesce(tempSrc)) sources.push(tempSrc);
|
|
|
}
|
|
|
@@ -229,7 +230,9 @@ klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe,
|
|
|
*/
|
|
|
klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
|
|
|
if (true) {
|
|
|
- while(shardPipe.sources !== null && (shardPipe.sources.length > 0 && shardPipe.sources[shardPipe.sources.length-1] instanceof UnwindDocumentSource)) {
|
|
|
+ while (shardPipe.sources !== null &&
|
|
|
+ shardPipe.sources.length > 0 &&
|
|
|
+ shardPipe.sources[shardPipe.sources.length - 1] instanceof UnwindDocumentSource) {
|
|
|
mergePipe.sources.unshift(shardPipe.sources.pop());
|
|
|
}
|
|
|
}
|
|
|
@@ -255,7 +258,8 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
|
|
|
var obj = pipeElement;
|
|
|
|
|
|
// Parse a pipeline stage from 'obj'.
|
|
|
- if (Object.keys(obj).length !== 1) throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
|
|
|
+ if (Object.keys(obj).length !== 1)
|
|
|
+ throw new Error("A pipeline stage specification object must contain exactly one field; code 16435");
|
|
|
var stageName = Object.keys(obj)[0],
|
|
|
stageSpec = obj[stageName];
|
|
|
|
|
|
@@ -280,7 +284,7 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
|
|
|
* @static
|
|
|
* @method parseCommand
|
|
|
* @param cmdObj {Object} The command object sent from the client
|
|
|
- * @param cmdObj.aggregate {Array} the thing to aggregate against; // NOTE: DEVIATION FROM MONGO: expects an Array of inputs rather than a collection name
|
|
|
+ * @param cmdObj.aggregate {Array} the thing to aggregate against // NOTE: DEVIATION FROM MONGO: an Array of inputs rather than a collection name
|
|
|
* @param cmdObj.pipeline {Object} the JSON pipeline of `DocumentSource` specs
|
|
|
* @param cmdObj.explain {Boolean} should explain?
|
|
|
* @param cmdObj.fromRouter {Boolean} is from router?
|
|
|
@@ -295,26 +299,34 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
|
|
|
|
|
|
//gather the specification for the aggregation
|
|
|
var pipeline;
|
|
|
- for(var fieldName in cmdObj){
|
|
|
+ for (var fieldName in cmdObj) { //jshint ignore:line
|
|
|
var cmdElement = cmdObj[fieldName];
|
|
|
- if(fieldName[0] == "$") continue;
|
|
|
- else if(fieldName == "cursor") continue;
|
|
|
- else if(fieldName == klass.COMMAND_NAME) continue; //look for the aggregation command
|
|
|
- else if(fieldName == klass.BATCH_SIZE_NAME) continue;
|
|
|
- else if(fieldName == klass.PIPELINE_NAME) pipeline = cmdElement; //check for the pipeline of JSON doc srcs
|
|
|
- else if(fieldName == klass.EXPLAIN_NAME) pipelineInst.explain = cmdElement; //check for explain option
|
|
|
- else if(fieldName == klass.FROM_ROUTER_NAME) ctx.inShard = cmdElement; //if the request came from the router, we're in a shard
|
|
|
- else if(fieldName == "allowDiskUsage") {
|
|
|
- if(typeof cmdElement !== 'boolean') throw new Error("allowDiskUsage must be a bool, not a " + typeof allowDiskUsage+ "; uassert code 16949");
|
|
|
- }
|
|
|
- else throw new Error("unrecognized field " + JSON.stringify(fieldName));
|
|
|
+ if (fieldName[0] === "$")
|
|
|
+ continue;
|
|
|
+ else if (fieldName === "cursor")
|
|
|
+ continue;
|
|
|
+ else if (fieldName === klass.COMMAND_NAME)
|
|
|
+ continue; //look for the aggregation command
|
|
|
+ else if (fieldName === klass.BATCH_SIZE_NAME)
|
|
|
+ continue;
|
|
|
+ else if (fieldName === klass.PIPELINE_NAME)
|
|
|
+ pipeline = cmdElement; //check for the pipeline of JSON doc srcs
|
|
|
+ else if (fieldName === klass.EXPLAIN_NAME)
|
|
|
+ pipelineInst.explain = cmdElement; //check for explain option
|
|
|
+ else if (fieldName === klass.FROM_ROUTER_NAME)
|
|
|
+ ctx.inShard = cmdElement; //if the request came from the router, we're in a shard
|
|
|
+ else if (fieldName === "allowDiskUsage") {
|
|
|
+ if (typeof cmdElement !== "boolean")
|
|
|
+ throw new Error("allowDiskUsage must be a bool, not a " + typeof cmdElement + "; uassert code 16949");
|
|
|
+ } else
|
|
|
+ throw new Error("unrecognized field " + JSON.stringify(fieldName));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* If we get here, we've harvested the fields we expect for a pipeline
|
|
|
* Set up the specified document source pipeline.
|
|
|
*/
|
|
|
- // NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
|
|
|
+ // NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and facilitate extensions (now in parseDocumentSources)
|
|
|
pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
|
|
|
klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
|
|
|
klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
|
|
|
@@ -325,10 +337,6 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
|
|
|
return pipelineInst;
|
|
|
};
|
|
|
|
|
|
-function ifError(err) {
|
|
|
- if (err) throw err;
|
|
|
-}
|
|
|
-
|
|
|
/**
|
|
|
* Gets the initial $match query when $match is the first pipeline stage
|
|
|
* @method run
|
|
|
@@ -368,7 +376,7 @@ proto.serialize = function serialize() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : '';
|
|
|
+ serialized[klass.COMMAND_NAME] = this.ctx && this.ctx.ns && this.ctx.ns.coll ? this.ctx.ns.coll : "";
|
|
|
serialized[klass.PIPELINE_NAME] = array;
|
|
|
|
|
|
if(this.explain) serialized[klass.EXPLAIN_NAME] = this.explain;
|
|
|
@@ -475,7 +483,7 @@ proto.getDependencies = function getDependencies () {
|
|
|
}
|
|
|
|
|
|
if (!knowAllFields) {
|
|
|
- for (var key in localDeps.fields)
|
|
|
+ for (var key in localDeps.fields) //jshint ignore:line
|
|
|
deps.fields[key] = localDeps.fields[key];
|
|
|
|
|
|
if (localDeps.needWholeDocument)
|