"use strict";

var DocumentSource = require("./DocumentSource");
var Accumulators = require("../accumulators/");
var Document = require("../Document");
var Expression = require("../expressions/Expression");
var ConstantExpression = require("../expressions/ConstantExpression");
var FieldPathExpression = require("../expressions/FieldPathExpression");
var Variables = require("../expressions/Variables");
var VariablesIdGenerator = require("../expressions/VariablesIdGenerator");
var VariablesParseState = require("../expressions/VariablesParseState");
var async = require("async");

/**
 * A document source that groups documents together.
 *
 * The constructor only initialises bookkeeping state; every collection
 * starts out empty and every flag starts out false.
 *
 * @class GroupDocumentSource
 * @namespace mungedb-aggregate.pipeline.documentSources
 * @module mungedb-aggregate
 * @constructor
 * @param [expCtx] {ExpressionContext} Optional; an empty object is used when it is missing or falsy.
 * @throws {Error} When more than one argument is supplied.
 **/
function GroupDocumentSource(expCtx) {
	if (arguments.length > 1) {
		throw new Error("up to one arg expected");
	}

	// Fall back to an empty context so the property reads below are safe.
	expCtx = expCtx || {};
	base.call(this, expCtx);

	// State flags
	this.populated = false;
	this.doingMerge = false;
	this.spilled = false;
	this.extSortAllowed = expCtx.extSortAllowed && !expCtx.inRouter;

	// Accumulator bookkeeping
	this.accumulatorFactories = [];
	this.currentAccumulators = [];

	// Group storage
	this.groups = {}; // GroupsType Value -> Accumulators[]
	this.groupsKeys = []; // This is to facilitate easier look up of groups
	this.originalGroupsKeys = [];

	// Parsed specification
	this.variables = null;
	this.fieldNames = [];
	this.idFieldNames = [];
	this.expressions = [];
	this.idExpressions = [];

	// Position of the next group to hand out
	this.currentGroupsKeysIndex = 0;
}

module.exports = GroupDocumentSource;

// Shorthand aliases used throughout the rest of the file.
var klass = GroupDocumentSource;
var base = DocumentSource;
var proto = klass.prototype = Object.create(base.prototype, {
	constructor: { value: klass }
});

// TODO: Do we need this?
klass.groupOps = { "$addToSet": Accumulators.AddToSetAccumulator.create, "$avg": Accumulators.AvgAccumulator.create, "$first": Accumulators.FirstAccumulator.create, "$last": Accumulators.LastAccumulator.create, "$max": Accumulators.MinMaxAccumulator.createMax, // $min and $max have special constructors because they share base features "$min": Accumulators.MinMaxAccumulator.createMin, "$push": Accumulators.PushAccumulator.create, "$sum": Accumulators.SumAccumulator.create, }; klass.groupName = "$group"; /** * Factory for making GroupDocumentSources * * @method create * @static * @param [expCtx] {ExpressionContext} **/ klass.create = function create(expCtx) { return new GroupDocumentSource(expCtx); }; /** * Factory for making GroupDocumentSources * * @method getSourceName * @return {GroupDocumentSource} **/ proto.getSourceName = function getSourceName() { return klass.groupName; }; /** * Gets the next document or null if none * * @method getNext * @return {Object} **/ proto.getNext = function getNext(callback) { if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback.'); if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false) return callback(new Error("Interrupted")); var self = this; async.series([ function(next) { if (!self.populated) self.populate(function(err) { return next(err); }); else return next(); }, function(next) { // NOTE: Skipped the spilled functionality if (self.spilled) { throw new Error("Spilled is not implemented."); } else { if(self.currentGroupsKeysIndex === self.groupsKeys.length) { return next(null, null); } var id = self.originalGroupsKeys[self.currentGroupsKeysIndex], stringifiedId = self.groupsKeys[self.currentGroupsKeysIndex], accumulators = self.groups[stringifiedId], out = self.makeDocument(id, accumulators, self.expCtx.inShard); if(++self.currentGroupsKeysIndex === self.groupsKeys.length) { self.dispose(); } return next(null, out); } } ], function(err, results) { callback(err, 
results[1]); }); }; /** * Sets this source as apparently empty * * @method dispose **/ proto.dispose = function dispose() { //NOTE: Skipped 'freeing' our resources; at best we could remove some references to things, but our parent will probably forget us anyways! // make us look done this.currentGroupsKeysIndex = this.groupsKeys.length; // free our source's resources this.source.dispose(); }; /** * Optimizes the expressions in the group * @method optimize **/ proto.optimize = function optimize() { // TODO if all _idExpressions are ExpressionConstants after optimization, then we know there // will only be one group. We should take advantage of that to avoid going through the hash // table. var self = this; self.idExpressions.forEach(function(expression, i) { self.idExpressions[i] = expression.optimize(); }); self.expressions.forEach(function(expression, i) { self.expressions[i] = expression.optimize(); }); }; /** * Create an object that represents the document source. The object * will have a single field whose name is the source's name. 
* * @method serialize * @param explain {Boolean} Create explain output **/ proto.serialize = function serialize(explain) { var self = this, insides = {}; // add the _id if (self.idFieldNames.length === 0) { if (self.idExpressions.length !== 1) throw new Error("Should only have one _id field"); insides._id = self.idExpressions[0].serialize(explain); } else { if (self.idExpressions.length !== self.idFieldNames.length) throw new Error("Should have the same number of idExpressions and idFieldNames."); var md = {}; self.idExpressions.forEach(function(expression, i) { md[self.idFieldNames[i]] = expression.serialize(explain); }); insides._id = md; } //add the remaining fields var aFacs = self.accumulatorFactories, aFacLen = aFacs.length; for(var i=0; i < aFacLen; i++) { var aFac = new aFacs[i](), serialExpression = self.expressions[i].serialize(explain), //Get the accumulator's expression serialAccumulator = {}; //Where we'll put the expression serialAccumulator[aFac.getOpName()] = serialExpression; insides[self.fieldNames[i]] = serialAccumulator; } var serialSource = {}; serialSource[self.getSourceName()] = insides; return serialSource; }; /** * Creates a GroupDocumentSource from the given elem * * @method createFromJson * @param elem {Object} The group specification object; the right hand side of the $group **/ klass.createFromJson = function createFromJson(elem, expCtx) { if (!(elem instanceof Object && elem.constructor === Object)) throw new Error("a group's fields must be specified in an object"); var group = GroupDocumentSource.create(expCtx), idSet = false; var groupObj = elem, idGenerator = new VariablesIdGenerator(), vps = new VariablesParseState(idGenerator); for (var groupFieldName in groupObj) { if (groupObj.hasOwnProperty(groupFieldName)) { var groupField = groupObj[groupFieldName]; if (groupFieldName === "_id") { if(idSet) throw new Error("15948 a group's _id may only be specified once"); group.parseIdExpression(groupField, vps); idSet = true; } else if 
(groupFieldName === '$doingMerge' && groupField) { throw new Error("17030 $doingMerge should be true if present"); } else { /* Treat as a projection field with the additional ability to add aggregation operators. */ if (groupFieldName.indexOf(".") !== -1) throw new Error("16414 the group aggregate field name '" + groupFieldName + "' cannot contain '.'"); if (groupFieldName[0] === "$") throw new Error("15950 the group aggregate field name '" + groupFieldName + "' cannot be an operator name"); if (group._getTypeStr(groupFieldName) === "Object") throw new Error("15951 the group aggregate field '" + groupFieldName + "' must be defined as an expression inside an object"); var subElementCount = 0; for (var subElementName in groupField) { if (groupField.hasOwnProperty(subElementName)) { var subElement = groupField[subElementName], op = klass.groupOps[subElementName]; if (!op) throw new Error("15952 unknown group operator '" + subElementName + "'"); var groupExpression, subElementTypeStr = group._getTypeStr(subElement); if (subElementTypeStr === "Object") { var subElementObjCtx = new Expression.ObjectCtx({isDocumentOk:true}); groupExpression = Expression.parseObject(subElement, subElementObjCtx, vps); } else if (subElementTypeStr === "Array") { throw new Error("15953 aggregating group operators are unary (" + subElementName + ")"); } else { /* assume its an atomic single operand */ groupExpression = Expression.parseOperand(subElement, vps); } group.addAccumulator(groupFieldName, op, groupExpression); ++subElementCount; } } if (subElementCount !== 1) throw new Error("15954 the computed aggregate '" + groupFieldName + "' must specify exactly one operator"); } } } if (!idSet) throw new Error("15955 a group specification must include an _id"); group.variables = new Variables(idGenerator.getIdCount()); return group; }; /** * Populates the GroupDocumentSource by grouping all of the input documents at once. * * @method populate * @param callback {Function} Required. 
callback(err) when done populating. * @async **/ proto.populate = function populate(callback) { var numAccumulators = this.accumulatorFactories.length; // NOTE: this is not in mongo, does it belong here? if(numAccumulators !== this.expressions.length) { callback(new Error("Must have equal number of accumulators and expressions")); } var input, self = this; async.whilst( function() { return input !== null; }, function(cb) { self.source.getNext(function(err, doc) { if(err) return cb(err); if(doc === null) { input = doc; return cb(); //Need to stop now, no new input } input = doc; self.variables.setRoot(input); /* get the _id value */ var id = self.computeId(self.variables); if(undefined === id) id = null; var groupKey = JSON.stringify(id), group = self.groups[groupKey]; if(!group) { self.originalGroupsKeys.push(id); self.groupsKeys.push(groupKey); group = []; self.groups[groupKey] = group; // Add the accumulators for(var afi = 0; afi