GroupDocumentSource.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. "use strict";
  2. var DocumentSource = require("./DocumentSource"),
  3. Accumulators = require("../accumulators/"),
  4. Document = require("../Document"),
  5. Expression = require("../expressions/Expression"),
  6. ConstantExpression = require("../expressions/ConstantExpression"),
  7. FieldPathExpression = require("../expressions/FieldPathExpression"),
  8. Variables = require("../expressions/Variables"),
  9. VariablesIdGenerator = require("../expressions/VariablesIdGenerator"),
  10. VariablesParseState = require("../expressions/VariablesParseState"),
  11. async = require("async");
  /**
   * A document source that buckets its input documents by an _id expression
   * and runs a set of accumulators over each bucket.
   *
   * @class GroupDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param [expCtx] {ExpressionContext} Forwarded to the DocumentSource base constructor
   **/
  var base = DocumentSource;
  var GroupDocumentSource = module.exports = function GroupDocumentSource(expCtx) {
    if (arguments.length > 1) throw new Error("up to one arg expected");
    base.call(this, expCtx);
    // True once every input document has been consumed into a group.
    this.populated = false;
    // Expression evaluated per input document to produce its group key.
    this.idExpression = null;
    // Group key -> array of accumulators, one per entry of accumulatorFactories.
    this.groups = {};
    // Group keys in first-seen order; used to walk the groups when emitting output.
    this.groupsKeys = [];
    // Meant to hold the original, un-stringified group key for each entry of groupsKeys.
    this.originalGroupsKeys = [];
    // Variables object used while evaluating expressions (assigned by createFromJson).
    this._variables = null;
    // Parallel arrays: output field name, accumulator factory, and operand expression.
    this.fieldNames = [];
    this.accumulatorFactories = [];
    this.expressions = [];
    this.currentDocument = null;
    // Position of the next group to emit within groupsKeys.
    this.currentGroupsKeysIndex = 0;
  };
  var klass = GroupDocumentSource;
  var proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  /**
   * Lookup table from a $group accumulator operator to the function that builds
   * a fresh accumulator for it. $max and $min both come from Accumulators.MinMax,
   * which exposes a dedicated creator for each.
   **/
  klass.groupOps = {};
  klass.groupOps["$addToSet"] = Accumulators.AddToSet;
  klass.groupOps["$avg"] = Accumulators.Avg;
  klass.groupOps["$first"] = Accumulators.First;
  klass.groupOps["$last"] = Accumulators.Last;
  klass.groupOps["$max"] = Accumulators.MinMax.createMax;
  klass.groupOps["$min"] = Accumulators.MinMax.createMin;
  klass.groupOps["$push"] = Accumulators.Push;
  klass.groupOps["$sum"] = Accumulators.Sum;

  /** Name of this stage as written in a pipeline specification. */
  klass.groupName = "$group";
  /**
   * Builds a new, unconfigured GroupDocumentSource.
   *
   * @method create
   * @static
   * @param [expCtx] {ExpressionContext} Handed straight to the constructor
   * @return {GroupDocumentSource} The new instance
   **/
  klass.create = function create(expCtx) {
    var instance = new klass(expCtx);
    return instance;
  };
  /**
   * Gets the name of this stage as it appears in a pipeline specification.
   *
   * @method getSourceName
   * @return {String} The stage name, klass.groupName ("$group")
   **/
  proto.getSourceName = function getSourceName() {
    return klass.groupName;
  };
  /**
   * Gets the next grouped output document, or DocumentSource.EOF once every
   * group has been emitted.
   *
   * The first call consumes the whole input via populate(); later calls walk the
   * groups in the order their keys were first seen. dispose() is invoked right
   * after the last group has been produced.
   *
   * @method getNext
   * @param callback {Function} callback(err, doc) where doc is an output document or DocumentSource.EOF
   **/
  proto.getNext = function getNext(callback) {
    var self = this;
    async.series([
      function(next) {
        if (self.populated) return next();
        self.populate(function(err) {
          return next(err);
        });
      },
      function(next) {
        var keyCount = self.groupsKeys.length;
        // groupsKeys gets one entry per key added to self.groups, so its length is an
        // O(1) emptiness check (Object.keys(self.groups) on every call was O(n) each time).
        //Note: Skipped the spilled logic
        if (keyCount === 0 || self.currentGroupsKeysIndex >= keyCount) {
          return next(null, DocumentSource.EOF);
        }
        var index = self.currentGroupsKeysIndex,
          groupKey = self.groupsKeys[index],
          accumulators = self.groups[groupKey],
          // groupsKeys holds JSON strings and must not be emitted as _id. Prefer the
          // original value when one was recorded; otherwise recover it from the key.
          id = index < self.originalGroupsKeys.length ? self.originalGroupsKeys[index] : JSON.parse(groupKey),
          out = self.makeDocument(id, accumulators /*,mergeableOutput*/);
        if (++self.currentGroupsKeysIndex === keyCount) {
          self.dispose();
        }
        return next(null, out);
      }
    ], function(err, results) {
      // Guard against results being absent when a task failed.
      callback(err, results ? results[1] : undefined);
    });
  };
  /**
   * Marks this source as exhausted and releases the upstream source.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
    //NOTE: Skipped 'freeing' our resources; at best we could remove some references to things, but our parent will probably forget us anyways!
    var self = this,
      keyCount = self.groupsKeys.length;
    // Park the output cursor past the last collected group key.
    self.currentGroupsKeysIndex = keyCount;
    // Tell the upstream source it can release its resources too.
    self.source.dispose();
  };
  /**
   * Replaces the _id expression and every accumulator operand expression with
   * its optimized form, in place.
   *
   * @method optimize
   **/
  proto.optimize = function optimize() {
    this.idExpression = this.idExpression.optimize();
    var exprs = this.expressions;
    for (var n = 0; n < exprs.length; n++) {
      exprs[n] = exprs[n].optimize();
    }
  };
  /**
   * Serializes this stage back into specification form: an object with a single
   * key (the source name) whose value holds `_id` plus one
   * `{<op>: <expression>}` entry per accumulator field.
   *
   * @method serialize
   * @param explain {Boolean} Create explain output; forwarded to every expression's serialize()
   * @return {Object} The serialized stage
   **/
  proto.serialize = function serialize(explain) {
    var spec = {
      _id: this.idExpression.serialize(explain)
    };
    var count = this.accumulatorFactories.length;
    for (var idx = 0; idx < count; idx++) {
      // A throwaway accumulator is built only to ask it for its operator name.
      var accumulator = this.accumulatorFactories[idx](),
        operand = this.expressions[idx].serialize(explain),
        entry = {};
      entry[accumulator.getOpName()] = operand;
      spec[this.fieldNames[idx]] = entry;
    }
    var wrapped = {};
    wrapped[this.getSourceName()] = spec;
    return wrapped;
  };
  /**
   * Creates a GroupDocumentSource from a $group specification.
   *
   * The `_id` field becomes the group-by expression:
   * - a plain object is parsed as an expression object;
   * - a string starting with "$" is parsed as a field path;
   * - anything else becomes a constant, so all documents fall into a single group.
   *
   * Every other field must be an object naming exactly one operator from
   * klass.groupOps. That operator's operand is parsed as an expression.
   *
   * @method createFromJson
   * @static
   * @param elem {Object} The group specification object; the right hand side of the $group
   * @param [expCtx] {ExpressionContext} Passed through to the new GroupDocumentSource
   * @return {GroupDocumentSource} The configured group source
   * @throws {Error} If the specification is malformed (codes 15948, 15950-15955, 16414)
   **/
  klass.createFromJson = function createFromJson(elem, expCtx) {
    if (!(elem instanceof Object && elem.constructor === Object)) throw new Error("a group's fields must be specified in an object");
    var group = GroupDocumentSource.create(expCtx),
      idSet = false,
      groupObj = elem,
      idGenerator = new VariablesIdGenerator(),
      vps = new VariablesParseState(idGenerator);
    for (var groupFieldName in groupObj) {
      if (!groupObj.hasOwnProperty(groupFieldName)) continue;
      var groupField = groupObj[groupFieldName];
      if (groupFieldName === "_id") {
        if (idSet) throw new Error("15948 a group's _id may only be specified once");
        if (groupField instanceof Object && groupField.constructor === Object) {
          // Use the projection-like set of field paths to create the group-by key.
          var objCtx = new Expression.ObjectCtx({isDocumentOk:true});
          group.setIdExpression(Expression.parseObject(groupField, objCtx, vps));
          idSet = true;
        } else if (typeof groupField === "string" && groupField[0] === "$") {
          group.setIdExpression(FieldPathExpression.parse(groupField, vps));
          idSet = true;
        }
        if (!idSet) {
          // constant id - single group
          group.setIdExpression(ConstantExpression.create(groupField));
          idSet = true;
        }
      } else {
        // Treat as a projection field with the additional ability to add aggregation operators.
        if (groupFieldName.indexOf(".") !== -1) throw new Error("16414 the group aggregate field name '" + groupFieldName + "' cannot contain '.'");
        if (groupFieldName[0] === "$") throw new Error("15950 the group aggregate field name '" + groupFieldName + "' cannot be an operator name");
        // The field's VALUE must be an object such as {$sum: "$x"}. (Testing the field
        // name, which is always a string, meant this check could never fire.)
        if (group._getTypeStr(groupField) !== "Object") throw new Error("15951 the group aggregate field '" + groupFieldName + "' must be defined as an expression inside an object");
        var subElementCount = 0;
        for (var subElementName in groupField) {
          if (!groupField.hasOwnProperty(subElementName)) continue;
          var subElement = groupField[subElementName],
            op = klass.groupOps[subElementName];
          if (!op) throw new Error("15952 unknown group operator '" + subElementName + "'");
          var groupExpression,
            subElementTypeStr = group._getTypeStr(subElement);
          if (subElementTypeStr === "Object") {
            var subElementObjCtx = new Expression.ObjectCtx({isDocumentOk:true});
            groupExpression = Expression.parseObject(subElement, subElementObjCtx, vps);
          } else if (subElementTypeStr === "Array") {
            throw new Error("15953 aggregating group operators are unary (" + subElementName + ")");
          } else { /* assume its an atomic single operand */
            groupExpression = Expression.parseOperand(subElement, vps);
          }
          group.addAccumulator(groupFieldName, op, groupExpression);
          ++subElementCount;
        }
        if (subElementCount !== 1) throw new Error("15954 the computed aggregate '" + groupFieldName + "' must specify exactly one operator");
      }
    }
    if (!idSet) throw new Error("15955 a group specification must include an _id");
    group._variables = new Variables(idGenerator.getIdCount());
    return group;
  };
  /**
   * Populates the GroupDocumentSource by pulling every input document from
   * this.source and folding it into its group.
   *
   * For each document, the _id expression is evaluated (undefined becomes null)
   * and JSON-stringified to form the group key. A new group gets one fresh
   * accumulator per factory. Its key is appended to groupsKeys and its
   * un-stringified _id to originalGroupsKeys, both in first-seen order. Each
   * accumulator then processes the value of its expression for the document.
   *
   * @method populate
   * @param callback {Function} Required. callback(err) when done populating.
   * @async
   **/
  proto.populate = function populate(callback) {
    var numAccumulators = this.accumulatorFactories.length;
    if (numAccumulators !== this.expressions.length) {
      // Return so the loop below does not run and invoke the callback a second time.
      return callback(new Error("Must have equal number of accumulators and expressions"));
    }
    var input,
      self = this;
    async.whilst(
      function() {
        return input !== DocumentSource.EOF;
      },
      function(cb) {
        self.source.getNext(function(err, doc) {
          if (err) return cb(err);
          input = doc;
          if (doc === DocumentSource.EOF) return cb(); //Need to stop now, no new input
          self._variables.setRoot(input);
          /* get the _id value */
          var id = self.idExpression.evaluate(self._variables);
          if (undefined === id) id = null;
          var groupKey = JSON.stringify(id),
            group = self.groups[groupKey];
          if (!group) {
            self.groupsKeys.push(groupKey);
            // Keep the real _id so output documents do not expose the stringified key.
            self.originalGroupsKeys.push(id);
            group = [];
            self.groups[groupKey] = group;
            // Add the accumulators
            for (var afi = 0; afi < numAccumulators; afi++) {
              group.push(self.accumulatorFactories[afi]());
            }
          }
          //NOTE: Skipped memory usage stuff for case when group already existed
          if (numAccumulators !== group.length) {
            self._variables.clearRoot();
            // Report through the callback; throwing here would escape the async loop.
            return cb(new Error('Group must have one of each accumulator'));
          }
          //NOTE: passing the input to each accumulator
          for (var gi = 0; gi < group.length; gi++) {
            group[gi].process(self.expressions[gi].evaluate(self._variables /*, doingMerge*/));
          }
          // We are done with the ROOT document so release it.
          self._variables.clearRoot();
          //NOTE: Skipped the part about sorted files
          return cb();
        });
      },
      function(err) {
        if (err) return callback(err);
        self.populated = true;
        return callback();
      }
    );
  };
  /**
   * Gets a type name for a value. Non-null objects report their constructor's
   * name (e.g. "Object", "Array"). Everything else reports its `typeof` string,
   * and so do objects that have no constructor (e.g. Object.create(null)).
   *
   * @method _getTypeStr
   * @param obj {*} The value to get the type of
   * @return {String} The type of the value as a string
   **/
  proto._getTypeStr = function _getTypeStr(obj) {
    var typeofStr = typeof obj;
    if (typeofStr !== "object" || obj === null) return typeofStr;
    var ctor = obj.constructor;
    // Prototype-less objects have no constructor; fall back rather than throw a TypeError.
    return ctor ? ctor.name : typeofStr;
  };
  /**
   * Adds the dependencies of this stage to `deps`: first those of the _id
   * expression, then those of each accumulator's operand expression.
   *
   * @method getDependencies
   * @param deps {Object} Dependency collector passed to each expression's addDependencies()
   * @return {DocumentSource.GetDepsReturn} DocumentSource.GetDepsReturn.EXHAUSTIVE, marking the added dependencies as complete
   **/
  proto.getDependencies = function getDependencies(deps) {
    // add _id
    this.idExpression.addDependencies(deps);
    // add the rest; expressions is parallel to fieldNames
    for (var i = 0; i < this.fieldNames.length; i++) {
      this.expressions[i].addDependencies(deps);
    }
    return DocumentSource.GetDepsReturn.EXHAUSTIVE;
  };
  /**
   * Called internally only. Registers an output field computed by an accumulator
   * for every group. The three arrays it appends to are kept parallel.
   *
   * @method addAccumulator
   * @param fieldName {String} The name of the field where the accumulated value will be placed
   * @param accumulatorFactory {Function} Called with no arguments to create a fresh accumulator for each group
   * @param expression {Expression} The expression evaluated on each incoming document before its value is accumulated
   **/
  proto.addAccumulator = function addAccumulator(fieldName, accumulatorFactory, expression) {
    this.fieldNames.push(fieldName);
    this.accumulatorFactories.push(accumulatorFactory);
    this.expressions.push(expression);
  };
  /**
   * Makes the output document for one group: `_id` first, then one field per
   * accumulator, in the order the accumulators were added.
   *
   * @method makeDocument
   * @param id {*} The group's _id value
   * @param accums {Array} The group's accumulators, parallel to this.fieldNames
   * @return {Object} The output document
   **/
  proto.makeDocument = function makeDocument(id, accums /*,mergeableOutput*/) {
    var out = {};
    /* add the _id field */
    out._id = id;
    /* add the rest of the fields */
    this.fieldNames.forEach(function(fieldName, i) {
      var val = accums[i].getValue(/*mergeableOutput*/);
      // Only a missing (undefined) value becomes null. Falsy results such as 0,
      // false or "" (e.g. a $sum of zeros) are real values and must be kept.
      out[fieldName] = val === undefined ? null : val;
    });
    return out;
  };
  /**
   * Sets the expression evaluated per input document to produce its group key.
   *
   * @method setIdExpression
   * @param expression {Expression} The expression to set
   **/
  proto.setIdExpression = function setIdExpression(expression) {
    this.idExpression = expression;
  };