CursorDocumentSource.js 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. "use strict";
  2. // Mimicking max memory size from mongo/db/query/new_find.cpp
  3. // Need to actually decide some size for this?
  4. var MAX_BATCH_DOCS = 150;
  5. /**
  6. * Constructs and returns Documents from the objects produced by a supplied Cursor.
  7. * An object of this type may only be used by one thread, see SERVER-6123.
  8. *
  9. * This is usually put at the beginning of a chain of document sources
  10. * in order to fetch data from the database.
  11. *
  12. * @class CursorDocumentSource
  13. * @namespace mungedb-aggregate.pipeline.documentSources
  14. * @module mungedb-aggregate
  15. * @constructor
  16. * @param {CursorDocumentSource.CursorWithContext} cursorWithContext the cursor to use to fetch data
  17. **/
  18. var CursorDocumentSource = module.exports = CursorDocumentSource = function CursorDocumentSource(cursorWithContext, expCtx){
  19. base.call(this, expCtx);
  20. this.current = null;
  21. // this.ns = null;
  22. // /*
  23. // The bson dependencies must outlive the Cursor wrapped by this
  24. // source. Therefore, bson dependencies must appear before pCursor
  25. // in order cause its destructor to be called *after* pCursor's.
  26. // */
  27. // this.query = null;
  28. // this.sort = null;
  29. this._projection = null;
  30. this._cursorWithContext = cursorWithContext;
  31. this._curIdx = 0;
  32. this._currentBatch = [];
  33. this._limit = undefined;
  34. this._docsAddedToBatches = 0;
  35. if (!this._cursorWithContext || !this._cursorWithContext._cursor) throw new Error("CursorDocumentSource requires a valid cursorWithContext");
  36. }, klass = CursorDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  37. klass.CursorWithContext = (function (){
  38. /**
  39. * Holds a Cursor and all associated state required to access the cursor.
  40. * @class CursorWithContext
  41. * @namespace mungedb-aggregate.pipeline.documentSources.CursorDocumentSource
  42. * @module mungedb-aggregate
  43. * @constructor
  44. **/
  45. var klass = function CursorWithContext(ns){
  46. this._cursor = null;
  47. };
  48. return klass;
  49. })();
  50. /**
  51. * Release the Cursor and the read lock it requires, but without changing the other data.
  52. * Releasing the lock is required for proper concurrency, see SERVER-6123. This
  53. * functionality is also used by the explain version of pipeline execution.
  54. *
  55. * @method dispose
  56. **/
  57. proto.dispose = function dispose() {
  58. this._cursorWithContext = null;
  59. this._currentBatch = [];
  60. this._curIdx = 0;
  61. };
  62. proto.getSourceName = function getSourceName() {
  63. return "$cursor";
  64. };
  65. proto.getNext = function getNext(callback) {
  66. if (this._currentBatch.length <= this._curIdx) {
  67. this.loadBatch();
  68. if (!this._currentBatch) {
  69. if (callback)
  70. return callback(null);
  71. return null;
  72. }
  73. }
  74. // Don't unshift. It's expensiver.
  75. var out = this._currentBatch[this._curIdx];
  76. this._curIdx++;
  77. if (callback)
  78. return callback(out);
  79. return out;
  80. };
  81. proto.coalesce = function coalesce(nextSource) {
  82. if (!this._limit) {
  83. this._limit = nextSource;
  84. return this._limit;
  85. } else {
  86. return this._limit.coalesce(nextSource);
  87. }
  88. };
  89. ///**
  90. // * Record the namespace. Required for explain.
  91. // *
  92. // * @method setNamespace
  93. // * @param {String} ns the namespace
  94. // **/
  95. //proto.setNamespace = function setNamespace(ns) {}
  96. //
  97. ///**
  98. // * Record the query that was specified for the cursor this wraps, if any.
  99. // * This should be captured after any optimizations are applied to
  100. // * the pipeline so that it reflects what is really used.
  101. // * This gets used for explain output.
  102. // *
  103. // * @method setQuery
  104. // * @param {Object} pBsonObj the query to record
  105. // **/
  106. proto.setQuery = function setQuery(query) {
  107. this._query = query;
  108. };
  109. ///**
  110. // * Record the sort that was specified for the cursor this wraps, if any.
  111. // * This should be captured after any optimizations are applied to
  112. // * the pipeline so that it reflects what is really used.
  113. // * This gets used for explain output.
  114. // *
  115. // * @method setSort
  116. // * @param {Object} pBsonObj the query to record
  117. // **/
  118. //proto.setSort = function setSort(pBsonObj) {};
  119. /**
  120. * setProjection method
  121. *
  122. * @method setProjection
  123. * @param {Object} projection
  124. **/
  125. proto.setProjection = function setProjection(projection, deps) {
  126. if (this._projection){
  127. throw new Error("projection is already set");
  128. }
  129. //dont think we need this yet
  130. // this._projection = new Projection();
  131. // this._projection.init(projection);
  132. //
  133. // this.cursor().fields = this._projection;
  134. this._projection = projection; //just for testing
  135. this._dependencies = deps;
  136. };
  137. //----------------virtuals from DocumentSource--------------
  138. /**
  139. * Set the underlying source this source should use to get Documents
  140. * from.
  141. * It is an error to set the source more than once. This is to
  142. * prevent changing sources once the original source has been started;
  143. * this could break the state maintained by the DocumentSource.
  144. * This pointer is not reference counted because that has led to
  145. * some circular references. As a result, this doesn't keep
  146. * sources alive, and is only intended to be used temporarily for
  147. * the lifetime of a Pipeline::run().
  148. *
  149. * @method setSource
  150. * @param source {DocumentSource} the underlying source to use
  151. * @param callback {Function} a `mungedb-aggregate`-specific extension to the API to half-way support reading from async sources
  152. **/
  153. proto.setSource = function setSource(theSource) {
  154. if (theSource) throw new Error("CursorDocumentSource doesn't take a source"); //TODO: This needs to put back without the if once async is fully and properly supported
  155. };
  156. proto.serialize = function serialize(explain) {
  157. if (!explain)
  158. return null;
  159. if (!this._cursorWithContext)
  160. throw new Error("code 17135; Cursor deleted.");
  161. // A stab at what mongo wants
  162. return {
  163. query: this._query,
  164. sort: this._sort ? this._sort : null,
  165. limit: this._limit ? this._limit : null,
  166. fields: this._projection ? this._projection : null,
  167. indexonly: false,
  168. cursorType: this._cursorWithContext ? "cursor" : null
  169. };
  170. };
  171. // LimitDocumentSource has the setLimit function which trickles down to any documentsource
  172. proto.getLimit = function getLimit() {
  173. return this._limit;
  174. };
  175. //----------------private--------------
  176. //proto.chunkMgr = function chunkMgr(){};
  177. //proto.canUseCoveredIndex = function canUseCoveredIndex(){};
  178. //proto.yieldSometimes = function yieldSometimes(){};
  179. proto.loadBatch = function loadBatch() {
  180. var nDocs = 0,
  181. cursor = this._cursorWithContext ? this._cursorWithContext._cursor : null;
  182. if (!cursor)
  183. return this.dispose();
  184. for(;cursor.ok(); cursor.advance()) {
  185. if (!cursor.ok())
  186. break;
  187. // these methods do not exist
  188. // if (!cursor.currentMatches() || cursor.currentIsDup())
  189. // continue;
  190. var next = cursor.current();
  191. this._currentBatch.push(this._projection ? this.documentFromBsonDeps(next, this._dependencies) : next);
  192. if (this._limit) {
  193. if (++this._docsAddedToBatches == this._limit.getLimit())
  194. break;
  195. if (this._docsAddedToBatches >= this._limit.getLimit()) {
  196. throw new Error("added documents to the batch over limit size");
  197. }
  198. }
  199. // Mongo uses number of bytes, but that doesn't make sense here. Yield when nDocs is over a threshold
  200. if (nDocs > MAX_BATCH_DOCS) {
  201. this._curIdx++; // advance the deque
  202. nDocs++;
  203. return;
  204. }
  205. }
  206. };