// Pipeline.js
  1. "use strict";
  2. var Pipeline = module.exports = (function(){
  3. // CONSTRUCTOR
  4. /**
  5. * mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
  6. * @class Pipeline
  7. * @namespace mungedb.aggregate.pipeline
  8. * @module mungedb-aggregate
  9. * @constructor
  10. **/
  11. var klass = function Pipeline(/*theCtx*/){
  12. this.sourceVector = [];//should be provate?
  13. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  14. var LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
  15. MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
  16. ProjectDocumentSource = require('./documentSources/ProjectDocumentSource'),
  17. SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
  18. UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
  19. GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
  20. SortDocumentSource = require('./documentSources/SortDocumentSource'),
  21. SplitDocumentSource = require('./documentSources/SplitDocumentSource');
  22. klass.StageDesc = {};//attaching this to the class for test cases
  23. klass.StageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
  24. klass.StageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
  25. klass.StageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
  26. klass.StageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
  27. klass.StageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
  28. klass.StageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
  29. klass.StageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
  30. klass.StageDesc[SplitDocumentSource.splitName] = SplitDocumentSource.createFromJson;
  31. /**
  32. * Create a pipeline from the command.
  33. *
  34. * @static
  35. * @method parseCommand
  36. * @param {Object} cmdObj the command object sent from the client
  37. * @returns {Array} the pipeline, if created, otherwise a NULL reference
  38. **/
  39. klass.parseCommand = function parseCommand(cmdObj){
  40. var pipelineInstance = new Pipeline(),
  41. pipeline = cmdObj;//munge: skipping the command parsing since all we care about is the pipeline
  42. var sourceVector = pipelineInstance.sourceVector,
  43. nSteps = pipeline.length;
  44. for( var iStep = 0; iStep<nSteps; ++iStep){
  45. /* pull out the pipeline element as an object */
  46. var pipeElement = pipeline[iStep];
  47. if (!(pipeElement instanceof Object)){
  48. throw new Error("pipeline element " + iStep + " is not an object; code 15942" );
  49. }
  50. // Parse a pipeline stage from 'obj'.
  51. var obj = pipeElement;
  52. if (Object.keys(obj).length !== 1){
  53. throw new Error("A pipeline stage specification object must contain exactly one field; code 16435" );
  54. }
  55. // Create a DocumentSource pipeline stage from 'stageSpec'.
  56. var stageName = Object.keys(obj)[0],
  57. stageSpec = obj[stageName],
  58. desc = klass.StageDesc[stageName];
  59. if (!desc){
  60. throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435" );
  61. }
  62. var stage = desc(stageSpec);
  63. //verify(stage);
  64. stage.setPipelineStep(iStep);
  65. sourceVector.push(stage);
  66. }
  67. /* if there aren't any pipeline stages, there's nothing more to do */
  68. if (!sourceVector.length){
  69. return pipelineInstance;
  70. }
  71. /* Move filters up where possible.
  72. CW TODO -- move filter past projections where possible, and noting corresponding field renaming.
  73. */
  74. /*
  75. Wherever there is a match immediately following a sort, swap them.
  76. This means we sort fewer items. Neither changes the documents in the stream, so this transformation shouldn't affect the result.
  77. We do this first, because then when we coalesce operators below, any adjacent matches will be combined.
  78. */
  79. for(var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
  80. var source = sourceVector[srci];
  81. if (source.constructor === MatchDocumentSource) {
  82. var previous = sourceVector[srci - 1];
  83. if (previous.constructor === klass.SortDocumentSource) { //TODO: remove 'sort.' once sort is implemented!!!
  84. /* swap this item with the previous */
  85. sourceVector[srci - 1] = source;
  86. sourceVector[srci] = previous;
  87. }
  88. }
  89. }
  90. /*
  91. Coalesce adjacent filters where possible. Two adjacent filters are equivalent to one filter whose predicate is the conjunction of the two original filters' predicates.
  92. For now, capture this by giving any DocumentSource the option to absorb it's successor; this will also allow adjacent projections to coalesce when possible.
  93. Run through the DocumentSources, and give each one the opportunity to coalesce with its successor. If successful, remove the successor.
  94. Move all document sources to a temporary list.
  95. */
  96. var tempVector = sourceVector.slice(0);
  97. sourceVector.length = 0;
  98. /* move the first one to the final list */
  99. sourceVector.push(tempVector[0]);
  100. /* run through the sources, coalescing them or keeping them */
  101. for(var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
  102. /*
  103. If we can't coalesce the source with the last, then move it to the final list, and make it the new last.
  104. (If we succeeded, then we're still on the same last, and there's no need to move or do anything with the source -- the destruction of tempVector will take care of the rest.)
  105. */
  106. var lastSource = sourceVector[sourceVector.length - 1];
  107. var temp = tempVector[tempi];
  108. if (!temp || !lastSource){
  109. throw new Error("null document sources found");
  110. }
  111. if (!lastSource.coalesce(temp)){
  112. sourceVector.push(temp);
  113. }
  114. }
  115. /* optimize the elements in the pipeline */
  116. for(var i = 0, l = sourceVector.length; i<l; i++) {
  117. var iter = sourceVector[i];
  118. if (!iter) {
  119. throw new Error("Pipeline received empty document as argument");
  120. }
  121. iter.optimize();
  122. }
  123. return pipelineInstance;
  124. };
  125. /**
  126. * Run the pipeline
  127. *
  128. * @method run
  129. * @param {Object} result the results of running the pipeline will be stored on this object
  130. * @param {CursorDocumentSource} source the primary document source of the data
  131. **/
  132. proto.run = function run(result, source){
  133. for(var i = 0, l = this.sourceVector.length; i<l; i++) {
  134. var temp = this.sourceVector[i];
  135. temp.setSource(source);
  136. source = temp;
  137. }
  138. /* source is left pointing at the last source in the chain */
  139. /*
  140. Iterate through the resulting documents, and add them to the result.
  141. We do this even if we're doing an explain, in order to capture the document counts and other stats.
  142. However, we don't capture the result documents for explain.
  143. */
  144. // the array in which the aggregation results reside
  145. // cant use subArrayStart() due to error handling
  146. var resultArray = [];
  147. for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
  148. var document = source.getCurrent();
  149. /* add the document to the result set */
  150. resultArray.push(document);
  151. //Commenting out this assertion for munge. MUHAHAHA!!!
  152. // object will be too large, assert. the extra 1KB is for headers
  153. // uassert(16389,
  154. // str::stream() << "aggregation result exceeds maximum document size (" << BSONObjMaxUserSize / (1024 * 1024) << "MB)",
  155. // resultArray.len() < BSONObjMaxUserSize - 1024);
  156. }
  157. result.result = resultArray;
  158. return true;
  159. };
  160. return klass;
  161. })();