// DocumentSource.js (9.3 KB)
  1. "use strict";
  2. var DocumentSource = module.exports = (function(){
  // CONSTRUCTOR
  /**
   * Base class for all document sources.
   *
   * @class DocumentSource
   * @namespace munge.pipeline.documentsource
   * @module munge
   * @constructor
   * @throws {Error} "zero args expected" when called with any arguments
   **/
  var klass = function DocumentSource(/*pCtx*/) {
    if (arguments.length !== 0) {
      throw new Error("zero args expected");
    }

    // Underlying source this one reads from. The default setSource()
    // stores it here; dispose() forwards to it when it is set.
    this.pSource = null;

    // Zero-based, user-specified pipeline step, used for diagnostics.
    // Stays -1 for artificial steps that were not part of the original
    // user specification.
    this.step = -1;

    // No ExpressionContext is kept: the original port notes it is not
    // needed because this implementation does not shard.

    // For explain: number of rows returned by this source.
    this.nRowsOut = 0;
  };

  // Publish the constructor under the outer name and as the module export.
  DocumentSource = klass;
  module.exports = klass;

  var base = Object;
  // `constructor` is defined through a property descriptor, so it is
  // non-enumerable and read-only on the prototype.
  var proto = Object.create(base.prototype, {constructor: {value: klass}});
  klass.prototype = proto;
  38. /*
  39. class DocumentSource :
  40. public IntrusiveCounterUnsigned,
  41. public StringWriter {
  42. public:
  43. virtual ~DocumentSource();
  44. // virtuals from StringWriter
  45. virtual void writeString(stringstream &ss) const;
  46. */
  /**
   * Record which user-specified pipeline step this source represents.
   *
   * @method setPipelineStep
   * @param {Number} step the step number, 0 to n; stored as-is on `this.step`
   **/
  proto.setPipelineStep = function setPipelineStep(step) {
    this.step = step;
  };
  /**
   * Read back the pipeline step stored on this source.
   *
   * @method getPipelineStep
   * @returns {Number} the current value of `this.step`
   **/
  proto.getPipelineStep = function getPipelineStep() {
    return this.step;
  };
  /**
   * Is the source at EOF?
   * The base class provides no answer; this implementation always throws.
   *
   * @method eof
   * @throws {Error} "not implemented"
   **/
  proto.eof = function eof() {
    throw new Error("not implemented");
  };
  /**
   * Advance the state of the DocumentSource so that it will return the
   * next Document.
   * This default implementation does no work and always returns false.
   * The interrupt check of the original C++ implementation
   * (pExpCtx->checkForInterrupt()) is not performed here.
   *
   * @method advance
   * @returns {Boolean} whether there is another document to fetch, i.e. whether getCurrent() will succeed; always false in this default implementation
   **/
  proto.advance = function advance() {
    return false;
  };
  /**
   * Return the current Document without advancing.
   * Some implementations do the equivalent of verify(!eof()), so check
   * eof() before calling this. The base implementation always throws.
   *
   * @method getCurrent
   * @returns {Document} the current Document without advancing
   * @throws {Error} "not implemented"
   **/
  proto.getCurrent = function getCurrent() {
    throw new Error("not implemented");
  };
  /**
   * Tell the source it is no longer needed and may release its resources.
   * After dispose() the source must still handle iteration requests, but
   * may become eof().
   * NOTE: for proper mutex yielding, dispose() must be called on any
   * DocumentSource that will not be advanced until eof(), see SERVER-6123.
   *
   * The default implementation only forwards the call to `this.pSource`
   * when one is set.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
    var upstream = this.pSource;
    if (!upstream) {
      return;
    }
    // Forwarding is required for the DocumentSourceCursor to release its
    // read lock, see SERVER-6123.
    upstream.dispose();
  };
  /**
   * Get the source's name.
   *
   * @method getSourceName
   * @returns {String} a constant name for the source; the base class reports "[UNKNOWN]"
   **/
  proto.getSourceName = function getSourceName() {
    return "[UNKNOWN]";
  };
  /**
   * Set the underlying source this source should use to get Documents
   * from.
   * Setting the source more than once is an error: changing sources after
   * the original one has been started could break the state maintained by
   * the DocumentSource.
   * The reference is only intended to be held for the lifetime of a
   * Pipeline::run().
   *
   * @method setSource
   * @param {DocumentSource} pTheSource the underlying source to use
   * @param {Function} [callback] when given, invoked with no arguments on a later tick via process.nextTick
   * @throws {Error} synchronously, if a source is already set
   **/
  proto.setSource = function setSource(pTheSource, callback) {
    var alreadySet = Boolean(this.pSource);
    if (alreadySet) {
      throw new Error("It is an error to set the source more than once");
    }

    this.pSource = pTheSource;

    if (!callback) {
      return;
    }
    // Wrap the callback so it is always invoked without arguments.
    return process.nextTick(function () {
      callback();
    });
  };
  /**
   * Try to merge this DocumentSource with its successor in the document
   * processing pipeline. On success the successor should be removed from
   * the pipeline and discarded, and the operation may be applied again to
   * fold several sources together.
   * The default implementation does nothing and reports failure.
   *
   * @method coalesce
   * @param {DocumentSource} pNextSource the next source in the document processing chain (unused by the default implementation)
   * @returns {Boolean} whether the attempt to coalesce succeeded; when false nothing has been changed. Always false here.
   **/
  proto.coalesce = function coalesce(pNextSource) {
    return false;
  };
  /**
   * Optimize the pipeline operation, if possible. This is a local
   * optimization that only looks within this DocumentSource; for best
   * results, coalesce compatible sources with coalesce() first.
   * It is a hook for operations that contain expressions.
   * The default implementation is intentionally empty.
   *
   * @method optimize
   **/
  proto.optimize = function optimize() {
    // Nothing to optimize in the base class.
  };
  /**
   * Possible results of getDependencies(). Each value is the string
   * spelling of its own key.
   **/
  klass.GetDepsReturn = {
    // The dependency set should be ignored.
    "NOT_SUPPORTED": "NOT_SUPPORTED",
    // Everything needed should be in the set.
    "EXHAUSTIVE": "EXHAUSTIVE",
    // Add the next source's dependencies to the set.
    "SEE_NEXT": "SEE_NEXT"
  };
  /**
   * Get the fields this operation needs to do its job.
   * Dependencies are expressed in "a.b.c" notation.
   * The default implementation does not touch `deps` and reports that
   * dependency tracking is not supported.
   *
   * @method getDependencies
   * @param {Object} deps the dependency set to add to (unused by the default implementation)
   * @returns {String} one of DocumentSource.GetDepsReturn; always NOT_SUPPORTED here
   **/
  proto.getDependencies = function getDependencies(deps) {
    var outcomes = klass.GetDepsReturn;
    return outcomes.NOT_SUPPORTED;
  };
  /**
   * Build a projection that includes every dependency reported by
   * getDependencies().
   *
   * A path is left out when one of its dotted ancestors is itself a
   * dependency: the ancestor already covers it, and due to SERVER-6527
   * listing both would keep the ancestor from being fully included.
   *
   * @method depsToProjection
   * @param {Object} deps dependency set; its own enumerable keys are the field paths in "a.b.c" notation
   * @returns {Object} projection with `<path>: 1` for each retained path, in sorted path order, plus `_id: 0` when `deps._id` is undefined
   **/
  klass.depsToProjection = function depsToProjection(deps) {
    var projection = {};
    if (deps._id === undefined) {
      projection._id = 0;
    }

    var paths = Object.keys(deps).sort();

    // Null-prototype lookup table so any key (even "__proto__") is a plain entry.
    var isDep = Object.create(null);
    paths.forEach(function (path) {
      isDep[path] = true;
    });

    paths.forEach(function (path) {
      // Test every dotted ancestor explicitly. Relying on sort adjacency is
      // not enough: characters that sort before "." (e.g. "-" or " ") can
      // place an unrelated key such as "a-b" between "a" and "a.b".
      for (var dot = path.indexOf("."); dot !== -1; dot = path.indexOf(".", dot + 1)) {
        if (isDep[path.slice(0, dot)]) {
          return;
        }
      }
      projection[path] = 1;
    });

    return projection;
  };
  /**
   * Add the DocumentSource to the array builder.
   * The default implementation calls sourceToJson() with a fresh empty
   * object and pushes whatever it returns onto the array being built.
   *
   * @method addToJsonArray
   * @param {Array} pBuilder JSONArrayBuilder: the array builder to add the operation to
   * @param {Boolean} explain create explain output
   **/
  proto.addToJsonArray = function addToJsonArray(pBuilder, explain) {
    pBuilder.push(
      this.sourceToJson({}, explain)
    );
  };
  /**
   * Create an object that represents the document source. The object is
   * meant to have a single field whose name is the source's name; the
   * default addToJsonArray() uses the result to add this source to a
   * pipeline being represented in JSON.
   * The base implementation always throws.
   *
   * @method sourceToJson
   * @param {Object} pBuilder JSONObjBuilder: a blank object builder to write to
   * @param {Boolean} explain create explain output
   * @throws {Error} "not implemented"
   **/
  proto.sourceToJson = function sourceToJson(pBuilder, explain) {
    throw new Error("not implemented");
  };
  /**
   * Reset the document source so that it is ready for a new stream of
   * data. Note that this method is a deviation from the mongo
   * implementation. The base implementation always throws.
   *
   * @method reset
   * @throws {Error} "not implemented"
   **/
  proto.reset = function reset() {
    throw new Error("not implemented");
  };
  246. return klass;
  247. })();