CursorDocumentSource.js 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. "use strict";
  2. var CursorDocumentSource = module.exports = (function(){
  3. // CONSTRUCTOR
  /**
   * A DocumentSource that produces Documents from the objects yielded by a wrapped cursor.
   * It is normally placed at the head of a chain of document sources so that the rest of
   * the chain has data to consume.
   *
   * An instance is meant to be used by a single thread only, see SERVER-6123.
   *
   * @class CursorDocumentSource
   * @namespace munge.pipeline.documentsource
   * @module munge
   * @constructor
   * @param {CursorDocumentSource.CursorWithContext} cursorWithContext wrapper whose `_cursor` is the cursor to read from
   * @throws {Error} if `cursorWithContext` is missing or carries no `_cursor`
   **/
  var klass = module.exports = CursorDocumentSource = function CursorDocumentSource(cursorWithContext/*, pExpCtx*/){
    base.call(this/*, pExpCtx*/);

    // Most recently fetched document: null before the first fetch and again once the cursor is exhausted.
    this.current = null;

    // NOTE: the namespace / query / sort bookkeeping fields (ns, pQuery, pSort) are not implemented here.
    // If they are ever added, they must be declared before the cursor so that they outlive it.
    this._projection = null;
    this._cursorWithContext = cursorWithContext;

    // Fields are assigned first and validated afterwards; a usable wrapper lets construction finish normally.
    var wrapper = this._cursorWithContext;
    if (wrapper && wrapper._cursor) {
      return;
    }
    throw new Error("CursorDocumentSource requires a valid cursor");
  };

  // Parent class; declared after the constructor but resolved before any instance is created.
  var base = require('./DocumentSource');

  // Prototype chain: inherit from DocumentSource and restore the (non-enumerable) constructor reference.
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  34. // DEPENDENCIES
  35. //var Document = require("../Document");
  /**
   * Holder for a cursor together with the state needed to access it.
   * A fresh instance starts with `_cursor` set to null; a real cursor has to be assigned
   * to `_cursor` before the holder is handed to the CursorDocumentSource constructor,
   * which rejects a holder without one.
   *
   * An instance is meant to be used by a single thread only.
   *
   * @class CursorWithContext
   * @constructor
   **/
  klass.CursorWithContext = (function (){
    function CursorWithContext(){
      this._cursor = null;
    }
    return CursorWithContext;
  })();
  /**
   * Drops this source's reference to its CursorWithContext and leaves every other
   * field (`current`, the projection) untouched.
   *
   * Letting go of the cursor (and the read lock it needs) is required for proper
   * concurrency, see SERVER-6123. findNext() calls this once the cursor is exhausted.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
    // After this, findNext() treats the source as exhausted and cursor() throws.
    this._cursorWithContext = null;
  };
  56. // /**
  57. // * Record the namespace. Required for explain.
  58. // *
  59. // * @method setNamespace
  60. // * @param {String} ns the namespace
  61. // **/
  62. // proto.setNamespace = function setNamespace(ns) {}
  63. //
  64. // /**
  65. // * Record the query that was specified for the cursor this wraps, if any.
  66. // * This should be captured after any optimizations are applied to
  67. // * the pipeline so that it reflects what is really used.
  68. // * This gets used for explain output.
  69. // *
  70. // * @method setQuery
  71. // * @param {Object} pBsonObj the query to record
  72. // **/
  73. // proto.setQuery = function setQuery(pBsonObj) {};
  74. //
  75. //
  76. // /**
  77. // * Record the sort that was specified for the cursor this wraps, if any.
  78. // * This should be captured after any optimizations are applied to
  79. // * the pipeline so that it reflects what is really used.
  80. // * This gets used for explain output.
  81. // *
  82. // * @method setSort
  83. // * @param {Object} pBsonObj the sort to record
  84. // **/
  85. // proto.setSort = function setSort(pBsonObj) {};
  /**
   * Records the projection for this source. A projection may be recorded only once.
   *
   * The value is merely stored on the instance for now (used for testing); it is not
   * turned into a Projection object or applied to the cursor.
   *
   * @method setProjection
   * @param {Object} projection the projection to record
   * @throws {Error} if a (truthy) projection has already been recorded
   **/
  proto.setProjection = function setProjection(projection) {
    var alreadySet = Boolean(this._projection);
    if (!alreadySet) {
      this._projection = projection;
      return;
    }
    throw new Error("projection is already set");
  };
  103. //----------------virtuals from DocumentSource--------------
  /**
   * Reports whether the source has run out of documents.
   * If no document is currently loaded, one fetch is attempted first.
   *
   * @method eof
   * @returns {Boolean} true when `current` is null after the fetch attempt
   **/
  proto.eof = function eof() {
    // Nothing loaded yet (or nothing left): try to pull a document before answering.
    var nothingLoaded = !this.current;
    if (nothingLoaded) {
      this.findNext();
    }
    var exhausted = (null === this.current);
    return exhausted;
  };
  /**
   * Moves this source on to its next document.
   *
   * The base-class advance() is invoked first (the original comment says it checks for
   * interrupts). If no document has been loaded yet, the first one is fetched and then
   * skipped, so the source ends up positioned on the document after it.
   *
   * @method advance
   * @returns {Boolean} true if a document is available afterwards, i.e. getCurrent() will return one
   **/
  proto.advance = function advance() {
    base.prototype.advance.call(this);

    // One fetch to step forward, plus one extra beforehand when nothing is loaded yet.
    for (var fetchesLeft = this.current ? 1 : 2; fetchesLeft > 0; fetchesLeft--) {
      this.findNext();
    }

    return !(this.current === null);
  };
  /**
   * Returns the document the source is currently positioned on, without advancing.
   * If nothing is loaded yet, one fetch is attempted first. Callers should check
   * eof() beforehand; some implementations effectively assert !eof() here.
   *
   * @method getCurrent
   * @returns {Document} the current Document, or null when the source is exhausted
   **/
  proto.getCurrent = function getCurrent() {
    if (this.current) {
      return this.current;
    }
    // Nothing loaded: try to fetch, then hand back whatever the fetch produced (possibly null).
    this.findNext();
    return this.current;
  };
  /**
   * DocumentSource hook for attaching an upstream source.
   *
   * This source reads from its cursor, so `pTheSource` is ignored. The only effect
   * is that `callback`, when supplied, is scheduled via process.nextTick().
   *
   * TODO: this should eventually throw "CursorDocumentSource doesn't take a source"
   * again, once async is properly supported.
   *
   * @method setSource
   * @param {DocumentSource} pTheSource ignored
   * @param {Function} [callback] invoked on the next tick, with no arguments
   **/
  proto.setSource = function setSource(pTheSource, callback) {
    if (!callback) {
      return;
    }
    return process.nextTick(callback);
  };
  /**
   * Would write a JSON/BSON representation of this source into `pBuilder`.
   *
   * A cursor source has no analog in the BSON world, so it could only ever be emitted
   * for explain output, and explain is not currently supported in munge. The method
   * therefore writes nothing in either case.
   *
   * @method sourceToJson
   * @param {Object} pBuilder BSONObjBuilder: a blank object builder to write to (left untouched)
   * @param {Boolean} explain create explain output
   **/
  proto.sourceToJson = function sourceToJson(pBuilder, explain) {
    if (!explain) {
      return;
    }
    // we are not currently supporting explain in munge
  };
  183. //----------------private--------------
  /**
   * Loads the next document from the cursor into `this.current`.
   *
   * - If the source has already been disposed, `current` is set to null.
   * - If the cursor still has a document (`ok()`), that document becomes `current`
   *   and the cursor is moved one step forward so the next call sees the following one.
   * - Otherwise the cursor is exhausted: the CursorWithContext is released via
   *   dispose() (required, see SERVER-6123) and `current` is set to null.
   *
   * No match / duplicate filtering or yielding is performed here.
   *
   * @method findNext
   * @private
   **/
  proto.findNext = function findNext(){
    if (!this._cursorWithContext) {
      this.current = null;
      return;
    }

    // A plain conditional is enough: at most one document is consumed per call.
    var cursor = this.cursor();
    if (cursor.ok()) {
      this.current = cursor.current();
      cursor.advance();
      return;
    }

    // If we got here, there aren't any more documents.
    // The CursorWithContext (and its read lock) must be released, see SERVER-6123.
    this.dispose();
    this.current = null;
  };
  /**
   * Accessor for the underlying cursor held by the CursorWithContext.
   *
   * @method cursor
   * @private
   * @returns {Object} the wrapped cursor
   * @throws {Error} if the source has been disposed or the wrapper holds no cursor
   **/
  proto.cursor = function cursor(){
    var wrapper = this._cursorWithContext;
    if (!wrapper || !wrapper._cursor) {
      throw new Error("cursor not defined");
    }
    return wrapper._cursor;
  };
  216. // proto.chunkMgr = function chunkMgr(){};
  217. // proto.canUseCoveredIndex = function canUseCoveredIndex(){};
  218. // proto.yieldSometimes = function yieldSometimes(){};
  219. return klass;
  220. })();