CursorDocumentSource.js 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. "use strict";
  2. var DocumentSource = require('./DocumentSource'),
  3. LimitDocumentSource = require('./LimitDocumentSource');
  // Document-count threshold that loadBatch() compares against when deciding
  // whether to end a batch early. Mongo bounds a batch by bytes instead
  // (mongo/db/query/new_find.cpp); a document count stands in for that here.
  // TODO: choose a considered value; 150 is a placeholder.
  var MAX_BATCH_DOCS = 150;
  /**
   * Constructs and returns Documents from the objects produced by a supplied Cursor.
   * An object of this type may only be used by one thread, see SERVER-6123.
   *
   * This is usually put at the beginning of a chain of document sources
   * in order to fetch data from the database.
   *
   * @class CursorDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param {CursorDocumentSource.CursorWithContext} cursorWithContext the cursor to use to fetch data; both it and its `_cursor` must be set
   * @param {Object} expCtx passed unchanged to the DocumentSource base constructor
   * @throws {Error} if `cursorWithContext` or `cursorWithContext._cursor` is missing
   **/
  var CursorDocumentSource = module.exports = function CursorDocumentSource(cursorWithContext, expCtx) {
    // Reject a bad cursor before running the base constructor so construction
    // fails fast instead of leaving a half-initialized instance behind.
    if (!cursorWithContext || !cursorWithContext._cursor) throw new Error("CursorDocumentSource requires a valid cursorWithContext");

    base.call(this, expCtx);

    this.current = null;
    // `_query` is assigned later by setQuery(); `_dependencies` by setProjection().
    this._projection = null;
    this._cursorWithContext = cursorWithContext;
    // Index of the next unread document in _currentBatch; advanced by getNext().
    this._curIdx = 0;
    this._currentBatch = [];
    // Becomes the absorbed LimitDocumentSource once coalesce() takes a $limit.
    this._limit = undefined;
    // Running count of documents buffered while a $limit is active.
    this._docsAddedToBatches = 0;
  }, klass = CursorDocumentSource, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  /**
   * Holds a Cursor and all associated state required to access the cursor.
   * The cursor lives in the `_cursor` property, which starts out null. The
   * CursorDocumentSource constructor rejects a holder whose `_cursor` is unset,
   * so it presumably gets assigned by the caller before use.
   *
   * @class CursorWithContext
   * @namespace mungedb-aggregate.pipeline.documentSources.CursorDocumentSource
   * @module mungedb-aggregate
   * @constructor
   * @param {String} ns presumably the namespace; accepted but not currently stored
   **/
  klass.CursorWithContext = function CursorWithContext(ns) {
    this._cursor = null;
  };
  /**
   * Drop the reference to the cursor and discard every buffered document.
   * This is the counterpart of mongo's dispose(), which releases the Cursor and the
   * read lock it holds (see SERVER-6123). Calling it again is harmless.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
    this._currentBatch = [];
    this._curIdx = 0;
    this._cursorWithContext = null;
  };
  /**
   * @method getSourceName
   * @returns {String} the stage name of this source, "$cursor"
   **/
  proto.getSourceName = function getSourceName() {
    var stageName = "$cursor";
    return stageName;
  };
  /**
   * Hand the next buffered document to `callback`. When the buffer has been used
   * up, it is first refilled from the cursor via loadBatch().
   *
   * @method getNext
   * @param {Function} callback invoked synchronously as callback(null, doc), or as
   *   callback(null, DocumentSource.EOF) once no documents remain
   * @returns {Object} the same value that was passed to the callback
   * @throws {Error} if no callback is supplied
   **/
  proto.getNext = function getNext(callback) {
    if (!callback) {
      throw new Error(this.getSourceName() + ' #getNext() requires callback');
    }

    // Refill only when every buffered document has already been handed out.
    if (this._curIdx >= this._currentBatch.length) {
      this.loadBatch();
    }

    // Still nothing unread after a refill: the cursor is exhausted.
    if (this._curIdx >= this._currentBatch.length) {
      callback(null, DocumentSource.EOF);
      return DocumentSource.EOF;
    }

    // Walk an index rather than shifting the array; shift() is costlier.
    var doc = this._currentBatch[this._curIdx++];
    callback(null, doc);
    return doc;
  };
  /**
   * Try to absorb a following stage into this cursor source. Only a $limit
   * (LimitDocumentSource) is absorbed directly. Once one has been absorbed, later
   * candidates are offered to that limit's own coalesce().
   *
   * @method coalesce
   * @param {DocumentSource} nextSource the candidate stage to absorb
   * @returns {*} the absorbed LimitDocumentSource when one is taken now, the result
   *   of the already-absorbed limit's coalesce(), or false
   **/
  proto.coalesce = function coalesce(nextSource) {
    if (this._limit) return this._limit.coalesce(nextSource);
    if (!(nextSource instanceof LimitDocumentSource)) return false;

    this._limit = nextSource;
    return this._limit;
  };
  92. ///**
  93. // * Record the namespace. Required for explain.
  94. // *
  95. // * @method setNamespace
  96. // * @param {String} ns the namespace
  97. // **/
  98. //proto.setNamespace = function setNamespace(ns) {}
  99. //
  /**
   * Record the query that was specified for the cursor this wraps, if any.
   * It should be captured after any pipeline optimizations are applied, so that it
   * reflects the query that is really used. serialize() reports it in explain output.
   *
   * @method setQuery
   * @param {Object} query the query to record
   **/
  proto.setQuery = function setQuery(query) {
    var self = this;
    self._query = query;
  };
  112. ///**
  113. // * Record the sort that was specified for the cursor this wraps, if any.
  114. // * This should be captured after any optimizations are applied to
  115. // * the pipeline so that it reflects what is really used.
  116. // * This gets used for explain output.
  117. // *
  118. // * @method setSort
  119. // * @param {Object} pBsonObj the query to record
  120. // **/
  121. //proto.setSort = function setSort(pBsonObj) {};
  /**
   * Record the projection for this source, together with the dependencies that
   * loadBatch() passes to documentFromJsonWithDeps() while a projection is set.
   * The projection is stored as given; the Projection wrapper from the port's
   * commented-out code is not used yet.
   *
   * @method setProjection
   * @param {Object} projection the projection to record
   * @param {Object} deps dependencies used to build documents from raw cursor objects
   * @throws {Error} if a projection has already been set
   **/
  proto.setProjection = function setProjection(projection, deps) {
    if (this._projection) throw new Error("projection is already set");

    this._dependencies = deps;
    this._projection = projection;
  };
  140. //----------------virtuals from DocumentSource--------------
  /**
   * A cursor source reads from its cursor, not from an upstream DocumentSource,
   * so it refuses any real source.
   * A falsy argument is tolerated as a no-op. This is a temporary concession while
   * async sources are only half-way supported.
   *
   * @method setSource
   * @param {DocumentSource} theSource must be falsy
   * @throws {Error} if a truthy source is supplied
   **/
  proto.setSource = function setSource(theSource) {
    // TODO: throw unconditionally once async is fully and properly supported.
    if (!theSource) return;
    throw new Error("CursorDocumentSource doesn't take a source");
  };
  /**
   * Describe this stage for explain output. This source is never parsed from a
   * pipeline spec, so it has nothing to serialize outside of explain.
   *
   * @method serialize
   * @param {Boolean} explain whether explain output is being produced
   * @returns {Object|null} null when `explain` is falsy. Otherwise an object with
   *   `query`, `sort`, `limit` (the numeric limit from an absorbed $limit, or null),
   *   `fields`, `indexonly` and `cursorType`.
   * @throws {Error} code 17135 if the cursor has already been released
   **/
  proto.serialize = function serialize(explain) {
    if (!explain)
      return null;
    if (!this._cursorWithContext)
      throw new Error("code 17135; Cursor deleted.");
    // A stab at what mongo wants
    return {
      query: this._query,
      sort: this._sort ? this._sort : null,
      // Report the limit value itself, as getLimit() and mongo do, rather than
      // embedding the absorbed LimitDocumentSource stage object.
      limit: this._limit ? this._limit.getLimit() : null,
      fields: this._projection ? this._projection : null,
      indexonly: false,
      // The guard above guarantees a live cursor at this point.
      cursorType: "cursor"
    };
  };
  /**
   * Report the limit of the $limit stage absorbed by coalesce(), if any.
   *
   * @method getLimit
   * @returns {Number} the absorbed stage's getLimit() value, or -1 when no $limit
   *   has been absorbed
   **/
  proto.getLimit = function getLimit() {
    if (!this._limit) return -1;
    return this._limit.getLimit();
  };
  178. //----------------private--------------
  179. //proto.chunkMgr = function chunkMgr(){};
  180. //proto.canUseCoveredIndex = function canUseCoveredIndex(){};
  181. //proto.yieldSometimes = function yieldSometimes(){};
  /**
   * Pull documents from the cursor into `_currentBatch` for getNext() to hand out.
   *
   * A call ends in one of three ways:
   *  - MAX_BATCH_DOCS documents were buffered by this call. The cursor is advanced
   *    past the last buffered document and kept for the next call.
   *  - An absorbed $limit was reached.
   *  - The cursor ran out.
   * In the last two cases the cursor reference is dropped, so later calls fall
   * through to dispose(). If there is no cursor at all, dispose() is called
   * immediately.
   *
   * @method loadBatch
   * @private
   * @throws {Error} if more documents were buffered than the absorbed $limit allows
   **/
  proto.loadBatch = function loadBatch() {
    var cursor = this._cursorWithContext ? this._cursorWithContext._cursor : null;
    if (!cursor)
      return this.dispose();

    // Every buffered document has already been handed out, so start a fresh array
    // rather than keeping consumed documents alive across batches.
    if (this._curIdx >= this._currentBatch.length) {
      this._currentBatch = [];
      this._curIdx = 0;
    }

    var nDocs = 0;
    for (; cursor.ok(); cursor.advance()) {
      var next = cursor.current();
      this._currentBatch.push(this._projection ? base.documentFromJsonWithDeps(next, this._dependencies) : next);

      if (this._limit) {
        this._docsAddedToBatches++;
        var limit = this._limit.getLimit();
        // Reaching the limit ends the stream; the cursor is erased after the loop.
        if (this._docsAddedToBatches === limit)
          break;
        if (this._docsAddedToBatches > limit)
          throw new Error("added documents to the batch over limit size");
      }

      // Mongo bounds a batch by bytes; here the bound is a document count.
      // `return` skips the loop's advance(), so step past the document just
      // buffered before leaving. Otherwise the next call would buffer it twice.
      nDocs++;
      if (nDocs >= MAX_BATCH_DOCS) {
        cursor.advance();
        return;
      }
    }
    //NOTE: Trying to emulate erasing the cursor; not exactly how mongo does it
    this._cursorWithContext = undefined;
  };