CursorDocumentSource.js 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. "use strict";
  /**
   * A DocumentSource that produces Documents from the objects yielded by a supplied Cursor.
   * An object of this type may only be used by one thread, see SERVER-6123.
   *
   * It normally sits at the head of a chain of document sources, where it
   * is responsible for pulling data out of the database.
   *
   * @class CursorDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param {CursorDocumentSource.CursorWithContext} cursorWithContext the cursor to use to fetch data; must be non-null and carry a truthy `_cursor`
   * @param {Object} [expCtx] passed through unchanged to the base DocumentSource constructor
   * @throws {Error} if `cursorWithContext` or its `_cursor` is missing
   **/
  var CursorDocumentSource = function CursorDocumentSource(cursorWithContext, expCtx) {
      base.call(this, expCtx);

      // The most recently fetched document; null until the first fetch and again once exhausted.
      this.current = null;

      // this.ns = null;
      // /*
      // The bson dependencies must outlive the Cursor wrapped by this
      // source. Therefore, bson dependencies must appear before pCursor
      // in order cause its destructor to be called *after* pCursor's.
      // */
      // this.query = null;
      // this.sort = null;

      this._projection = null;
      this._cursorWithContext = cursorWithContext;

      // Validation happens after the fields are assigned, matching the established order of effects.
      var hasUsableCursor = Boolean(this._cursorWithContext && this._cursorWithContext._cursor);
      if (!hasUsableCursor) {
          throw new Error("CursorDocumentSource requires a valid cursorWithContext");
      }
  };
  // Export before requiring the base class so the export is already in place while './DocumentSource' loads.
  module.exports = CursorDocumentSource;
  var klass = CursorDocumentSource;
  var base = require('./DocumentSource');
  var proto = Object.create(base.prototype, {constructor:{value:klass}});
  klass.prototype = proto;
  /**
   * Holds a Cursor and all associated state required to access the cursor.
   * A new instance starts with `_cursor` set to null; the code creating it is
   * expected to assign the real cursor afterwards.
   *
   * @class CursorWithContext
   * @namespace mungedb-aggregate.pipeline.documentSources.CursorDocumentSource
   * @module mungedb-aggregate
   * @constructor
   * @param {String} ns accepted for signature compatibility; not stored or read by this constructor
   **/
  klass.CursorWithContext = function CursorWithContext(ns) {
      this._cursor = null;
  };
  /**
   * Release the Cursor (and the read lock it requires, see SERVER-6123) without
   * touching any other state of this source.
   *
   * In this implementation that means dropping the reference to the
   * CursorWithContext: `current` and `_projection` are left as they are.
   * Once disposed, findNext() reports "no more documents" and cursor() throws.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
      // Forget the cursor wrapper; nothing else is reset here.
      this._cursorWithContext = null;
  };
  53. ///**
  54. // * Record the namespace. Required for explain.
  55. // *
  56. // * @method setNamespace
  57. // * @param {String} ns the namespace
  58. // **/
  59. //proto.setNamespace = function setNamespace(ns) {}
  60. //
  61. ///**
  62. // * Record the query that was specified for the cursor this wraps, if any.
  63. // * This should be captured after any optimizations are applied to
  64. // * the pipeline so that it reflects what is really used.
  65. // * This gets used for explain output.
  66. // *
  67. // * @method setQuery
  68. // * @param {Object} pBsonObj the query to record
  69. // **/
  70. //proto.setQuery = function setQuery(pBsonObj) {};
  71. //
  72. //
  73. ///**
  74. // * Record the sort that was specified for the cursor this wraps, if any.
  75. // * This should be captured after any optimizations are applied to
  76. // * the pipeline so that it reflects what is really used.
  77. // * This gets used for explain output.
  78. // *
  79. // * @method setSort
  80. // * @param {Object} pBsonObj the query to record
  81. // **/
  82. //proto.setSort = function setSort(pBsonObj) {};
  /**
   * Record the projection for this source. May only be done once.
   *
   * The value is stored as given; it is not applied to the underlying cursor.
   *
   * @method setProjection
   * @param {Object} projection the projection to remember
   * @throws {Error} if a (truthy) projection has already been set
   **/
  proto.setProjection = function setProjection(projection) {
      var alreadySet = Boolean(this._projection);
      if (alreadySet) {
          throw new Error("projection is already set");
      }

      // Not needed yet: wrapping the value in a Projection and handing it to the cursor.
      //   this._projection = new Projection();
      //   this._projection.init(projection);
      //   this.cursor().fields = this._projection;

      // Store the raw value as-is (just for testing).
      this._projection = projection;
  };
  100. //----------------virtuals from DocumentSource--------------
  /**
   * Is the source at EOF?
   *
   * If no document is currently held, findNext() is called first so that the
   * answer reflects the cursor's actual state.
   *
   * @method eof
   * @returns {Boolean} true when `current` is null after the optional fetch, false otherwise
   **/
  proto.eof = function eof() {
      // Fetch only when there is genuinely no document. A truthiness test would treat
      // present-but-falsy documents (0, "", false) as missing and silently skip them.
      var hasNoDocument = (this.current === null || this.current === undefined);
      if (hasNoDocument) this.findNext();
      return (this.current === null);
  };
  /**
   * Advance the state of the DocumentSource so that it will return the next Document.
   *
   * The base implementation is invoked first (it checks for interrupts). If no
   * document has been fetched yet, the first one is fetched and then stepped
   * past, so the source ends up positioned on the document after it.
   *
   * @method advance
   * @returns {Boolean} whether there is another document to fetch, i.e. whether `current` is non-null after advancing
   **/
  proto.advance = function advance() {
      base.prototype.advance.call(this); // check for interrupts

      // Prime the source only when there is genuinely no document. A truthiness test
      // would treat a present-but-falsy document (0, "", false) as "not started" and
      // step twice, skipping a document.
      var hasNoDocument = (this.current === null || this.current === undefined);
      if (hasNoDocument) this.findNext();

      this.findNext();
      return (this.current !== null);
  };
  /**
   * Return the current Document without advancing.
   * Some implementations do the equivalent of verify(!eof()), so check eof() first.
   *
   * If no document is currently held, findNext() is called once before returning.
   *
   * @method getCurrent
   * @returns {Document} the current Document, or null when there is none
   **/
  proto.getCurrent = function getCurrent() {
      // Fetch only when there is genuinely no document, so repeated calls keep returning
      // the same document even when it is falsy (0, "", false).
      var hasNoDocument = (this.current === null || this.current === undefined);
      if (hasNoDocument) this.findNext();
      return this.current;
  };
  /**
   * Set the underlying source this source should use to get Documents from.
   *
   * A CursorDocumentSource reads from its cursor and therefore accepts no
   * upstream source: passing a truthy `theSource` is an error. When a callback
   * is supplied it is scheduled with `setTimeout(callback, 0)` and the timer
   * handle is returned; otherwise nothing is returned.
   *
   * @method setSource
   * @param source {DocumentSource} the underlying source to use; must be falsy for this class
   * @param callback {Function} a `mungedb-aggregate`-specific extension to the API to half-way support reading from async sources
   * @throws {Error} if a source is supplied
   **/
  proto.setSource = function setSource(theSource, callback) {
      //TODO: This needs to put back without the if once async is fully and properly supported
      if (theSource) {
          throw new Error("CursorDocumentSource doesn't take a source");
      }
      if (!callback) {
          return;
      }
      // Defer the notification so the caller always sees it asynchronously.
      return setTimeout(callback, 0);
  };
  /**
   * Create an object that represents the document source. The object
   * would have a single field whose name is the source's name, for use
   * when a pipeline is being represented in BSON.
   *
   * This implementation is currently a no-op: nothing is written to the
   * builder and nothing is returned.
   *
   * @method sourceToJson
   * @param {Object} pBuilder BSONObjBuilder: a blank object builder to write to (left untouched)
   * @param {Boolean} explain create explain output (ignored)
   **/
  proto.sourceToJson = function sourceToJson(pBuilder, explain) {
      // This source has no analog in the BSON world, so output would only make
      // sense for explain -- and explain is not currently supported in
      // mungedb-aggregate, hence the empty body.
  };
  167. //----------------private--------------
  /**
   * Pull the next document from the cursor into `current`.
   *
   * - With no CursorWithContext (e.g. after dispose()), `current` becomes null.
   * - If the cursor is ok(), its current() object becomes `current` and the
   *   cursor is advanced by one position.
   * - Otherwise the source disposes itself and `current` becomes null.
   *
   * The steps left commented out here before (yieldSometimes, currentMatches /
   * currentIsDup filtering, covered-index handling) are not performed.
   *
   * @method findNext
   * @private
   **/
  proto.findNext = function findNext() {
      // Already released: nothing more can be produced.
      if (!this._cursorWithContext) {
          this.current = null;
          return;
      }

      if (!this.cursor().ok()) {
          // There aren't any more documents.
          // The CursorWithContext (and its read lock) must be released, see SERVER-6123.
          this.dispose();
          this.current = null;
          return;
      }

      // Grab the document at the cursor's position, then move the cursor past it
      // so the next call picks up the following one.
      this.current = this.cursor().current();
      this.cursor().advance();
  };
  /**
   * Accessor for the raw cursor held by this source's CursorWithContext.
   *
   * @method cursor
   * @private
   * @returns {Object} the `_cursor` of the current CursorWithContext
   * @throws {Error} if there is no CursorWithContext or it has no `_cursor`
   **/
  proto.cursor = function cursor() {
      var context = this._cursorWithContext;
      var wrapped = context && context._cursor;
      if (!wrapped) {
          throw new Error("cursor not defined");
      }
      return wrapped;
  };
  200. //proto.chunkMgr = function chunkMgr(){};
  201. //proto.canUseCoveredIndex = function canUseCoveredIndex(){};
  202. //proto.yieldSometimes = function yieldSometimes(){};