// Pipeline.js
  var Pipeline = module.exports = (function(){
    // CONSTRUCTOR
    /**
     * A parsed aggregation pipeline. Its stages (document sources) are kept in
     * `sourceVector`, in the order in which they will run.
     *
     * @class Pipeline
     **/
    var klass = function Pipeline(){
      this.sourceVector = []; //should be private?
    }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});

    var LimitDocumentSource = require('./documentSources/LimitDocumentSource'),
      MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
      ProjectDocumentSource = require('./documentSources/ProjectDocumentSource');
    // Additional stages, currently disabled:
    // GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
    // SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
    // SortDocumentSource = require('./documentSources/SortDocumentSource'),
    // UnwindDocumentSource = require('./documentSources/UnwindDocumentSource');

    /**
     * Registry mapping a stage name (e.g. LimitDocumentSource.limitName) to the
     * factory that builds that stage from its spec. It is attached to the class
     * so that test cases can reach it.
     **/
    klass.StageDesc = {};
    klass.StageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
    klass.StageDesc[MatchDocumentSource.matchName] = MatchDocumentSource.createFromJson;
    klass.StageDesc[ProjectDocumentSource.projectName] = ProjectDocumentSource.createFromJson;
    // klass.StageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
    // klass.StageDesc[SkipDocumentSource.skipName] = SkipDocumentSource.createFromJson;
    // klass.StageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
    // klass.StageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;

    // Used so StageDesc lookups only see registered (own) keys, never Object.prototype members.
    var hasOwn = Object.prototype.hasOwnProperty;

    /**
     * Create a pipeline from the command.
     *
     * munge: command parsing is skipped, so `cmdObj` is used directly as the
     * pipeline, i.e. an array of stage specifications, each an object with
     * exactly one field (the stage name) whose value is that stage's spec.
     *
     * After parsing, the stage list is rewritten in three passes:
     *  - a match directly following a sort is swapped in front of it;
     *  - each stage is offered the chance to coalesce (absorb) its successor;
     *  - optimize() is called on every remaining stage.
     *
     * @param {Array} cmdObj the command object sent from the client (here: the pipeline array)
     * @returns {Pipeline} the new pipeline instance (its sourceVector may be empty)
     * @throws {Error} "code 15942" if a pipeline element is not an object,
     *   "code 16435" if a stage specification does not have exactly one field,
     *   "code 16436" if the stage name is not registered in StageDesc
     **/
    klass.parseCommand = function parseCommand(cmdObj){
      // Use the local constructor; the outer `Pipeline` binding is only assigned once this IIFE returns.
      var pipelineInstance = new klass(),
        pipeline = cmdObj; //munge: skipping the command parsing since all we care about is the pipeline
      var sourceVector = pipelineInstance.sourceVector,
        nSteps = pipeline.length;

      for (var iStep = 0; iStep < nSteps; ++iStep) {
        /* pull out the pipeline element as an object */
        var pipeElement = pipeline[iStep];
        if (!(pipeElement instanceof Object)) {
          throw new Error("pipeline element " + iStep + " is not an object; code 15942" );
        }

        // Parse a pipeline stage from 'obj'.
        var obj = pipeElement,
          fieldNames = Object.keys(obj);
        if (fieldNames.length !== 1) {
          throw new Error("A pipeline stage specification object must contain exactly one field; code 16435" );
        }

        // Create a DocumentSource pipeline stage from 'stageSpec'.
        var stageName = fieldNames[0],
          stageSpec = obj[stageName],
          // Own-key check: names such as "constructor" or "toString" must be reported as
          // unrecognized rather than resolving to inherited Object.prototype functions.
          desc = hasOwn.call(klass.StageDesc, stageName) ? klass.StageDesc[stageName] : undefined;
        if (!desc) {
          throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16436" );
        }
        var stage = desc(stageSpec);
        //verify(stage);
        stage.setPipelineStep(iStep);
        sourceVector.push(stage);
      }

      /* if there aren't any pipeline stages, there's nothing more to do */
      if (!sourceVector.length) {
        return pipelineInstance;
      }

      /*
        Move filters up where possible.
        CW TODO -- move filter past projections where possible, and noting
        corresponding field renaming.
      */

      /*
        Wherever there is a match immediately following a sort, swap them.
        This means we sort fewer items. Neither changes the documents in
        the stream, so this transformation shouldn't affect the result.
        We do this first, because then when we coalesce operators below,
        any adjacent matches will be combined.
      */
      for (var srcn = sourceVector.length, srci = 1; srci < srcn; ++srci) {
        var source = sourceVector[srci];
        if (source.constructor === MatchDocumentSource) {
          var previous = sourceVector[srci - 1];
          // TODO: compare against SortDocumentSource directly once sort is implemented and required above.
          // klass.SortDocumentSource is not assigned anywhere in this module, so unless it is set
          // externally this comparison will not match ordinary stages.
          if (previous.constructor === klass.SortDocumentSource) {
            /* swap this item with the previous */
            sourceVector[srci - 1] = source;
            sourceVector[srci] = previous;
          }
        }
      }

      /*
        Coalesce adjacent filters where possible. Two adjacent filters
        are equivalent to one filter whose predicate is the conjunction of
        the two original filters' predicates. For now, capture this by
        giving any DocumentSource the option to absorb its successor; this
        will also allow adjacent projections to coalesce when possible.
        Run through the DocumentSources, and give each one the opportunity
        to coalesce with its successor. If successful, remove the
        successor.
      */
      var tempVector = sourceVector.slice(0);
      // Empty in place: sourceVector is the same array as pipelineInstance.sourceVector.
      sourceVector.length = 0;
      /* move the first one to the final list */
      sourceVector.push(tempVector[0]);
      /* run through the sources, coalescing them or keeping them */
      for (var tempn = tempVector.length, tempi = 1; tempi < tempn; ++tempi) {
        // The last kept stage either absorbs 'temp' (coalesce() returns truthy, so 'temp'
        // is dropped) or 'temp' is appended and becomes the new last stage.
        var lastSource = sourceVector[sourceVector.length - 1];
        var temp = tempVector[tempi];
        if (!temp || !lastSource) {
          throw new Error("null document sources found");
        }
        if (!lastSource.coalesce(temp)) {
          sourceVector.push(temp);
        }
      }

      /* optimize the elements in the pipeline */
      for (var i = 0, l = sourceVector.length; i < l; i++) {
        var iter = sourceVector[i];
        if (!iter) {
          throw new Error("Pipeline received empty document as argument");
        }
        iter.optimize();
      }

      return pipelineInstance;
    };

    /**
     * Run the pipeline
     *
     * Chains the stages so that each one reads from its predecessor (the first
     * reads from `source`), then drains every document from the end of the
     * chain into an array.
     *
     * @param {Object} result the results of running the pipeline will be stored on this object, as `result.result`
     * @param {CursorDocumentSource} source the primary document source of the data
     * @returns {Boolean} always true
     **/
    proto.run = function run(result, source){
      var stages = this.sourceVector;
      for (var i = 0, l = stages.length; i < l; i++) {
        var stage = stages[i];
        stage.setSource(source);
        source = stage;
      }
      /* source is left pointing at the last source in the chain (or the input source if there are no stages) */

      // The upstream MongoDB code skips collecting documents for explain; this port has no
      // explain handling, so every document produced by the chain is captured.
      var resultArray = [];
      for (var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
        /* add the document to the result set */
        resultArray.push(source.getCurrent());
      }
      // munge: MongoDB asserts (code 16389) when the aggregation result exceeds the maximum
      // BSON document size; that check is intentionally omitted here, so the result is unbounded.
      result.result = resultArray;
      return true;
    };

    return klass;
  })();