GroupDocumentSource.js 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. var DocumentSource = require("./DocumentSource"),
  2. Document = require("../Document"),
  3. Expression = require("../expressions/Expression"),
  4. Accumulators = require("../accumulators/"),
  5. GroupDocumentSource = module.exports = (function(){
  6. // CONSTRUCTOR
  7. /**
  8. * A class for grouping documents together
  9. *
  10. * @class GroupDocumentSource
  11. * @namespace munge.pipeline.documentsource
  12. * @module munge
  13. * @constructor
  14. * @param {ExpressionContext}
  15. **/
  16. var klass = module.exports = GroupDocumentSource = function GroupDocumentSource(groupElement){
  17. if(!(groupElement instanceof Object && groupElement.constructor.name === "Object") || Object.keys(groupElement).length < 1)
  18. throw new Error("a group's fields must be specified in an object");
  19. this.populated = false;
  20. this.idExpression = null;
  21. this.groups = {}; // GroupsType Value -> Accumulators[]
  22. this.groupsKeys = []; // This is to faciliate easier look up of groups
  23. this.fieldNames = [];
  24. this.accumulatorFactories = [];
  25. this.expressions = [];
  26. this.currentDocument = null;
  27. this.groupCurrentIndex = 0;
  28. var groupObj = groupElement[this.getSourceName()];
  29. for(var groupFieldName in groupObj){
  30. if(groupObj.hasOwnProperty(groupFieldName)){
  31. var groupField = groupObj[groupFieldName];
  32. if(groupFieldName === "_id"){
  33. if(groupField instanceof Object && groupField.constructor.name === "Object"){
  34. var objCtx = new Expression.ObjectCtx({isDocumentOk:true});
  35. this.idExpression = Expression.parseObject(groupField, objCtx);
  36. }else if( typeof groupField === "string"){
  37. if(groupField[0] !== "$")
  38. this.idExpression = new ConstantExpression(groupField);
  39. var pathString = Expression.removeFieldPrefix(groupField);
  40. this.idExpression = new FieldPathExpression(pathString);
  41. }else{
  42. var typeStr = this._getTypeStr(groupField);
  43. switch(typeStr){
  44. case "number":
  45. case "string":
  46. case "boolean":
  47. case "object":
  48. this.idExpression = new ConstantExpression(groupField);
  49. break;
  50. default:
  51. throw new Error("a group's _id may not include fields of type " + typeStr + "");
  52. }
  53. }
  54. }else{
  55. if(groupFieldName.indexOf(".") !== -1)
  56. throw new Error("16414 the group aggregate field name '" + groupFieldName + "' cannot contain '.'");
  57. if(groupFieldName[0] === "$")
  58. throw new Error("15950 the group aggregate field name '" + groupFieldName + "' cannot be an operator name");
  59. if(this._getTypeStr(groupFieldName) === "object")
  60. throw new Error("15951 the group aggregate field '" + groupFieldName + "' must be defined as an expression inside an object");
  61. var subFieldCount = 0;
  62. for(var subFieldName in groupField){
  63. if(groupField.hasOwnProperty(subFieldName)){
  64. var subField = groupField[subField],
  65. op = DocumentSource.GroupOps[subFieldName];
  66. if(!op)
  67. throw new Exception("15952 unknown group operator '" + subFieldName + "'");
  68. var groupExpression,
  69. subFieldTypeStr = this._getTypeStr(subField);
  70. if(subFieldTypeStr === "object"){
  71. var subFieldObjCtx = new Expression.ObjectCtx({isDocumentOk:true});
  72. groupExpression = Expression.parseObject(groupField, subFieldObjCtx);
  73. }else if(subFieldTypeStr === "Array"){
  74. throw new Exception("15953 aggregating group operators are unary (" + subFieldName + ")");
  75. }else{
  76. groupExpression = Expression.parseOperand(subField);
  77. }
  78. this.addAccumulator(groupFieldName,op, groupExpression);
  79. ++subFieldCount;
  80. }
  81. if(subFieldCount != 1)
  82. throw new Error("15954 the computed aggregate '" + groupFieldName + "' must specify exactly one operator");
  83. }
  84. }
  85. }
  86. }
  87. }, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  88. klass.GroupOps = {
  89. "$addToSet": Accumulators.AddToSet,
  90. "$avg": Accumulators.Avg,
  91. "$first": Accumulators.First,
  92. "$last": Accumulators.Last,
  93. "$max": Accumulators.MinMax.bind(null, 1),
  94. "$min": Accumulators.MinMax.bind(null, -1),
  95. "$push": Accumulators.Push,
  96. "$sum": Accumulators.Sum
  97. };
  98. proto._getTypeStr = function _getTypeStr(obj){
  99. var typeofStr=typeof groupField, typeStr = (typeofStr == "object" ? groupField.constructor.name : typeStr);
  100. return typeofStr;
  101. };
  102. proto.getSourceName = function getSourceName(){
  103. return "$group";
  104. };
  105. proto.advance = function advance(){
  106. base.prototype.advance.call(this); // Check for interupts ????
  107. if(!this.populated)
  108. this.populate();
  109. ++this.currentGroupsKeysIndex;
  110. if(this.currentGroupsKeysIndex === this.groupKeys.length){
  111. this.currentDocument = null;
  112. return false;
  113. }
  114. return true;
  115. };
  116. proto.eof = function eof(){
  117. if(!this.populated)
  118. this.populate();
  119. return this.currentGroupsKeysIndex === this.groupsKeys.length;
  120. };
  121. proto.getCurrent = function getCurrent(){
  122. if(!this.populated)
  123. this.populate();
  124. return this.currentDocument;
  125. };
  126. proto.addAccumulator = function addAccumulator(fieldName, accumulatorFactory, expression){
  127. this.fieldNames.push(fieldName);
  128. this.accumulatorFactories.push(accumulatorFactory);
  129. this.expressions.push(expression);
  130. };
  131. proto.populate = function populate(){
  132. for(var hasNext = !this.pSource.eof(); hasNext; hasNext = this.pSource.advance()){
  133. var currentDocument = this.pSource.getCurrent(),
  134. _id = this.idExpression.evaluate(currentDocument) || null,
  135. group;
  136. if(_id in this.groups){
  137. group = this.groups[_id];
  138. }else{
  139. this.groups[_id] = group = [];
  140. this.groupsKeys[this.currentGroupsKeysIndex] = _id;
  141. for(var ai =0; ai < this.accumulators.length; ++ai){
  142. var accumulator = new this.accumulatorFactories[ai]();
  143. accumulators.addOperand(this.expressions[ai]);
  144. group.push(accumulator);
  145. }
  146. }
  147. // tickle all the accumulators for the group we found
  148. for(var gi=0; gi < group.length; ++gi)
  149. group[gi].evaluate(currentDocument);
  150. this.currentGroupsKeysIndex = 0; // Start the group
  151. if(this.currentGroupsKeysIndex === this.groups.length-1)
  152. this.currentDocument = makeDocument(this.currentGroupsKeysIndex);
  153. this.populated = true;
  154. }
  155. };
  156. proto.makeDocument = function makeDocument(groupKeyIndex){
  157. var groupKey = this.groupKeys[groupKeyIndex],
  158. group = this.groups[groupKey],
  159. doc = {};
  160. doc[Document.ID_PROPERTY_NAME] = groupKey;
  161. for(var i = 0; i < this.fieldNames.length; ++i){
  162. var fieldName = this.fieldNames[i],
  163. value = this.group[i].getValue();
  164. if(typeof value !== "undefined"){
  165. doc[fieldName] = value;
  166. }
  167. }
  168. return doc;
  169. };
  170. return klass;
  171. })();