SplitDocumentSource.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. "use strict";
  2. var SplitDocumentSource = module.exports = (function(){
  // CONSTRUCTOR
  /**
   * A document stream splitter: feeds every document from the underlying
   * source to each configured pipeline and exposes the combined results as
   * a single document.
   *
   * @class SplitDocumentSource
   * @namespace munge.pipeline.documentSources
   * @module munge
   * @constructor
   **/
  var klass = module.exports = SplitDocumentSource = function SplitDocumentSource(/* pCtx*/){
    if (arguments.length !== 0) {
      throw new Error("zero args expected");
    }
    base.call(this);

    /*
     * Nothing is pulled from the underlying source at construction time.
     * populate() does that work lazily, on the first call to eof(),
     * getCurrent() or advance(); the populated flag records that it has run.
     */
    this.populated = false;
    this.current = null;     // the document getCurrent() hands back
    this.docIterator = null; // numeric position in this.documents once populated
    this.documents = [];     // buffered input, later replaced by the single result document
    this.pipelines = {};     // pipeline commands keyed by split key (filled by createFromJson)
  };

  // Kept after the module.exports assignment above, in the original evaluation order.
  var base = require('./DocumentSource');
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  // Stage name reported by getSourceName().
  klass.splitName = "$split";

  /**
   * @method getSourceName
   * @returns {String} the value of klass.splitName
   **/
  proto.getSourceName = function getSourceName(){
    var sourceName = klass.splitName;
    return sourceName;
  };
  /**
   * @method getFactory
   * @returns {Function} the SplitDocumentSource constructor
   **/
  proto.getFactory = function getFactory(){
    // The constructor doubles as the factory; there is no separate .create() method.
    var factory = klass;
    return factory;
  };
  /**
   * Is the source at EOF?
   *
   * Triggers populate() on first use, so the answer reflects the populated
   * document list rather than the empty initial state.
   *
   * @method eof
   * @return {bool} true once the iterator has moved past the last document
   **/
  proto.eof = function eof() {
    if (!this.populated) {
      this.populate();
    }
    // Both operands are numbers after populate(), so compare strictly.
    return this.docIterator === this.documents.length;
  };
  /**
   * Returns the current document without moving the iterator. Callers are
   * expected to check eof() first; once the source is exhausted this
   * returns null.
   *
   * @method getCurrent
   * @returns {Document} the current Document without advancing
   **/
  proto.getCurrent = function getCurrent() {
    // Populate lazily on first access.
    if (!this.populated) {
      this.populate();
    }
    var currentDoc = this.current;
    return currentDoc;
  };
  /**
   * Move to the next document.
   *
   * Calls the base implementation first, populates lazily on first use, then
   * steps the iterator. Because populate() leaves exactly one document in
   * this.documents, the first call after population reaches the end and
   * returns false.
   *
   * @method advance
   * @returns {Boolean} true if getCurrent() now has a document, false if the end was reached
   * @throws {Error} if called when the source is already at EOF
   **/
  proto.advance = function advance() {
    // Delegate to the base class first (per the original comment, it checks for interrupts).
    base.prototype.advance.call(this);

    if (!this.populated) {
      this.populate();
    }

    var total = this.documents.length;

    // Advancing past the end is a caller error; eof() should have been checked.
    if (this.docIterator === total) throw new Error("This should never happen");

    ++this.docIterator;
    if (this.docIterator === total) {
      this.current = null;
      return false;
    }

    this.current = this.documents[this.docIterator];
    return true;
  };
  /**
   * Writes this source's JSON representation onto the given builder as a
   * single field. Only an empty object is written; the configured
   * pipelines are not serialized, and the explain flag is not consulted.
   *
   * @method sourceToJson
   * @param {Object} builder JSONObjBuilder: a blank object builder to write to
   * @param {Boolean} explain create explain output (currently unused)
   **/
  proto.sourceToJson = function sourceToJson(builder, explain) {
    // TODO: this is the default for split but it may need to have a key?
    var emptySpec = {};
    builder.$split = emptySpec;
  };
  /**
   * Drains the underlying source, runs every configured pipeline over the
   * buffered documents, and replaces the buffer with one document whose
   * fields are the pipeline keys and whose values are the pipeline results.
   *
   * @method populate
   **/
  proto.populate = function populate() {
    var splitDocument = {},
      pipelineKey,
      result;

    /* pull everything from the underlying source */
    for (var hasNext = !this.pSource.eof(); hasNext; hasNext = this.pSource.advance()) {
      this.documents.push(this.pSource.getCurrent());
    }

    for (pipelineKey in this.pipelines) {
      // Only run pipelines registered on this instance, not inherited enumerable keys.
      if (!Object.prototype.hasOwnProperty.call(this.pipelines, pipelineKey)) continue;

      result = {};
      // NOTE(review): result.ok is recorded but never checked; if run() can
      // report failure this way, the key ends up undefined -- confirm the
      // failure contract of PipelineCommand.run.
      result.ok = this.pipelines[pipelineKey].run(this.documents, result);
      splitDocument[pipelineKey] = result.result;
    }

    // "Join" all documents by placing the various pipeline results as the only doc in this.documents
    this.documents = [splitDocument];
    this.docIterator = 0;
    // The list always holds exactly one document here, so it is the current one.
    this.current = splitDocument;
    this.populated = true;
  };
  /**
   * Creates a new SplitDocumentSource from its specification: an object
   * whose keys name the splits and whose values are handed to
   * PipelineCommand, one command per key.
   *
   * @method createFromJson
   * @param {Object} jsonElement the split specification
   * @returns {SplitDocumentSource} a source with one pipeline per own key of jsonElement
   * @throws {Error} code 15973 if jsonElement is null or not an object
   * @throws {Error} code 15977 if jsonElement has no keys
   **/
  klass.createFromJson = function createFromJson(jsonElement) {
    // typeof null is "object", so null has to be rejected explicitly.
    if (jsonElement === null || typeof jsonElement !== "object") throw new Error("code 15973; the " + klass.splitName + " key specification must be an object");

    var split = new SplitDocumentSource(),
      splitKeys = 0,
      // Required inside the function rather than at module load
      // (presumably to avoid a circular module dependency -- confirm).
      PipelineCommand = require('../../commands/PipelineCommand');

    for (var key in jsonElement) {
      // Skip inherited enumerable properties; only the spec's own keys define splits.
      if (!Object.prototype.hasOwnProperty.call(jsonElement, key)) continue;
      split.pipelines[key] = new PipelineCommand(jsonElement[key]);
      ++splitKeys;
    }

    if (splitKeys <= 0) throw new Error("code 15977; " + klass.splitName + " must have at least one split key");
    return split;
  };
  /**
   * Returns the source to its unpopulated state so it can process a new
   * stream of data. The configured pipelines are left in place.
   * Note that this is a deviation from the mongo implementation.
   *
   * @method reset
   **/
  proto.reset = function reset(){
    this.documents = [];     // drop buffered or result documents
    this.docIterator = null; // position is undefined until populate() runs again
    this.current = null;
    this.populated = false;  // forces populate() on the next eof()/getCurrent()/advance()
  };
  139. return klass;
  140. })();