CursorDocumentSource.js 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. "use strict";
  2. var DocumentSource = require('./DocumentSource');
  3. // Upper bound on the number of documents loaded per batch. Stands in for the byte-based
  4. // maximum in mongo/db/query/new_find.cpp. TODO: settle on a well-founded value.
  5. var MAX_BATCH_DOCS = 150;
  6. /**
  7. * Constructs and returns Documents from the objects produced by a supplied Cursor.
  8. * An object of this type may only be used by one thread, see SERVER-6123.
  9. *
  10. * This is usually put at the beginning of a chain of document sources
  11. * in order to fetch data from the database.
  12. *
  13. * @class CursorDocumentSource
  14. * @namespace mungedb-aggregate.pipeline.documentSources
  15. * @module mungedb-aggregate
  16. * @constructor
  17. * @param {CursorDocumentSource.CursorWithContext} cursorWithContext the cursor to use to fetch data
  18. **/
  19. var CursorDocumentSource = module.exports = CursorDocumentSource = function CursorDocumentSource(cursorWithContext, expCtx){
  20. base.call(this, expCtx);
  21. this.current = null;
  22. // this.ns = null;
  23. // /*
  24. // The bson dependencies must outlive the Cursor wrapped by this
  25. // source. Therefore, bson dependencies must appear before pCursor
  26. // in order cause its destructor to be called *after* pCursor's.
  27. // */
  28. // this.query = null;
  29. // this.sort = null;
  30. this._projection = null;
  31. this._cursorWithContext = cursorWithContext;
  32. this._curIdx = 0;
  33. this._currentBatch = [];
  34. this._limit = undefined;
  35. this._docsAddedToBatches = 0;
  36. if (!this._cursorWithContext || !this._cursorWithContext._cursor) throw new Error("CursorDocumentSource requires a valid cursorWithContext");
  37. }, klass = CursorDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  38. klass.CursorWithContext = (function (){
  39. /**
  40. * Holds a Cursor and all associated state required to access the cursor.
  41. * @class CursorWithContext
  42. * @namespace mungedb-aggregate.pipeline.documentSources.CursorDocumentSource
  43. * @module mungedb-aggregate
  44. * @constructor
  45. **/
  46. var klass = function CursorWithContext(ns){
  47. this._cursor = null;
  48. };
  49. return klass;
  50. })();
  51. /**
  52. * Release the Cursor and the read lock it requires, but without changing the other data.
  53. * Releasing the lock is required for proper concurrency, see SERVER-6123. This
  54. * functionality is also used by the explain version of pipeline execution.
  55. *
  56. * @method dispose
  57. **/
  58. proto.dispose = function dispose() {
  59. this._cursorWithContext = null;
  60. this._currentBatch = [];
  61. this._curIdx = 0;
  62. };
  63. proto.getSourceName = function getSourceName() {
  64. return "$cursor";
  65. };
  66. proto.getNext = function getNext(callback) {
  67. if (this._currentBatch.length <= this._curIdx) {
  68. this.loadBatch();
  69. if (!this._currentBatch) {
  70. if (callback)
  71. return callback(DocumentSource.EOF);
  72. return DocumentSource.EOF;
  73. }
  74. }
  75. // Don't unshift. It's expensiver.
  76. var out = this._currentBatch[this._curIdx];
  77. this._curIdx++;
  78. if (callback)
  79. return callback(out);
  80. return out;
  81. };
  82. proto.coalesce = function coalesce(nextSource) {
  83. if (!this._limit) {
  84. this._limit = nextSource;
  85. return this._limit;
  86. } else {
  87. return this._limit.coalesce(nextSource);
  88. }
  89. };
  90. ///**
  91. // * Record the namespace. Required for explain.
  92. // *
  93. // * @method setNamespace
  94. // * @param {String} ns the namespace
  95. // **/
  96. //proto.setNamespace = function setNamespace(ns) {}
  97. //
  98. ///**
  99. // * Record the query that was specified for the cursor this wraps, if any.
  100. // * This should be captured after any optimizations are applied to
  101. // * the pipeline so that it reflects what is really used.
  102. // * This gets used for explain output.
  103. // *
  104. // * @method setQuery
  105. // * @param {Object} pBsonObj the query to record
  106. // **/
  107. proto.setQuery = function setQuery(query) {
  108. this._query = query;
  109. };
  110. ///**
  111. // * Record the sort that was specified for the cursor this wraps, if any.
  112. // * This should be captured after any optimizations are applied to
  113. // * the pipeline so that it reflects what is really used.
  114. // * This gets used for explain output.
  115. // *
  116. // * @method setSort
  117. // * @param {Object} pBsonObj the query to record
  118. // **/
  119. //proto.setSort = function setSort(pBsonObj) {};
  120. /**
  121. * setProjection method
  122. *
  123. * @method setProjection
  124. * @param {Object} projection
  125. **/
  126. proto.setProjection = function setProjection(projection, deps) {
  127. if (this._projection){
  128. throw new Error("projection is already set");
  129. }
  130. //dont think we need this yet
  131. // this._projection = new Projection();
  132. // this._projection.init(projection);
  133. //
  134. // this.cursor().fields = this._projection;
  135. this._projection = projection; //just for testing
  136. this._dependencies = deps;
  137. };
  138. //----------------virtuals from DocumentSource--------------
  139. /**
  140. * Set the underlying source this source should use to get Documents
  141. * from.
  142. * It is an error to set the source more than once. This is to
  143. * prevent changing sources once the original source has been started;
  144. * this could break the state maintained by the DocumentSource.
  145. * This pointer is not reference counted because that has led to
  146. * some circular references. As a result, this doesn't keep
  147. * sources alive, and is only intended to be used temporarily for
  148. * the lifetime of a Pipeline::run().
  149. *
  150. * @method setSource
  151. * @param source {DocumentSource} the underlying source to use
  152. * @param callback {Function} a `mungedb-aggregate`-specific extension to the API to half-way support reading from async sources
  153. **/
  154. proto.setSource = function setSource(theSource) {
  155. if (theSource) throw new Error("CursorDocumentSource doesn't take a source"); //TODO: This needs to put back without the if once async is fully and properly supported
  156. };
  157. proto.serialize = function serialize(explain) {
  158. if (!explain)
  159. return null;
  160. if (!this._cursorWithContext)
  161. throw new Error("code 17135; Cursor deleted.");
  162. // A stab at what mongo wants
  163. return {
  164. query: this._query,
  165. sort: this._sort ? this._sort : null,
  166. limit: this._limit ? this._limit : null,
  167. fields: this._projection ? this._projection : null,
  168. indexonly: false,
  169. cursorType: this._cursorWithContext ? "cursor" : null
  170. };
  171. };
  172. // LimitDocumentSource has the setLimit function which trickles down to any documentsource
  173. proto.getLimit = function getLimit() {
  174. return this._limit;
  175. };
  176. //----------------private--------------
  177. //proto.chunkMgr = function chunkMgr(){};
  178. //proto.canUseCoveredIndex = function canUseCoveredIndex(){};
  179. //proto.yieldSometimes = function yieldSometimes(){};
  180. proto.loadBatch = function loadBatch() {
  181. var nDocs = 0,
  182. cursor = this._cursorWithContext ? this._cursorWithContext._cursor : null;
  183. if (!cursor)
  184. return this.dispose();
  185. for(;cursor.ok(); cursor.advance()) {
  186. if (!cursor.ok())
  187. break;
  188. // these methods do not exist
  189. // if (!cursor.currentMatches() || cursor.currentIsDup())
  190. // continue;
  191. var next = cursor.current();
  192. this._currentBatch.push(this._projection ? this.documentFromBsonDeps(next, this._dependencies) : next);
  193. if (this._limit) {
  194. if (++this._docsAddedToBatches == this._limit.getLimit())
  195. break;
  196. if (this._docsAddedToBatches >= this._limit.getLimit()) {
  197. throw new Error("added documents to the batch over limit size");
  198. }
  199. }
  200. // Mongo uses number of bytes, but that doesn't make sense here. Yield when nDocs is over a threshold
  201. if (nDocs > MAX_BATCH_DOCS) {
  202. this._curIdx++; // advance the deque
  203. nDocs++;
  204. return;
  205. }
  206. }
  207. };