GroupDocumentSource.js 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. "use strict";
  2. var DocumentSource = require("./DocumentSource"),
  3. Accumulators = require("../accumulators/"),
  4. Document = require("../Document"),
  5. Expression = require("../expressions/Expression"),
  6. ConstantExpression = require("../expressions/ConstantExpression"),
  7. FieldPathExpression = require("../expressions/FieldPathExpression");
  8. var GroupDocumentSource = module.exports = (function(){
  // CONSTRUCTOR
  /**
   * Document source that groups documents together.
   *
   * A fresh instance has no _id expression and no accumulators; those are
   * filled in afterwards (see createFromJson / addAccumulator). The grouping
   * itself is deferred until populate() runs.
   *
   * @class GroupDocumentSource
   * @namespace munge.pipeline.documentsource
   * @module munge
   * @constructor
   **/
  var klass = module.exports = GroupDocumentSource = function GroupDocumentSource(){
    // Lazy-evaluation flag: flipped to true at the end of populate().
    this.populated = false;

    // Expression evaluated against each input document to obtain its group key.
    this.idExpression = null;

    // Map of hashed group key -> array of accumulator instances for that group.
    this.groups = {};
    // Hashed group keys in first-seen order, to facilitate indexed look up of groups.
    this.groupsKeys = [];

    // Parallel arrays, one slot per computed output field (see addAccumulator).
    this.fieldNames = [];
    this.accumulatorFactories = [];
    this.expressions = [];

    // Iteration state over groupsKeys.
    this.currentDocument = null;
    this.currentGroupsKeysIndex = 0;
  };
  var base = DocumentSource;
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  /**
   * Lookup table from the operator name written in a group specification to
   * the accumulator constructor (or factory function) that implements it.
   * Each entry is invoked with `new` and no arguments when a group is created.
   *
   * @static
   * @property GroupOps
   **/
  klass.GroupOps = {
    $addToSet: Accumulators.AddToSet,
    $avg: Accumulators.Avg,
    $first: Accumulators.First,
    $last: Accumulators.Last,
    // $max and $min are both served by MinMax through dedicated factories.
    $max: Accumulators.MinMax.createMax,
    $min: Accumulators.MinMax.createMin,
    $push: Accumulators.Push,
    $sum: Accumulators.Sum
  };
  /**
   * Build a GroupDocumentSource from the object supplied as a group specification.
   *
   * The `_id` entry selects the grouping key:
   *   - a plain object is parsed with Expression.parseObject;
   *   - a string starting with "$" becomes a FieldPathExpression (prefix removed);
   *   - any other string, a number, a boolean, an array or null becomes a
   *     ConstantExpression.
   * Every other entry names an output field and must be an object holding
   * exactly one operator from klass.GroupOps, e.g. `{total: {$sum: "$price"}}`.
   *
   * @static
   * @method createFromJson
   * @param {Object} groupObj the group specification
   * @returns {GroupDocumentSource} the configured document source
   * @throws {Error} when the specification is not a plain object, has no _id,
   *   uses an unsupported _id type, or contains a malformed aggregate field
   *   (codes 15948-15955 and 16414 are embedded in the message)
   **/
  klass.createFromJson = function createFromJson(groupObj) {
    if(!(groupObj instanceof Object && groupObj.constructor.name === "Object"))
      throw new Error("a group's fields must be specified in an object");

    var idSet = false,
      group = new GroupDocumentSource();

    for(var groupFieldName in groupObj){
      if(!groupObj.hasOwnProperty(groupFieldName)) continue;
      var groupField = groupObj[groupFieldName];

      if(groupFieldName === "_id"){
        if(idSet) {
          throw new Error("15948 a group's _id may only be specified once");
        }
        if(groupField instanceof Object && groupField.constructor.name === "Object"){
          var objCtx = new Expression.ObjectCtx({isDocumentOk:true});
          group.idExpression = Expression.parseObject(groupField, objCtx);
          idSet = true;
        }else if(typeof groupField === "string"){
          if(groupField[0] !== "$") {
            group.idExpression = new ConstantExpression(groupField);
          }
          else {
            var pathString = Expression.removeFieldPrefix(groupField);
            group.idExpression = new FieldPathExpression(pathString);
          }
          idSet = true;
        }else if(groupField === null){
          // null has no constructor to inspect, so handle it before asking for
          // a type name; a constant null key puts every document in one group.
          group.idExpression = new ConstantExpression(groupField);
          idSet = true;
        }else{
          var typeStr = group._getTypeStr(groupField);
          switch(typeStr){
            case "number":
            case "string":
            case "boolean":
            case "Object":
            case "Array":
              group.idExpression = new ConstantExpression(groupField);
              idSet = true;
              break;
            default:
              throw new Error("a group's _id may not include fields of type " + typeStr + "");
          }
        }
        continue;
      }

      // Anything other than _id is a computed aggregate field.
      if(groupFieldName.indexOf(".") !== -1)
        throw new Error("16414 the group aggregate field name '" + groupFieldName + "' cannot contain '.'");
      if(groupFieldName[0] === "$")
        throw new Error("15950 the group aggregate field name '" + groupFieldName + "' cannot be an operator name");
      // Validate the field's VALUE (not its name): it has to be an object that
      // can carry an operator. Primitives, null and arrays are rejected here
      // instead of falling through to a misleading operator error below.
      if(groupField === null || typeof groupField !== "object" || Array.isArray(groupField))
        throw new Error("15951 the group aggregate field '" + groupFieldName + "' must be defined as an expression inside an object");

      var subFieldCount = 0;
      for(var subFieldName in groupField){
        if(!groupField.hasOwnProperty(subFieldName)) continue;
        var subField = groupField[subFieldName],
          op = klass.GroupOps[subFieldName];
        if(!op)
          throw new Error("15952 unknown group operator '" + subFieldName + "'");

        var groupExpression,
          // Guard null locally so a null operand reaches parseOperand instead
          // of failing while its type name is being looked up.
          subFieldTypeStr = (subField === null ? "null" : group._getTypeStr(subField));
        if(subFieldTypeStr === "Object"){
          var subFieldObjCtx = new Expression.ObjectCtx({isDocumentOk:true});
          groupExpression = Expression.parseObject(subField, subFieldObjCtx);
        }else if(subFieldTypeStr === "Array"){
          throw new Error("15953 aggregating group operators are unary (" + subFieldName + ")");
        }else{
          groupExpression = Expression.parseOperand(subField);
        }
        group.addAccumulator(groupFieldName, op, groupExpression);
        ++subFieldCount;
      }
      if(subFieldCount !== 1)
        throw new Error("15954 the computed aggregate '" + groupFieldName + "' must specify exactly one operator");
    }

    if(!idSet) {
      throw new Error("15955 a group specification must include an _id");
    }
    return group;
  };
  /**
   * Describe the type of a value as a short string.
   *
   * Primitives and functions yield their `typeof` string, null yields "null",
   * and other objects yield the name of their constructor (e.g. "Object",
   * "Array", "Date").
   *
   * @method _getTypeStr
   * @param {Any} obj the value to inspect
   * @returns {String} the type name
   **/
  proto._getTypeStr = function _getTypeStr(obj){
    // typeof null is "object", but null has no constructor to read a name from.
    if(obj === null) return "null";
    var typeofStr = typeof obj;
    if(typeofStr !== "object") return typeofStr;
    // Objects without a prototype (Object.create(null)) have no constructor
    // either; report them as plain objects rather than throwing.
    var ctor = obj.constructor;
    if(ctor === undefined || ctor === null) return "Object";
    return ctor.name;
  };
  /**
   * Name under which this document source is known.
   *
   * @static
   * @property groupName
   **/
  klass.groupName = "$group";

  /**
   * Report the name of this document source.
   *
   * @method getSourceName
   * @returns {String} the value of klass.groupName
   **/
  proto.getSourceName = function getSourceName(){
    var sourceName = klass.groupName;
    return sourceName;
  };
  /**
   * Move to the next group, building its output document.
   *
   * Grouping is performed lazily on first use. Once the last group has been
   * passed (or when there are no groups at all) the current document is
   * cleared and every further call keeps returning false.
   *
   * @method advance
   * @returns {Boolean} true if a new current document is available, false at
   *   the end of the groups
   **/
  proto.advance = function advance(){
    // Delegate to the base implementation first (not visible here; the
    // original note suggests it checks for interrupts).
    base.prototype.advance.call(this);

    if(!this.populated)
      this.populate();

    // Already exhausted: do not push the cursor past the end, otherwise the
    // equality-based end test would be skipped and makeDocument() would be
    // asked for a key that does not exist.
    if(this.currentGroupsKeysIndex >= this.groupsKeys.length){
      this.currentDocument = null;
      return false;
    }

    ++this.currentGroupsKeysIndex;
    if(this.currentGroupsKeysIndex >= this.groupsKeys.length){
      this.currentDocument = null;
      return false;
    }

    this.currentDocument = this.makeDocument(this.currentGroupsKeysIndex);
    return true;
  };
  /**
   * Tell whether the group cursor has reached the end of the collected groups.
   * Triggers the (lazy) grouping pass if it has not run yet.
   *
   * @method eof
   * @returns {Boolean} true when the cursor index equals the number of groups
   **/
  proto.eof = function eof(){
    if(!this.populated){
      this.populate();
    }
    var atEnd = (this.currentGroupsKeysIndex === this.groupsKeys.length);
    return atEnd;
  };
  /**
   * Return the output document for the group the cursor is on.
   * Triggers the (lazy) grouping pass if it has not run yet.
   *
   * @method getCurrent
   * @returns {Object|null} the current document as stored in this.currentDocument
   **/
  proto.getCurrent = function getCurrent(){
    if(!this.populated){
      this.populate();
    }
    var current = this.currentDocument;
    return current;
  };
  /**
   * Register one computed output field.
   *
   * The three arguments are appended to three parallel arrays, so the same
   * index identifies the field name, its accumulator factory and the
   * expression the accumulator will evaluate.
   *
   * @method addAccumulator
   * @param {String} fieldName name of the output field
   * @param {Function} accumulatorFactory invoked with `new` for every new group
   * @param {Expression} expression operand handed to each created accumulator
   **/
  proto.addAccumulator = function addAccumulator(fieldName, accumulatorFactory, expression){
    var names = this.fieldNames;
    names.push(fieldName);

    var factories = this.accumulatorFactories;
    factories.push(accumulatorFactory);

    var operands = this.expressions;
    operands.push(expression);
  };
  /**
   * Consume the whole upstream source (this.pSource) and sort its documents
   * into groups.
   *
   * For every input document the _id expression is evaluated (undefined is
   * treated as null) and serialized with JSON.stringify to obtain the group
   * key. The first document of a key creates one accumulator per registered
   * field; every document is then fed to all accumulators of its group.
   * Afterwards the cursor is placed on the first group and the source is
   * marked as populated.
   *
   * @method populate
   **/
  proto.populate = function populate(){
    var source = this.pSource,
      more = !source.eof();

    while(more){
      var inputDoc = source.getCurrent(),
        key = this.idExpression.evaluate(inputDoc);
      if(key === undefined){
        key = null;
      }

      //! @todo USE A REAL HASH. Collisions are not taken into account.
      var keyHash = JSON.stringify(key),
        accumulators;

      if(!(keyHash in this.groups)){
        // First document for this key: record the key and build its accumulators.
        accumulators = [];
        this.groups[keyHash] = accumulators;
        this.groupsKeys[this.currentGroupsKeysIndex] = keyHash;
        this.currentGroupsKeysIndex += 1;
        for(var f = 0; f < this.accumulatorFactories.length; ++f){
          var created = new this.accumulatorFactories[f]();
          created.addOperand(this.expressions[f]);
          accumulators.push(created);
        }
      }else{
        accumulators = this.groups[keyHash];
      }

      // Feed the document to every accumulator of the group it belongs to.
      for(var a = 0; a < accumulators.length; ++a){
        accumulators[a].evaluate(inputDoc);
      }

      more = source.advance();
    }

    // Rewind the cursor to the first group and materialize it, if any.
    this.currentGroupsKeysIndex = 0;
    if(this.groupsKeys.length > 0){
      this.currentDocument = this.makeDocument(this.currentGroupsKeysIndex);
    }
    this.populated = true;
  };
  /**
   * Build the output document for one group.
   *
   * The document's id property (Document.ID_PROPERTY_NAME) is recovered by
   * parsing the group's JSON key; each registered field then receives the
   * value of the accumulator in the matching slot. Slots holding null or
   * undefined are skipped, leaving the field out of the document.
   *
   * @method makeDocument
   * @param {Number} groupKeyIndex index into this.groupsKeys
   * @returns {Object} the document for that group
   **/
  proto.makeDocument = function makeDocument(groupKeyIndex){
    var groupKey = this.groupsKeys[groupKeyIndex],
      group = this.groups[groupKey],
      doc = {};

    doc[Document.ID_PROPERTY_NAME] = JSON.parse(groupKey);

    for(var i = 0; i < this.fieldNames.length; ++i){
      var fieldName = this.fieldNames[i],
        item = group[i];
      // Compare against the null value itself (not the string "null") so an
      // empty slot is skipped instead of being dereferenced.
      if(item !== null && item !== undefined){
        doc[fieldName] = item.getValue();
      }
    }
    return doc;
  };
  /**
   * Reset the document source so that it is ready for a new stream of data.
   * Note that this is a deviation from the mongo implementation.
   *
   * Collected groups and the iteration state are discarded; the configured
   * _id expression and accumulator definitions are left untouched.
   *
   * @method reset
   **/
  proto.reset = function reset(){
    this.populated = false;
    // groups is a map keyed by the JSON-serialized _id, so it must be reset to
    // a plain object (as in the constructor), not an array.
    this.groups = {};
    this.groupsKeys = [];
    this.currentDocument = null;
    this.currentGroupsKeysIndex = 0;
  };
  212. return klass;
  213. })();