DocumentSource.js 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. "use strict";
  2. /**
  3. * A base class for all document sources
  4. * @class DocumentSource
  5. * @namespace mungedb-aggregate.pipeline.documentSources
  6. * @module mungedb-aggregate
  7. * @constructor
  8. * @param expCtx {ExpressionContext}
  9. **/
  10. var DocumentSource = module.exports = function DocumentSource(expCtx){
  11. if(arguments.length !== 1) throw new Error("one arg expected");
  12. /*
  13. * Most DocumentSources have an underlying source they get their data
  14. * from. This is a convenience for them.
  15. * The default implementation of setSource() sets this; if you don't
  16. * need a source, override that to verify(). The default is to
  17. * verify() if this has already been set.
  18. */
  19. this.source = null;
  20. /*
  21. * The zero-based user-specified pipeline step. Used for diagnostics.
  22. * Will be set to -1 for artificial pipeline steps that were not part
  23. * of the original user specification.
  24. */
  25. this.step = -1;
  26. this.expCtx = expCtx || {};
  27. /*
  28. * for explain: # of rows returned by this source
  29. * This is *not* unsigned so it can be passed to JSONObjBuilder.append().
  30. */
  31. this.nRowsOut = 0;
  32. }, klass = DocumentSource, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  33. /*
  34. class DocumentSource :
  35. public IntrusiveCounterUnsigned,
  36. public StringWriter {
  37. public:
  38. virtual ~DocumentSource();
  39. // virtuals from StringWriter
  40. virtual void writeString(stringstream &ss) const;
  41. */
  42. /**
  43. * Set the step for a user-specified pipeline step.
  44. * @method setPipelineStep
  45. * @param {Number} step number 0 to n.
  46. **/
  47. proto.setPipelineStep = function setPipelineStep(step) {
  48. this.step = step;
  49. };
  50. /**
  51. * Get the user-specified pipeline step.
  52. * @method getPipelineStep
  53. * @returns {Number} step
  54. **/
  55. proto.getPipelineStep = function getPipelineStep() {
  56. return this.step;
  57. };
  58. /**
  59. * Returns the next Document if there is one or null if at EOF.
  60. *
  61. * some implementations do the equivalent of verify(!eof()) so check eof() first
  62. * @method getNext
  63. * @returns {Document} the current Document without advancing
  64. **/
  65. proto.getNext = function getNext(callback) {
  66. throw new Error("not implemented");
  67. };
  68. /**
  69. * Inform the source that it is no longer needed and may release its resources. After
  70. * dispose() is called the source must still be able to handle iteration requests, but may
  71. * become eof().
  72. * NOTE: For proper mutex yielding, dispose() must be called on any DocumentSource that will
  73. * not be advanced until eof(), see SERVER-6123.
  74. *
  75. * @method dispose
  76. **/
  77. proto.dispose = function dispose() {
  78. if ( this.source ) {
  79. // This is required for the DocumentSourceCursor to release its read lock, see
  80. // SERVER-6123.
  81. this.source.dispose();
  82. }
  83. };
  84. /**
  85. * Get the source's name.
  86. * @method getSourceName
  87. * @returns {String} the string name of the source as a constant string; this is static, and there's no need to worry about adopting it
  88. **/
  89. proto.getSourceName = function getSourceName() {
  90. return "[UNKNOWN]";
  91. };
  92. /**
  93. * Set the underlying source this source should use to get Documents
  94. * from.
  95. * It is an error to set the source more than once. This is to
  96. * prevent changing sources once the original source has been started;
  97. * this could break the state maintained by the DocumentSource.
  98. * This pointer is not reference counted because that has led to
  99. * some circular references. As a result, this doesn't keep
  100. * sources alive, and is only intended to be used temporarily for
  101. * the lifetime of a Pipeline::run().
  102. *
  103. * @method setSource
  104. * @param {DocumentSource} source the underlying source to use
  105. **/
  106. proto.setSource = function setSource(theSource) {
  107. if (this.source) throw new Error("It is an error to set the source more than once");
  108. this.source = theSource;
  109. };
  110. /**
  111. * Attempt to coalesce this DocumentSource with its successor in the
  112. * document processing pipeline. If successful, the successor
  113. * DocumentSource should be removed from the pipeline and discarded.
  114. * If successful, this operation can be applied repeatedly, in an
  115. * attempt to coalesce several sources together.
  116. * The default implementation is to do nothing, and return false.
  117. *
  118. * @method coalesce
  119. * @param {DocumentSource} nextSource the next source in the document processing chain.
  120. * @returns {Boolean} whether or not the attempt to coalesce was successful or not; if the attempt was not successful, nothing has been changed
  121. **/
  122. proto.coalesce = function coalesce(nextSource) {
  123. return false;
  124. };
  125. /**
  126. * Optimize the pipeline operation, if possible. This is a local
  127. * optimization that only looks within this DocumentSource. For best
  128. * results, first coalesce compatible sources using coalesce().
  129. * This is intended for any operations that include expressions, and
  130. * provides a hook for those to optimize those operations.
  131. * The default implementation is to do nothing.
  132. *
  133. * @method optimize
  134. **/
  135. proto.optimize = function optimize() {
  136. };
  137. klass.GetDepsReturn = {
  138. NOT_SUPPORTED: "NOT_SUPPORTED", // This means the set should be ignored
  139. SEE_NEXT: "SEE_NEXT", // Add the next Source's deps to the set
  140. EXHAUSTIVE_FIELDS:"EXHAUSTIVE_FIELDS", // Later stages won"t need more fields from input
  141. EXHAUSTIVE_META: "EXHAUSTIVE_META", // Later stages won"t need more metadata from input
  142. EXHAUSTIVE_ALL: "EXHAUSTIVE_ALL" // Later stages won"t need either NOTE: This is an | of FIELDS and META in mongo C++
  143. };
  144. /**
  145. * Get the fields this operation needs to do its job.
  146. * Deps should be in "a.b.c" notation
  147. *
  148. * @method getDependencies
  149. * @param {Object} deps set (unique array) of strings
  150. * @returns DocumentSource.GetDepsReturn
  151. **/
  152. proto.getDependencies = function getDependencies(deps) {
  153. return klass.GetDepsReturn.NOT_SUPPORTED;
  154. };
  155. proto._serialize = function _serialize(explain) {
  156. throw new Error("not implemented");
  157. };
  158. proto.serializeToArray = function serializeToArray(array, explain) {
  159. var entry = this.serialize(explain);
  160. if (entry) {
  161. array.push(entry);
  162. }
  163. };
  164. /**
  165. * A function compatible as a getNext for document sources.
  166. * Does nothing except pass the documents through. To use,
  167. * Attach this function on a DocumentSource prototype.
  168. *
  169. * @method GET_NEXT_PASS_THROUGH
  170. * @param callback {Function}
  171. * @param callback.err {Error} An error or falsey
  172. * @param callback.doc {Object} The source's next object or null
  173. **/
  174. klass.GET_NEXT_PASS_THROUGH = function GET_NEXT_PASS_THROUGH(callback) {
  175. if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
  176. var out;
  177. this.source.getNext(function(err, doc) {
  178. out = doc;
  179. return callback(err, doc);
  180. });
  181. return out; // For the sync people in da house
  182. };