CursorDocumentSource.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. var CursorDocumentSource = module.exports = (function(){
  2. // CONSTRUCTOR
  3. /**
  4. * Constructs and returns Documents from the objects produced by a supplied Cursor.
  5. * An object of this type may only be used by one thread, see SERVER-6123.
  6. *
  7. * This is usually put at the beginning of a chain of document sources
  8. * in order to fetch data from the database.
  9. *
  10. * @param {CursorDocumentSource.CursorWithContext} cursorWithContext the cursor to use to fetch data
  11. **/
  12. var klass = module.exports = CursorDocumentSource = function CursorDocumentSource(cursorWithContext/*, pExpCtx*/){
  13. base.call(this/*, pExpCtx*/);
  14. this.current = null;
  15. // this.ns = null;
  16. // /*
  17. // The bson dependencies must outlive the Cursor wrapped by this
  18. // source. Therefore, bson dependencies must appear before pCursor
  19. // in order cause its destructor to be called *after* pCursor's.
  20. // */
  21. // this.pQuery = null;
  22. // this.pSort = null;
  23. this._projection = null;
  24. this._cursorWithContext = cursorWithContext;
  25. if (!this._cursorWithContext || !this._cursorWithContext._cursor){
  26. throw new Error("CursorDocumentSource requires a valid cursor");
  27. }
  28. }, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  29. // DEPENDENCIES
  30. //var Document = require("../Document");
  31. klass.CursorWithContext = (function (){
  32. /**
  33. * Holds a Cursor and all associated state required to access the cursor. An object of this
  34. * type may only be used by one thread.
  35. **/
  36. var klass = function CursorWithContext(){
  37. this._cursor = null;
  38. };
  39. return klass;
  40. })();
  41. /**
  42. * Release the Cursor and the read lock it requires, but without changing the other data.
  43. * Releasing the lock is required for proper concurrency, see SERVER-6123. This
  44. * functionality is also used by the explain version of pipeline execution.
  45. *
  46. * @method dispose
  47. **/
  48. proto.dispose = function dispose() {
  49. this._cursorWithContext = null;
  50. };
  51. // /**
  52. // * Record the namespace. Required for explain.
  53. // *
  54. // * @method setNamespace
  55. // * @param {String} ns the namespace
  56. // **/
  57. // proto.setNamespace = function setNamespace(ns) {}
  58. //
  59. // /**
  60. // * Record the query that was specified for the cursor this wraps, if any.
  61. // * This should be captured after any optimizations are applied to
  62. // * the pipeline so that it reflects what is really used.
  63. // * This gets used for explain output.
  64. // *
  65. // * @method setQuery
  66. // * @param {Object} pBsonObj the query to record
  67. // **/
  68. // proto.setQuery = function setQuery(pBsonObj) {};
  69. //
  70. //
  71. // /**
  72. // * Record the sort that was specified for the cursor this wraps, if any.
  73. // * This should be captured after any optimizations are applied to
  74. // * the pipeline so that it reflects what is really used.
  75. // * This gets used for explain output.
  76. // *
  77. // * @method setSort
  78. // * @param {Object} pBsonObj the query to record
  79. // **/
  80. // proto.setSort = function setSort(pBsonObj) {};
  81. /**
  82. * setProjection method
  83. *
  84. * @method setProjection
  85. * @param {Object} projection
  86. **/
  87. proto.setProjection = function setProjection(projection) {
  88. if (this._projection){
  89. throw new Error("projection is already set");
  90. }
  91. //dont think we need this yet
  92. // this._projection = new Projection();
  93. // this._projection.init(projection);
  94. //
  95. // this.cursor().fields = this._projection;
  96. this._projection = projection; //just for testing
  97. };
  98. //----------------virtuals from DocumentSource--------------
  99. /**
  100. * Is the source at EOF?
  101. *
  102. * @method eof
  103. **/
  104. proto.eof = function eof() {
  105. /* if we haven't gotten the first one yet, do so now */
  106. if (!this.current){
  107. this.findNext();
  108. }
  109. return (this.current === null);
  110. };
  111. /**
  112. * Advance the state of the DocumentSource so that it will return the next Document.
  113. * The default implementation returns false, after checking for interrupts.
  114. * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
  115. *
  116. * @method advance
  117. * @returns {Boolean} whether there is another document to fetch, i.e., whether or not getCurrent() will succeed. This default implementation always returns false.
  118. **/
  119. proto.advance = function advance() {
  120. base.prototype.advance.call(this); // check for interrupts
  121. /* if we haven't gotten the first one yet, do so now */
  122. if (!this.current){
  123. this.findNext();
  124. }
  125. this.findNext();
  126. return (this.current !== null);
  127. };
  128. /**
  129. * some implementations do the equivalent of verify(!eof()) so check eof() first
  130. *
  131. * @method getCurrent
  132. * @returns {Document} the current Document without advancing
  133. **/
  134. proto.getCurrent = function getCurrent() {
  135. /* if we haven't gotten the first one yet, do so now */
  136. if (!this.current){
  137. this.findNext();
  138. }
  139. return this.current;
  140. };
  141. /**
  142. * Set the underlying source this source should use to get Documents
  143. * from.
  144. * It is an error to set the source more than once. This is to
  145. * prevent changing sources once the original source has been started;
  146. * this could break the state maintained by the DocumentSource.
  147. * This pointer is not reference counted because that has led to
  148. * some circular references. As a result, this doesn't keep
  149. * sources alive, and is only intended to be used temporarily for
  150. * the lifetime of a Pipeline::run().
  151. *
  152. * @method setSource
  153. * @param {DocumentSource} pSource the underlying source to use
  154. **/
  155. proto.setSource = function setSource(pTheSource) {
  156. throw new Error("CursorDocumentSource doesn't take a source");
  157. };
  158. /**
  159. * Create an object that represents the document source. The object
  160. * will have a single field whose name is the source's name. This
  161. * will be used by the default implementation of addToBsonArray()
  162. * to add this object to a pipeline being represented in BSON.
  163. *
  164. * @method sourceToJson
  165. * @param {Object} pBuilder BSONObjBuilder: a blank object builder to write to
  166. * @param {Boolean} explain create explain output
  167. **/
  168. proto.sourceToJson = function sourceToJson(pBuilder, explain) {
  169. /* this has no analog in the BSON world, so only allow it for explain */
  170. if (explain)
  171. {
  172. //we are not currently supporting explain in munge
  173. }
  174. };
  175. //----------------private--------------
  176. proto.findNext = function findNext(){
  177. if ( !this._cursorWithContext ) {
  178. this.current = null;
  179. return;
  180. }
  181. for( ; this.cursor().ok(); this.cursor().advance() ) {
  182. //yieldSometimes();
  183. // if ( !this.cursor().ok() ) {
  184. // // The cursor was exhausted during the yield.
  185. // break;
  186. // }
  187. // if ( !this.cursor().currentMatches() || this.cursor().currentIsDup() )
  188. // continue;
  189. // grab the matching document
  190. var documentObj;
  191. // if (this.canUseCoveredIndex()) { ... Dont need any of this, I think
  192. documentObj = this.cursor().current();
  193. this.current = documentObj;
  194. this.cursor().advance();
  195. return;
  196. }
  197. // If we got here, there aren't any more documents.
  198. // The CursorWithContext (and its read lock) must be released, see SERVER-6123.
  199. this.dispose();
  200. this.current = null;
  201. };
  202. proto.cursor = function cursor(){
  203. if( this._cursorWithContext && this._cursorWithContext._cursor){
  204. return this._cursorWithContext._cursor;
  205. }
  206. throw new Error("cursor not defined");
  207. };
  208. // proto.chunkMgr = function chunkMgr(){};
  209. // proto.canUseCoveredIndex = function canUseCoveredIndex(){};
  210. // proto.yieldSometimes = function yieldSometimes(){};
  211. return klass;
  212. })();