|
@@ -4,16 +4,20 @@ var DocumentSource = require("./DocumentSource"),
|
|
|
Document = require("../Document"),
|
|
Document = require("../Document"),
|
|
|
Expression = require("../expressions/Expression"),
|
|
Expression = require("../expressions/Expression"),
|
|
|
ConstantExpression = require("../expressions/ConstantExpression"),
|
|
ConstantExpression = require("../expressions/ConstantExpression"),
|
|
|
- FieldPathExpression = require("../expressions/FieldPathExpression");
|
|
|
|
|
-
|
|
|
|
|
|
|
+ FieldPathExpression = require("../expressions/FieldPathExpression"),
|
|
|
|
|
+ Variables = require("../expressions/Variables"),
|
|
|
|
|
+ VariablesIdGenerator = require("../expressions/VariablesIdGenerator"),
|
|
|
|
|
+ VariablesParseState = require("../expressions/VariablesParseState"),
|
|
|
|
|
+ async = require("async");
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* A class for grouping documents together
|
|
* A class for grouping documents together
|
|
|
|
|
+ *
|
|
|
* @class GroupDocumentSource
|
|
* @class GroupDocumentSource
|
|
|
* @namespace mungedb-aggregate.pipeline.documentSources
|
|
* @namespace mungedb-aggregate.pipeline.documentSources
|
|
|
* @module mungedb-aggregate
|
|
* @module mungedb-aggregate
|
|
|
* @constructor
|
|
* @constructor
|
|
|
- * @param [ctx] {ExpressionContext}
|
|
|
|
|
|
|
+ * @param [expCtx] {ExpressionContext}
|
|
|
**/
|
|
**/
|
|
|
var GroupDocumentSource = module.exports = function GroupDocumentSource(expCtx) {
|
|
var GroupDocumentSource = module.exports = function GroupDocumentSource(expCtx) {
|
|
|
if (arguments.length > 1) throw new Error("up to one arg expected");
|
|
if (arguments.length > 1) throw new Error("up to one arg expected");
|
|
@@ -24,7 +28,7 @@ var GroupDocumentSource = module.exports = function GroupDocumentSource(expCtx)
|
|
|
this.groups = {}; // GroupsType Value -> Accumulators[]
|
|
this.groups = {}; // GroupsType Value -> Accumulators[]
|
|
|
this.groupsKeys = []; // This is to faciliate easier look up of groups
|
|
this.groupsKeys = []; // This is to faciliate easier look up of groups
|
|
|
this.originalGroupsKeys = []; // This stores the original group key un-hashed/stringified/whatever
|
|
this.originalGroupsKeys = []; // This stores the original group key un-hashed/stringified/whatever
|
|
|
-
|
|
|
|
|
|
|
+ this._variables = null;
|
|
|
this.fieldNames = [];
|
|
this.fieldNames = [];
|
|
|
this.accumulatorFactories = [];
|
|
this.accumulatorFactories = [];
|
|
|
this.expressions = [];
|
|
this.expressions = [];
|
|
@@ -38,7 +42,7 @@ klass.groupOps = {
|
|
|
"$avg": Accumulators.Avg,
|
|
"$avg": Accumulators.Avg,
|
|
|
"$first": Accumulators.First,
|
|
"$first": Accumulators.First,
|
|
|
"$last": Accumulators.Last,
|
|
"$last": Accumulators.Last,
|
|
|
- "$max": Accumulators.MinMax.createMax,
|
|
|
|
|
|
|
+ "$max": Accumulators.MinMax.createMax, // $min and $max have special constructors because they share base features
|
|
|
"$min": Accumulators.MinMax.createMin,
|
|
"$min": Accumulators.MinMax.createMin,
|
|
|
"$push": Accumulators.Push,
|
|
"$push": Accumulators.Push,
|
|
|
"$sum": Accumulators.Sum
|
|
"$sum": Accumulators.Sum
|
|
@@ -46,43 +50,142 @@ klass.groupOps = {
|
|
|
|
|
|
|
|
klass.groupName = "$group";
|
|
klass.groupName = "$group";
|
|
|
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Factory for making GroupDocumentSources
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method create
|
|
|
|
|
+ * @static
|
|
|
|
|
+ * @param [expCtx] {ExpressionContext}
|
|
|
|
|
+ **/
|
|
|
|
|
+klass.create = function create(expCtx) {
|
|
|
|
|
+ return new GroupDocumentSource(expCtx);
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * Factory for making GroupDocumentSources
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method getSourceName
|
|
|
|
|
+ * @return {GroupDocumentSource}
|
|
|
|
|
+ **/
|
|
|
proto.getSourceName = function getSourceName() {
|
|
proto.getSourceName = function getSourceName() {
|
|
|
return klass.groupName;
|
|
return klass.groupName;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Create an object that represents the document source. The object
|
|
|
|
|
- * will have a single field whose name is the source's name. This
|
|
|
|
|
- * will be used by the default implementation of addToJsonArray()
|
|
|
|
|
- * to add this object to a pipeline being represented in JSON.
|
|
|
|
|
|
|
+ * Gets the next document or DocumentSource.EOF if none
|
|
|
*
|
|
*
|
|
|
- * @method sourceToJson
|
|
|
|
|
- * @param {Object} builder JSONObjBuilder: a blank object builder to write to
|
|
|
|
|
- * @param {Boolean} explain create explain output
|
|
|
|
|
|
|
+ * @method getNext
|
|
|
|
|
+ * @return {Object}
|
|
|
**/
|
|
**/
|
|
|
-proto.sourceToJson = function sourceToJson(builder, explain) {
|
|
|
|
|
- var idExp = this.idExpression,
|
|
|
|
|
- insides = {
|
|
|
|
|
- _id: idExp ? idExp.toJSON() : {}
|
|
|
|
|
|
|
+proto.getNext = function getNext(callback) {
|
|
|
|
|
+ var self = this;
|
|
|
|
|
+ async.series([
|
|
|
|
|
+ function(next) {
|
|
|
|
|
+ if (!self.populated)
|
|
|
|
|
+ self.populate(function(err) {
|
|
|
|
|
+ return next(err);
|
|
|
|
|
+ });
|
|
|
|
|
+ else
|
|
|
|
|
+ return next();
|
|
|
},
|
|
},
|
|
|
- aFac = this.accumulatorFactories,
|
|
|
|
|
- aFacLen = aFac.length;
|
|
|
|
|
|
|
+ function(next) {
|
|
|
|
|
+ if(Object.keys(self.groups).length === 0) {
|
|
|
|
|
+ return next(null, DocumentSource.EOF);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ //Note: Skipped the spilled logic
|
|
|
|
|
+
|
|
|
|
|
+ if(self.currentGroupsKeysIndex === self.groupsKeys.length) {
|
|
|
|
|
+ return next(null, DocumentSource.EOF);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ var id = self.groupsKeys[self.currentGroupsKeysIndex],
|
|
|
|
|
+ accumulators = self.groups[id],
|
|
|
|
|
+ out = self.makeDocument(id, accumulators /*,mergeableOutput*/);
|
|
|
|
|
+
|
|
|
|
|
+ if(++self.currentGroupsKeysIndex === self.groupsKeys.length) {
|
|
|
|
|
+ self.dispose();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return next(null, out);
|
|
|
|
|
+ }
|
|
|
|
|
+ ], function(err, results) {
|
|
|
|
|
+ callback(err, results[1]);
|
|
|
|
|
+ });
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * Sets this source as apparently empty
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method dispose
|
|
|
|
|
+ **/
|
|
|
|
|
+proto.dispose = function dispose() {
|
|
|
|
|
+ //NOTE: Skipped 'freeing' our resources; at best we could remove some references to things, but our parent will probably forget us anyways!
|
|
|
|
|
|
|
|
- for(var i=0; i < aFacLen; ++i) {
|
|
|
|
|
- var acc = new aFac[i](/*pExpCtx*/);
|
|
|
|
|
- acc.addOperand(this.expressions[i]);
|
|
|
|
|
|
|
+ // make us look done
|
|
|
|
|
+ this.currentGroupsKeysIndex = this.groupsKeys.length;
|
|
|
|
|
|
|
|
- insides[this.fieldNames[i]] = acc.toJSON(true);
|
|
|
|
|
|
|
+ // free our source's resources
|
|
|
|
|
+ this.source.dispose();
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * Optimizes the expressions in the group
|
|
|
|
|
+ * @method optimize
|
|
|
|
|
+ **/
|
|
|
|
|
+proto.optimize = function optimize() {
|
|
|
|
|
+ var self = this;
|
|
|
|
|
+ self.idExpression = self.idExpression.optimize();
|
|
|
|
|
+ self.expressions.forEach(function(expression, i) {
|
|
|
|
|
+ self.expressions[i] = expression.optimize();
|
|
|
|
|
+ });
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * Create an object that represents the document source. The object
|
|
|
|
|
+ * will have a single field whose name is the source's name.
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method serialize
|
|
|
|
|
+ * @param explain {Boolean} Create explain output
|
|
|
|
|
+ **/
|
|
|
|
|
+proto.serialize = function serialize(explain) {
|
|
|
|
|
+ var insides = {};
|
|
|
|
|
+
|
|
|
|
|
+ // add the _id
|
|
|
|
|
+ insides._id = this.idExpression.serialize(explain);
|
|
|
|
|
+
|
|
|
|
|
+ //add the remaining fields
|
|
|
|
|
+ var aFacs = this.accumulatorFactories,
|
|
|
|
|
+ aFacLen = aFacs.length;
|
|
|
|
|
+
|
|
|
|
|
+ for(var i=0; i < aFacLen; i++) {
|
|
|
|
|
+ var aFac = aFacs[i](),
|
|
|
|
|
+ serialExpression = this.expressions[i].serialize(explain), //Get the accumulator's expression
|
|
|
|
|
+ serialAccumulator = {}; //Where we'll put the expression
|
|
|
|
|
+ serialAccumulator[aFac.getOpName()] = serialExpression;
|
|
|
|
|
+ insides[this.fieldNames[i]] = serialAccumulator;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- builder[this.getSourceName()] = insides;
|
|
|
|
|
|
|
+ var serialSource = {};
|
|
|
|
|
+ serialSource[this.getSourceName()] = insides;
|
|
|
|
|
+ return serialSource;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-klass.createFromJson = function createFromJson(groupObj, ctx) {
|
|
|
|
|
- if (!(groupObj instanceof Object && groupObj.constructor === Object)) throw new Error("a group's fields must be specified in an object");
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Creates a GroupDocumentSource from the given elem
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method createFromJson
|
|
|
|
|
+ * @param elem {Object} The group specification object; the right hand side of the $group
|
|
|
|
|
+ **/
|
|
|
|
|
+klass.createFromJson = function createFromJson(elem, expCtx) {
|
|
|
|
|
+ if (!(elem instanceof Object && elem.constructor === Object)) throw new Error("a group's fields must be specified in an object");
|
|
|
|
|
+
|
|
|
|
|
+ var group = GroupDocumentSource.create(expCtx),
|
|
|
|
|
+ idSet = false;
|
|
|
|
|
|
|
|
- var idSet = false,
|
|
|
|
|
- group = new GroupDocumentSource(ctx);
|
|
|
|
|
|
|
+ var groupObj = elem,
|
|
|
|
|
+ idGenerator = new VariablesIdGenerator(),
|
|
|
|
|
+ vps = new VariablesParseState(idGenerator);
|
|
|
|
|
|
|
|
for (var groupFieldName in groupObj) {
|
|
for (var groupFieldName in groupObj) {
|
|
|
if (groupObj.hasOwnProperty(groupFieldName)) {
|
|
if (groupObj.hasOwnProperty(groupFieldName)) {
|
|
@@ -93,106 +196,168 @@ klass.createFromJson = function createFromJson(groupObj, ctx) {
|
|
|
if(idSet) throw new Error("15948 a group's _id may only be specified once");
|
|
if(idSet) throw new Error("15948 a group's _id may only be specified once");
|
|
|
|
|
|
|
|
if (groupField instanceof Object && groupField.constructor === Object) {
|
|
if (groupField instanceof Object && groupField.constructor === Object) {
|
|
|
|
|
+ /*
|
|
|
|
|
+ Use the projection-like set of field paths to create the
|
|
|
|
|
+ group-by key.
|
|
|
|
|
+ */
|
|
|
var objCtx = new Expression.ObjectCtx({isDocumentOk:true});
|
|
var objCtx = new Expression.ObjectCtx({isDocumentOk:true});
|
|
|
- group.idExpression = Expression.parseObject(groupField, objCtx);
|
|
|
|
|
|
|
+ group.setIdExpression(Expression.parseObject(groupField, objCtx, vps));
|
|
|
idSet = true;
|
|
idSet = true;
|
|
|
|
|
|
|
|
} else if (typeof groupField === "string") {
|
|
} else if (typeof groupField === "string") {
|
|
|
- if (groupField[0] !== "$") {
|
|
|
|
|
- group.idExpression = new ConstantExpression(groupField);
|
|
|
|
|
- } else {
|
|
|
|
|
- var pathString = Expression.removeFieldPrefix(groupField);
|
|
|
|
|
- group.idExpression = new FieldPathExpression(pathString);
|
|
|
|
|
- }
|
|
|
|
|
- idSet = true;
|
|
|
|
|
-
|
|
|
|
|
- } else {
|
|
|
|
|
- var typeStr = group._getTypeStr(groupField);
|
|
|
|
|
- switch (typeStr) {
|
|
|
|
|
- case "number":
|
|
|
|
|
- case "string":
|
|
|
|
|
- case "boolean":
|
|
|
|
|
- case "Object":
|
|
|
|
|
- case "object": // null returns "object" Xp
|
|
|
|
|
- case "Array":
|
|
|
|
|
- group.idExpression = new ConstantExpression(groupField);
|
|
|
|
|
- idSet = true;
|
|
|
|
|
- break;
|
|
|
|
|
- default:
|
|
|
|
|
- throw new Error("a group's _id may not include fields of type " + typeStr + "");
|
|
|
|
|
|
|
+ if (groupField[0] === "$") {
|
|
|
|
|
+ group.setIdExpression(FieldPathExpression.parse(groupField, vps));
|
|
|
|
|
+ idSet = true;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ if (!idSet) {
|
|
|
|
|
+ // constant id - single group
|
|
|
|
|
+ group.setIdExpression(ConstantExpression.create(groupField));
|
|
|
|
|
+ idSet = true;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
} else {
|
|
} else {
|
|
|
|
|
+ /*
|
|
|
|
|
+ Treat as a projection field with the additional ability to
|
|
|
|
|
+ add aggregation operators.
|
|
|
|
|
+ */
|
|
|
if (groupFieldName.indexOf(".") !== -1) throw new Error("16414 the group aggregate field name '" + groupFieldName + "' cannot contain '.'");
|
|
if (groupFieldName.indexOf(".") !== -1) throw new Error("16414 the group aggregate field name '" + groupFieldName + "' cannot contain '.'");
|
|
|
if (groupFieldName[0] === "$") throw new Error("15950 the group aggregate field name '" + groupFieldName + "' cannot be an operator name");
|
|
if (groupFieldName[0] === "$") throw new Error("15950 the group aggregate field name '" + groupFieldName + "' cannot be an operator name");
|
|
|
if (group._getTypeStr(groupFieldName) === "Object") throw new Error("15951 the group aggregate field '" + groupFieldName + "' must be defined as an expression inside an object");
|
|
if (group._getTypeStr(groupFieldName) === "Object") throw new Error("15951 the group aggregate field '" + groupFieldName + "' must be defined as an expression inside an object");
|
|
|
|
|
|
|
|
- var subFieldCount = 0;
|
|
|
|
|
- for (var subFieldName in groupField) {
|
|
|
|
|
- if (groupField.hasOwnProperty(subFieldName)) {
|
|
|
|
|
- var subField = groupField[subFieldName],
|
|
|
|
|
- op = klass.groupOps[subFieldName];
|
|
|
|
|
- if (!op) throw new Error("15952 unknown group operator '" + subFieldName + "'");
|
|
|
|
|
|
|
+ var subElementCount = 0;
|
|
|
|
|
+ for (var subElementName in groupField) {
|
|
|
|
|
+ if (groupField.hasOwnProperty(subElementName)) {
|
|
|
|
|
+ var subElement = groupField[subElementName],
|
|
|
|
|
+ op = klass.groupOps[subElementName];
|
|
|
|
|
+ if (!op) throw new Error("15952 unknown group operator '" + subElementName + "'");
|
|
|
|
|
|
|
|
var groupExpression,
|
|
var groupExpression,
|
|
|
- subFieldTypeStr = group._getTypeStr(subField);
|
|
|
|
|
- if (subFieldTypeStr === "Object") {
|
|
|
|
|
- var subFieldObjCtx = new Expression.ObjectCtx({isDocumentOk:true});
|
|
|
|
|
- groupExpression = Expression.parseObject(subField, subFieldObjCtx);
|
|
|
|
|
- } else if (subFieldTypeStr === "Array") {
|
|
|
|
|
- throw new Error("15953 aggregating group operators are unary (" + subFieldName + ")");
|
|
|
|
|
- } else {
|
|
|
|
|
- groupExpression = Expression.parseOperand(subField);
|
|
|
|
|
|
|
+ subElementTypeStr = group._getTypeStr(subElement);
|
|
|
|
|
+ if (subElementTypeStr === "Object") {
|
|
|
|
|
+ var subElementObjCtx = new Expression.ObjectCtx({isDocumentOk:true});
|
|
|
|
|
+ groupExpression = Expression.parseObject(subElement, subElementObjCtx, vps);
|
|
|
|
|
+ } else if (subElementTypeStr === "Array") {
|
|
|
|
|
+ throw new Error("15953 aggregating group operators are unary (" + subElementName + ")");
|
|
|
|
|
+ } else { /* assume its an atomic single operand */
|
|
|
|
|
+ groupExpression = Expression.parseOperand(subElement, vps);
|
|
|
}
|
|
}
|
|
|
- group.addAccumulator(groupFieldName,op, groupExpression);
|
|
|
|
|
|
|
+ group.addAccumulator(groupFieldName, op, groupExpression);
|
|
|
|
|
|
|
|
- ++subFieldCount;
|
|
|
|
|
|
|
+ ++subElementCount;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- if (subFieldCount != 1) throw new Error("15954 the computed aggregate '" + groupFieldName + "' must specify exactly one operator");
|
|
|
|
|
|
|
+ if (subElementCount !== 1) throw new Error("15954 the computed aggregate '" + groupFieldName + "' must specify exactly one operator");
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (!idSet) throw new Error("15955 a group specification must include an _id");
|
|
if (!idSet) throw new Error("15955 a group specification must include an _id");
|
|
|
|
|
|
|
|
|
|
+ group._variables = new Variables(idGenerator.getIdCount());
|
|
|
|
|
+
|
|
|
return group;
|
|
return group;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-proto._getTypeStr = function _getTypeStr(obj) {
|
|
|
|
|
- var typeofStr = typeof obj,
|
|
|
|
|
- typeStr = (typeofStr == "object" && obj !== null) ? obj.constructor.name : typeofStr;
|
|
|
|
|
- return typeStr;
|
|
|
|
|
-};
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Populates the GroupDocumentSource by grouping all of the input documents at once.
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method populate
|
|
|
|
|
+ * @param callback {Function} Required. callback(err) when done populating.
|
|
|
|
|
+ * @async
|
|
|
|
|
+ **/
|
|
|
|
|
+proto.populate = function populate(callback) {
|
|
|
|
|
+ var numAccumulators = this.accumulatorFactories.length;
|
|
|
|
|
+ if(numAccumulators !== this.expressions.length) {
|
|
|
|
|
+ callback(new Error("Must have equal number of accumulators and expressions"));
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
-proto.advance = function advance() {
|
|
|
|
|
- base.prototype.advance.call(this); // Check for interupts ????
|
|
|
|
|
- if(!this.populated) this.populate();
|
|
|
|
|
|
|
+ var input,
|
|
|
|
|
+ self = this;
|
|
|
|
|
+ async.whilst(
|
|
|
|
|
+ function() {
|
|
|
|
|
+ return input !== DocumentSource.EOF;
|
|
|
|
|
+ },
|
|
|
|
|
+ function(cb) {
|
|
|
|
|
+ self.source.getNext(function(err, doc) {
|
|
|
|
|
+ if(err) return cb(err);
|
|
|
|
|
+ if(doc === DocumentSource.EOF) {
|
|
|
|
|
+ input = doc;
|
|
|
|
|
+ return cb(); //Need to stop now, no new input
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- //verify(this.currentGroupsKeysIndex < this.groupsKeys.length);
|
|
|
|
|
|
|
+ input = doc;
|
|
|
|
|
+ self._variables.setRoot(input);
|
|
|
|
|
|
|
|
- ++this.currentGroupsKeysIndex;
|
|
|
|
|
- if (this.currentGroupsKeysIndex >= this.groupsKeys.length) {
|
|
|
|
|
- this.currentDocument = null;
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ /* get the _id value */
|
|
|
|
|
+ var id = self.idExpression.evaluate(self._variables);
|
|
|
|
|
|
|
|
- this.currentDocument = this.makeDocument(this.currentGroupsKeysIndex);
|
|
|
|
|
- return true;
|
|
|
|
|
-};
|
|
|
|
|
|
|
+ if(undefined === id) id = null;
|
|
|
|
|
+
|
|
|
|
|
+ var groupKey = JSON.stringify(id),
|
|
|
|
|
+ group = self.groups[JSON.stringify(id)];
|
|
|
|
|
+
|
|
|
|
|
+ if(!group) {
|
|
|
|
|
+ self.groupsKeys.push(groupKey);
|
|
|
|
|
+ group = [];
|
|
|
|
|
+ self.groups[groupKey] = group;
|
|
|
|
|
+ // Add the accumulators
|
|
|
|
|
+ for(var afi = 0; afi<self.accumulatorFactories.length; afi++) {
|
|
|
|
|
+ group.push(self.accumulatorFactories[afi]());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ //NOTE: Skipped memory usage stuff for case when group already existed
|
|
|
|
|
+
|
|
|
|
|
+ if(numAccumulators !== group.length) {
|
|
|
|
|
+ throw new Error('Group must have one of each accumulator');
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ //NOTE: passing the input to each accumulator
|
|
|
|
|
+ for(var gi=0; gi<group.length; gi++) {
|
|
|
|
|
+ group[gi].process(self.expressions[gi].evaluate(self._variables /*, doingMerge*/));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // We are done with the ROOT document so release it.
|
|
|
|
|
+ self._variables.clearRoot();
|
|
|
|
|
|
|
|
-proto.eof = function eof() {
|
|
|
|
|
- if (!this.populated) this.populate();
|
|
|
|
|
- return this.currentGroupsKeysIndex === this.groupsKeys.length;
|
|
|
|
|
|
|
+ //NOTE: Skipped the part about sorted files
|
|
|
|
|
+
|
|
|
|
|
+ return cb();
|
|
|
|
|
+ });
|
|
|
|
|
+ },
|
|
|
|
|
+ function(err) {
|
|
|
|
|
+ if(err) return callback(err);
|
|
|
|
|
+
|
|
|
|
|
+ self.populated = true;
|
|
|
|
|
+
|
|
|
|
|
+ return callback();
|
|
|
|
|
+ }
|
|
|
|
|
+ );
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-proto.getCurrent = function getCurrent() {
|
|
|
|
|
- if (!this.populated) this.populate();
|
|
|
|
|
- return this.currentDocument;
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Get the type of something. Handles objects specially to return their true type; i.e. their constructor
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method populate
|
|
|
|
|
+ * @param obj {Object} The object to get the type of
|
|
|
|
|
+ * @return {String} The type of the object as a string
|
|
|
|
|
+ * @async
|
|
|
|
|
+ **/
|
|
|
|
|
+proto._getTypeStr = function _getTypeStr(obj) {
|
|
|
|
|
+ var typeofStr = typeof obj,
|
|
|
|
|
+ typeStr = (typeofStr == "object" && obj !== null) ? obj.constructor.name : typeofStr;
|
|
|
|
|
+ return typeStr;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Get the dependencies of the group
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method getDependencies
|
|
|
|
|
+ * @param deps {Object} The
|
|
|
|
|
+ * @return {DocumentSource.getDepsReturn} An enum value specifying that these dependencies are exhaustive
|
|
|
|
|
+ * @async
|
|
|
|
|
+ **/
|
|
|
proto.getDependencies = function getDependencies(deps) {
|
|
proto.getDependencies = function getDependencies(deps) {
|
|
|
var self = this;
|
|
var self = this;
|
|
|
// add _id
|
|
// add _id
|
|
@@ -205,67 +370,53 @@ proto.getDependencies = function getDependencies(deps) {
|
|
|
return DocumentSource.GetDepsReturn.EXHAUSTIVE;
|
|
return DocumentSource.GetDepsReturn.EXHAUSTIVE;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Called internally only. Adds an accumulator for each matching group.
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method addAccumulator
|
|
|
|
|
+ * @param fieldName {String} The name of the field where the accumulated value will be placed
|
|
|
|
|
+ * @param accumulatorFactory {Accumulator} The constructor for creating accumulators
|
|
|
|
|
+ * @param epxression {Expression} The expression to be evaluated on incoming documents before they are accumulated
|
|
|
|
|
+ **/
|
|
|
proto.addAccumulator = function addAccumulator(fieldName, accumulatorFactory, expression) {
|
|
proto.addAccumulator = function addAccumulator(fieldName, accumulatorFactory, expression) {
|
|
|
this.fieldNames.push(fieldName);
|
|
this.fieldNames.push(fieldName);
|
|
|
this.accumulatorFactories.push(accumulatorFactory);
|
|
this.accumulatorFactories.push(accumulatorFactory);
|
|
|
this.expressions.push(expression);
|
|
this.expressions.push(expression);
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-proto.populate = function populate() {
|
|
|
|
|
- for (var hasNext = !this.source.eof(); hasNext; hasNext = this.source.advance()) {
|
|
|
|
|
- var group,
|
|
|
|
|
- currentDocument = this.source.getCurrent(),
|
|
|
|
|
- _id = this.idExpression.evaluate(currentDocument);
|
|
|
|
|
-
|
|
|
|
|
- if (undefined === _id) _id = null;
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Makes a document with the given id and accumulators
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method makeDocument
|
|
|
|
|
+ * @param fieldName {String} The name of the field where the accumulated value will be placed
|
|
|
|
|
+ * @param accums {Array} An array of accumulators
|
|
|
|
|
+ * @param epxression {Expression} The expression to be evaluated on incoming documents before they are accumulated
|
|
|
|
|
+ **/
|
|
|
|
|
+proto.makeDocument = function makeDocument(id, accums /*,mergeableOutput*/) {
|
|
|
|
|
+ var out = {};
|
|
|
|
|
|
|
|
- var idHash = JSON.stringify(_id); //TODO: USE A REAL HASH. I didn't have time to take collision into account.
|
|
|
|
|
|
|
+ /* add the _id field */
|
|
|
|
|
+ out._id = id;
|
|
|
|
|
|
|
|
- if (idHash in this.groups) {
|
|
|
|
|
- group = this.groups[idHash];
|
|
|
|
|
|
|
+ /* add the rest of the fields */
|
|
|
|
|
+ this.fieldNames.forEach(function(fieldName, i) {
|
|
|
|
|
+ var val = accums[i].getValue(/*mergeableOutput*/);
|
|
|
|
|
+ if(!val) {
|
|
|
|
|
+ out[fieldName] = null;
|
|
|
} else {
|
|
} else {
|
|
|
- this.groups[idHash] = group = [];
|
|
|
|
|
- this.groupsKeys[this.currentGroupsKeysIndex] = idHash;
|
|
|
|
|
- this.originalGroupsKeys[this.currentGroupsKeysIndex] = (_id && typeof _id === 'object') ? Document.clone(_id) : _id;
|
|
|
|
|
- ++this.currentGroupsKeysIndex;
|
|
|
|
|
- for (var ai = 0; ai < this.accumulatorFactories.length; ++ai) {
|
|
|
|
|
- var accumulator = new this.accumulatorFactories[ai]();
|
|
|
|
|
- accumulator.addOperand(this.expressions[ai]);
|
|
|
|
|
- group.push(accumulator);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- // tickle all the accumulators for the group we found
|
|
|
|
|
- for (var gi = 0; gi < group.length; ++gi) {
|
|
|
|
|
- group[gi].evaluate(currentDocument);
|
|
|
|
|
|
|
+ out[fieldName] = val;
|
|
|
}
|
|
}
|
|
|
|
|
+ });
|
|
|
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- this.currentGroupsKeysIndex = 0; // Start the group
|
|
|
|
|
- if (this.groupsKeys.length > 0) {
|
|
|
|
|
- this.currentDocument = this.makeDocument(this.currentGroupsKeysIndex);
|
|
|
|
|
- }
|
|
|
|
|
- this.populated = true;
|
|
|
|
|
-
|
|
|
|
|
|
|
+ return out;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-proto.makeDocument = function makeDocument(groupKeyIndex) {
|
|
|
|
|
- var groupKey = this.groupsKeys[groupKeyIndex],
|
|
|
|
|
- originalGroupKey = this.originalGroupsKeys[groupKeyIndex],
|
|
|
|
|
- group = this.groups[groupKey],
|
|
|
|
|
- doc = {};
|
|
|
|
|
-
|
|
|
|
|
- doc[Document.ID_PROPERTY_NAME] = originalGroupKey;
|
|
|
|
|
-
|
|
|
|
|
- for (var i = 0; i < this.fieldNames.length; ++i) {
|
|
|
|
|
- var fieldName = this.fieldNames[i],
|
|
|
|
|
- item = group[i];
|
|
|
|
|
- if (item !== "null" && item !== undefined) {
|
|
|
|
|
- doc[fieldName] = item.getValue();
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- return doc;
|
|
|
|
|
|
|
+/**
|
|
|
|
|
+ * Sets the id expression for the group
|
|
|
|
|
+ *
|
|
|
|
|
+ * @method setIdExpression
|
|
|
|
|
+ * @param epxression {Expression} The expression to set
|
|
|
|
|
+ **/
|
|
|
|
|
+proto.setIdExpression = function setIdExpression(expression) {
|
|
|
|
|
+ this.idExpression = expression;
|
|
|
};
|
|
};
|