GroupDocumentSource.js 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. "use strict";
  2. var DocumentSource = require("./DocumentSource"),
  3. Accumulators = require("../accumulators/"),
  4. Document = require("../Document"),
  5. Expression = require("../expressions/Expression"),
  6. ConstantExpression = require("../expressions/ConstantExpression"),
  7. FieldPathExpression = require("../expressions/FieldPathExpression");
  8. var GroupDocumentSource = module.exports = (function(){
  // CONSTRUCTOR
  /**
   * Document source that collects incoming documents into groups.
   *
   * A new instance starts unpopulated: no _id expression, no groups and no
   * accumulators are registered yet.
   *
   * @class GroupDocumentSource
   * @namespace munge.pipeline.documentsource
   * @module munge
   * @constructor
   * @param {ExpressionContext} [pExpCtx] not used here (commented out in the signature)
   **/
  var klass = function GroupDocumentSource(/*pExpCtx*/){
    this.populated = false;            // set to true at the end of populate()
    this.idExpression = null;          // expression evaluated per document to obtain the group _id
    this.groups = {};                  // JSON-serialized _id -> array of accumulators
    this.groupsKeys = [];              // serialized _ids, kept so groups can be addressed by index
    this.fieldNames = [];              // output field names (parallel to the two arrays below)
    this.accumulatorFactories = [];
    this.expressions = [];
    this.currentDocument = null;
    this.currentGroupsKeysIndex = 0;   // cursor into groupsKeys
  };
  module.exports = GroupDocumentSource = klass;
  var base = DocumentSource;
  var proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  /**
   * Lookup table from a $group operator name to the accumulator factory that
   * createFromJson() registers for it.
   *
   * @property GroupOps
   * @static
   **/
  klass.GroupOps = {
    $addToSet: Accumulators.AddToSet,
    $avg: Accumulators.Avg,
    $first: Accumulators.First,
    $last: Accumulators.Last,
    $max: Accumulators.MinMax.createMax,
    $min: Accumulators.MinMax.createMin,
    $push: Accumulators.Push,
    $sum: Accumulators.Sum
  };
  /**
   * Write a JSON-style description of this source into `builder`.
   *
   * A single property is set on `builder`, named after getSourceName(). Its
   * value holds the `_id` expression (an empty object when none is set) plus
   * one entry per registered accumulator, keyed by output field name.
   *
   * @method sourceToJson
   * @param {Object} builder object that receives the description
   * @param {Boolean} explain create explain output (not read by this implementation)
   **/
  proto.sourceToJson = function sourceToJson(builder, explain) {
    var idExpression = this.idExpression;
    var spec = {};
    if(idExpression){
      spec._id = idExpression.toJson();
    }else{
      spec._id = {};
    }

    var factories = this.accumulatorFactories;
    var count = factories.length;
    var idx = 0;
    while(idx < count){
      // a throwaway accumulator is built only to render its JSON form
      var Factory = factories[idx];
      var accumulator = new Factory(/*pExpCtx*/);
      accumulator.addOperand(this.expressions[idx]);
      spec[this.fieldNames[idx]] = accumulator.toJson(true);
      idx += 1;
    }

    builder[this.getSourceName()] = spec;
  };
  64. klass.createFromJson = function createFromJson(groupObj) {
  65. if(!(groupObj instanceof Object && groupObj.constructor.name === "Object"))
  66. throw new Error("a group's fields must be specified in an object");
  67. var idSet = false,
  68. group = new GroupDocumentSource();
  69. for(var groupFieldName in groupObj){
  70. if(groupObj.hasOwnProperty(groupFieldName)){
  71. var groupField = groupObj[groupFieldName];
  72. if(groupFieldName === "_id"){
  73. if(idSet) {
  74. throw new Error("15948 a group's _id may only be specified once");
  75. }
  76. if(groupField instanceof Object && groupField.constructor.name === "Object"){
  77. var objCtx = new Expression.ObjectCtx({isDocumentOk:true});
  78. group.idExpression = Expression.parseObject(groupField, objCtx);
  79. idSet = true;
  80. }else if( typeof groupField === "string"){
  81. if(groupField[0] !== "$") {
  82. group.idExpression = new ConstantExpression(groupField);
  83. }
  84. else {
  85. var pathString = Expression.removeFieldPrefix(groupField);
  86. group.idExpression = new FieldPathExpression(pathString);
  87. }
  88. idSet = true;
  89. }else{
  90. var typeStr = group._getTypeStr(groupField);
  91. switch(typeStr){
  92. case "number":
  93. case "string":
  94. case "boolean":
  95. case "Object":
  96. case "object": // null returns "object" Xp
  97. case "Array":
  98. group.idExpression = new ConstantExpression(groupField);
  99. idSet = true;
  100. break;
  101. default:
  102. throw new Error("a group's _id may not include fields of type " + typeStr + "");
  103. }
  104. }
  105. }else{
  106. if(groupFieldName.indexOf(".") !== -1)
  107. throw new Error("16414 the group aggregate field name '" + groupFieldName + "' cannot contain '.'");
  108. if(groupFieldName[0] === "$")
  109. throw new Error("15950 the group aggregate field name '" + groupFieldName + "' cannot be an operator name");
  110. if(group._getTypeStr(groupFieldName) === "Object")
  111. throw new Error("15951 the group aggregate field '" + groupFieldName + "' must be defined as an expression inside an object");
  112. var subFieldCount = 0;
  113. for(var subFieldName in groupField){
  114. if(groupField.hasOwnProperty(subFieldName)){
  115. var subField = groupField[subFieldName],
  116. op = klass.GroupOps[subFieldName];
  117. if(!op)
  118. throw new Error("15952 unknown group operator '" + subFieldName + "'");
  119. var groupExpression,
  120. subFieldTypeStr = group._getTypeStr(subField);
  121. if(subFieldTypeStr === "Object"){
  122. var subFieldObjCtx = new Expression.ObjectCtx({isDocumentOk:true});
  123. groupExpression = Expression.parseObject(subField, subFieldObjCtx);
  124. }else if(subFieldTypeStr === "Array"){
  125. throw new Error("15953 aggregating group operators are unary (" + subFieldName + ")");
  126. }else{
  127. groupExpression = Expression.parseOperand(subField);
  128. }
  129. group.addAccumulator(groupFieldName,op, groupExpression);
  130. ++subFieldCount;
  131. }
  132. }
  133. if(subFieldCount != 1)
  134. throw new Error("15954 the computed aggregate '" + groupFieldName + "' must specify exactly one operator");
  135. }
  136. }
  137. }
  138. if(!idSet) {
  139. throw new Error("15955 a group specification must include an _id");
  140. }
  141. return group;
  142. };
  /**
   * Name the runtime type of a value.
   *
   * Non-objects and null yield their `typeof` string (so null yields
   * "object"). Other objects yield the name of their constructor, e.g.
   * "Object", "Array" or "Date". Objects without a constructor function
   * (for instance created via Object.create(null)) are reported as "Object"
   * instead of raising a TypeError.
   *
   * @method _getTypeStr
   * @param {any} obj value to inspect
   * @returns {String} the type name
   **/
  proto._getTypeStr = function _getTypeStr(obj){
    var typeofStr = typeof obj;
    if(typeofStr !== "object" || obj === null)
      return typeofStr;
    var ctor = obj.constructor;
    return (typeof ctor === "function") ? ctor.name : "Object";
  };
  // Name under which this document source is reported (see getSourceName).
  klass.groupName = "$group";

  /**
   * @method getSourceName
   * @returns {String} the current value of klass.groupName
   **/
  proto.getSourceName = function getSourceName(){
    var sourceName = klass.groupName;
    return sourceName;
  };
  /**
   * Move to the next group, populating the groups first if necessary.
   *
   * The cursor never moves beyond the number of groups, so calling advance()
   * on an empty result set, or again after the end has been reached, simply
   * returns false and leaves eof() reporting true.
   *
   * @method advance
   * @returns {Boolean} true when a new current document is available, false at the end
   **/
  proto.advance = function advance(){
    // delegate to the base class first (the original note suggests an interrupt check; unconfirmed)
    base.prototype.advance.call(this);

    if(!this.populated)
      this.populate();

    var groupCount = this.groupsKeys.length;
    // only step while there is still a group under the cursor
    if(this.currentGroupsKeysIndex < groupCount)
      ++this.currentGroupsKeysIndex;

    if(this.currentGroupsKeysIndex >= groupCount){
      this.currentDocument = null;
      return false;
    }

    this.currentDocument = this.makeDocument(this.currentGroupsKeysIndex);
    return true;
  };
  /**
   * Report whether all groups have been consumed, populating the groups
   * first if necessary.
   *
   * A cursor at or beyond the number of groups counts as end-of-stream, so
   * an over-advanced cursor cannot make the source look like it has more data.
   *
   * @method eof
   * @returns {Boolean} true when no group is left under the cursor
   **/
  proto.eof = function eof(){
    if(!this.populated)
      this.populate();
    return this.currentGroupsKeysIndex >= this.groupsKeys.length;
  };
  /**
   * Return the document for the group under the cursor, populating the
   * groups first if that has not happened yet.
   *
   * @method getCurrent
   * @returns {Object|null} the value of this.currentDocument
   **/
  proto.getCurrent = function getCurrent(){
    var alreadyPopulated = this.populated;
    if(!alreadyPopulated){
      this.populate();
    }
    return this.currentDocument;
  };
  /**
   * Let the _id expression and every accumulator expression record the
   * fields they depend on into `deps`.
   *
   * @method getDependencies
   * @param {Object} deps collection handed to each expression's addDependencies()
   * @returns DocumentSource.GetDepsReturn.EXHAUSTIVE
   **/
  proto.getDependencies = function getDependencies(deps) {
    // the grouping key first
    this.idExpression.addDependencies(deps);

    // then one expression per registered output field
    for(var idx = 0, fieldCount = this.fieldNames.length; idx < fieldCount; ++idx){
      this.expressions[idx].addDependencies(deps);
    }

    return DocumentSource.GetDepsReturn.EXHAUSTIVE;
  };
  /**
   * Register one computed output field.
   *
   * The three arguments are appended to three parallel arrays, so the same
   * index identifies the name, factory and expression of one accumulator.
   *
   * @method addAccumulator
   * @param {String} fieldName name of the output field
   * @param {Function} accumulatorFactory invoked with `new` to create an accumulator per group
   * @param {Expression} expression handed to each accumulator via addOperand()
   **/
  proto.addAccumulator = function addAccumulator(fieldName, accumulatorFactory, expression){
    var names = this.fieldNames,
      factories = this.accumulatorFactories,
      operands = this.expressions;
    names[names.length] = fieldName;
    factories[factories.length] = accumulatorFactory;
    operands[operands.length] = expression;
  };
  /**
   * Drain the upstream source (`this.pSource`) and build the groups.
   *
   * For every incoming document the _id expression is evaluated (undefined
   * becomes null) and serialized with JSON.stringify to obtain the group key.
   * The first document of a group creates one accumulator per registered
   * field; every document is then fed to all accumulators of its group.
   * Afterwards the cursor is placed on the first group and, if any group
   * exists, the current document is built.
   *
   * @method populate
   **/
  proto.populate = function populate(){
    var source = this.pSource,
      hasOwn = Object.prototype.hasOwnProperty;

    for(var hasNext = !source.eof(); hasNext; hasNext = source.advance()){
      var currentDocument = source.getCurrent(),
        _id = this.idExpression.evaluate(currentDocument);
      if(_id === undefined)
        _id = null;

      var idHash = JSON.stringify(_id); //! @todo USE A REAL HASH. Collisions are not taken into account.
      var group;
      // own-property test: inherited members must never be mistaken for a group
      if(hasOwn.call(this.groups, idHash)){
        group = this.groups[idHash];
      }else{
        group = this.groups[idHash] = [];
        // append rather than index by the read cursor, so the key list stays
        // dense whatever value the cursor had on entry
        this.groupsKeys.push(idHash);
        for(var ai = 0; ai < this.accumulatorFactories.length; ++ai){
          var accumulator = new this.accumulatorFactories[ai]();
          accumulator.addOperand(this.expressions[ai]);
          group.push(accumulator);
        }
      }

      // feed the document to every accumulator of its group
      for(var gi = 0; gi < group.length; ++gi)
        group[gi].evaluate(currentDocument);
    }

    this.currentGroupsKeysIndex = 0; // start at the first group
    if(this.groupsKeys.length > 0)
      this.currentDocument = this.makeDocument(this.currentGroupsKeysIndex);
    this.populated = true;
  };
  /**
   * Build the output document for the group at the given position.
   *
   * The group's _id is recovered by parsing its JSON-serialized key; each
   * registered field then receives the value of the matching accumulator.
   * Accumulator slots that are null or undefined are skipped.
   *
   * @method makeDocument
   * @param {Number} groupKeyIndex index into this.groupsKeys
   * @returns {Object} the document for that group
   **/
  proto.makeDocument = function makeDocument(groupKeyIndex){
    var groupKey = this.groupsKeys[groupKeyIndex],
      group = this.groups[groupKey],
      doc = {};

    doc[Document.ID_PROPERTY_NAME] = JSON.parse(groupKey);

    for(var i = 0; i < this.fieldNames.length; ++i){
      var accumulator = group[i];
      // compare against the null value, not the string "null"
      if(accumulator !== null && accumulator !== undefined){
        doc[this.fieldNames[i]] = accumulator.getValue();
      }
    }
    return doc;
  };
  /**
   * Reset the document source so that it is ready for a new stream of data.
   * Note that this is a deviation from the mongo implementation.
   *
   * The registered _id expression and accumulators are kept; only the
   * collected groups and the iteration state are cleared. `groups` is reset
   * to a plain object, matching how the constructor initializes it (it is a
   * map keyed by serialized _id, not a list).
   *
   * @method reset
   **/
  proto.reset = function reset(){
    this.populated = false;
    this.groups = {};
    this.groupsKeys = [];
    this.currentDocument = null;
    this.currentGroupsKeysIndex = 0;
  };
  247. return klass;
  248. })();