DocumentSource.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. "use strict";
  /**
   * Base class shared by every document source.
   * @class DocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param expCtx {ExpressionContext} context kept on the instance as `expCtx`; a falsy value is replaced by an empty object
   * @throws {Error} when the constructor is not called with exactly one argument
   **/
  var DocumentSource = module.exports = function DocumentSource(expCtx){
    if (arguments.length !== 1) {
      throw new Error("one arg expected");
    }

    // Underlying source this one reads its data from. It starts out empty and
    // is assigned by setSource(), which refuses to assign it a second time.
    this.source = null;

    // Zero-based, user-specified pipeline step, used for diagnostics.
    // -1 marks an artificial step that was not part of the user's pipeline.
    this.step = -1;

    this.expCtx = expCtx || {};

    // For explain: number of rows returned by this source.
    this.nRowsOut = 0;
  };

  // Short aliases used throughout the rest of the file.
  var klass = DocumentSource;
  var base = Object;
  var proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  /**
   * Marker that document sources use to signal the end of their document
   * stream (the counterpart of boost::none in the C++ implementation).
   *
   * Represents a non-value in a document stream.
   * @class EOF
   * @namespace mungedb-aggregate.pipeline.documentSources.DocumentSource
   * @module mungedb-aggregate
   * @constructor
   **/
  klass.EOF = function EOF() {
    // intentionally empty: the type itself is the signal
  };
  48. /*
  49. class DocumentSource :
  50. public IntrusiveCounterUnsigned,
  51. public StringWriter {
  52. public:
  53. virtual ~DocumentSource();
  54. // virtuals from StringWriter
  55. virtual void writeString(stringstream &ss) const;
  56. */
  /**
   * Record which user-specified pipeline step this source corresponds to.
   * @method setPipelineStep
   * @param {Number} step the step number, 0 to n
   **/
  proto.setPipelineStep = function setPipelineStep(step) {
    var self = this;
    self.step = step;
  };
  /**
   * Read back the user-specified pipeline step.
   * @method getPipelineStep
   * @returns {Number} the stored step (the constructor initialises it to -1)
   **/
  proto.getPipelineStep = function getPipelineStep() {
    var currentStep = this.step;
    return currentStep;
  };
  /**
   * Abstract hook: an implementation returns the next Document if there is
   * one, or DocumentSource.EOF once the stream is exhausted.
   *
   * The base implementation always throws.
   *
   * @method getNext
   * @param {Function} callback not used by the base implementation
   *   NOTE(review): presumably how implementations hand back their result - confirm against the subclasses
   * @throws {Error} always ("not implemented")
   **/
  proto.getNext = function getNext(callback) {
    var notImplemented = new Error("not implemented");
    throw notImplemented;
  };
  /**
   * Tell the source it is no longer needed so that it may release its
   * resources. A disposed source must still be able to handle iteration
   * requests, but may become eof().
   * NOTE: For proper mutex yielding, dispose() must be called on any
   * DocumentSource that will not be advanced until eof(), see SERVER-6123.
   *
   * The base implementation only forwards the call to the underlying source,
   * when one has been set.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
    var upstream = this.source;
    if (!upstream) {
      return;
    }
    // Forwarding is required for the DocumentSourceCursor to release its read
    // lock, see SERVER-6123.
    upstream.dispose();
  };
  /**
   * Name of this source.
   * @method getSourceName
   * @returns {String} a constant string; the base implementation reports "[UNKNOWN]"
   **/
  proto.getSourceName = function getSourceName() {
    var unknownName = "[UNKNOWN]";
    return unknownName;
  };
  /**
   * Assign the underlying source this source pulls its Documents from.
   *
   * The source may only be assigned once: swapping it after the original
   * source has been started could break the state kept by this
   * DocumentSource. The reference is only intended to be used temporarily,
   * for the lifetime of a pipeline run.
   *
   * @method setSource
   * @param {DocumentSource} theSource the underlying source to use
   * @throws {Error} when a (truthy) source has already been assigned
   **/
  proto.setSource = function setSource(theSource) {
    var alreadyAssigned = Boolean(this.source);
    if (alreadyAssigned) {
      throw new Error("It is an error to set the source more than once");
    }
    this.source = theSource;
  };
  /**
   * Try to merge this DocumentSource with the one that follows it in the
   * pipeline. On success the successor should be removed from the pipeline
   * and discarded, and the operation may be repeated to fold several sources
   * together.
   *
   * The base implementation never coalesces.
   *
   * @method coalesce
   * @param {DocumentSource} nextSource the next source in the document processing chain
   * @returns {Boolean} true when the sources were merged; false (always, here) when nothing was changed
   **/
  proto.coalesce = function coalesce(nextSource) {
    var didCoalesce = false;
    return didCoalesce;
  };
  /**
   * Hook for a local optimization that only looks inside this
   * DocumentSource; it is meant for sources that contain expressions.
   * For best results, merge compatible sources with coalesce() first.
   *
   * The base implementation does nothing.
   *
   * @method optimize
   **/
  proto.optimize = function optimize() {
    // nothing to optimize in the base class
    return;
  };
  /**
   * Possible results of getDependencies().
   **/
  klass.GetDepsReturn = {
    // The dependency set should be ignored.
    NOT_SUPPORTED: "NOT_SUPPORTED",

    // Everything that is needed should be in the set.
    EXHAUSTIVE: "EXHAUSTIVE",

    // The next source's dependencies must be added to the set as well.
    SEE_NEXT: "SEE_NEXT"
  };
  /**
   * Report the fields this operation needs in order to do its job.
   * Dependencies are expressed in "a.b.c" notation.
   *
   * The base implementation adds nothing to `deps` and reports that
   * dependency tracking is not supported.
   *
   * @method getDependencies
   * @param {Object} deps the set of dependencies collected so far
   * @returns DocumentSource.GetDepsReturn
   **/
  proto.getDependencies = function getDependencies(deps) {
    var outcomes = klass.GetDepsReturn;
    return outcomes.NOT_SUPPORTED;
  };
  /**
   * Turn the dependencies gathered by getDependencies() into a projection
   * that includes all of them.
   *
   * Only the keys of `deps` are used. Each key is a field path in "a.b.c"
   * notation. A path is left out of the projection when one of its ancestors
   * is already projected: the ancestor covers it, and per SERVER-6527 listing
   * the child as well would keep the parent from being fully included.
   * `_id` is always explicit in the result: 1 when `_id` or one of its
   * sub-fields is a dependency, 0 otherwise.
   *
   * @method depsToProjection
   * @static
   * @param {Object} deps object whose keys are the needed field paths
   * @returns {Object} projection mapping each included path to 1, plus `_id`
   **/
  klass.depsToProjection = function depsToProjection(deps) {
    var hasOwn = Object.prototype.hasOwnProperty,
      projection = {},
      needId = false;

    // Kept from the original implementation: when "_id" itself is not a key
    // of deps, the "_id" entry is created up front and therefore stays the
    // first key of the result. Its final value is decided below.
    if (deps._id === undefined)
      projection._id = 0;

    // Sorting guarantees that a parent path is visited before its children.
    Object.keys(deps).sort().forEach(function(path) {
      if (path === "_id" || path.slice(0, 4) === "_id.") {
        needId = true;
        return;
      }

      // Look up every ancestor ("a", then "a.b", ...) instead of comparing
      // against the previously added path only: a sibling such as "a-b"
      // sorts between "a" and "a.b" and would otherwise hide the parent.
      for (var dot = path.indexOf("."); dot !== -1; dot = path.indexOf(".", dot + 1)) {
        if (hasOwn.call(projection, path.slice(0, dot)))
          return;
      }

      projection[path] = 1;
    });

    projection._id = needId ? 1 : 0; // we are explicit either way
    return projection;
  };
  /**
   * Abstract serialization hook; the base implementation always throws.
   *
   * @method _serialize
   * @param explain not used by the base implementation
   * @throws {Error} always ("not implemented")
   **/
  proto._serialize = function _serialize(explain) {
    var notImplemented = new Error("not implemented");
    throw notImplemented;
  };
  /**
   * Append this source's serialized form to `array`.
   *
   * The entry is obtained from `this.serialize(explain)`. When that call
   * yields no entry (undefined or null) nothing is appended.
   *
   * NOTE(review): this file only defines `_serialize`; `serialize` is
   * presumably supplied elsewhere (e.g. by subclasses) - confirm.
   *
   * @method serializeToArray
   * @param {Array} array the array the entry is pushed onto
   * @param explain passed through to serialize() unchanged
   **/
  proto.serializeToArray = function serializeToArray(array, explain) {
    var entry = this.serialize(explain);
    // The original test was inverted (`!entry`): it dropped every real entry
    // and pushed only missing/falsy ones.
    if (entry !== undefined && entry !== null) {
      array.push(entry);
    }
  };
  /**
   * Convert a set of dependency paths into the nested "needed fields" tree
   * that documentFromJsonWithDeps() consumes.
   *
   * Only the keys of `deps` are used. Each key is a field path in "a.b.c"
   * notation. A wholly needed field is marked with `true`; a field of which
   * only some sub-fields are needed is an object describing those sub-fields,
   * e.g. {a: 1, "c.d": 1, "c.e": 1} becomes {a: true, c: {d: true, e: true}}.
   * A path whose ancestor is wholly needed is dropped, because listing it
   * would keep the ancestor from being fully included.
   *
   * @method parseDeps
   * @static
   * @param {Object} deps object whose keys are the needed field paths
   * @returns {Object} nested tree of `true` leaves
   **/
  klass.parseDeps = function parseDeps(deps) {
    var hasOwn = Object.prototype.hasOwnProperty,
      parsed = {};

    Object.keys(deps).forEach(function(path) {
      var parts = path.split("."),
        leaf = parts.length - 1,
        node = parsed;

      for (var i = 0; i < leaf; i++) {
        // Own properties only, so a path such as "__proto__.x" can never
        // reach (and modify) Object.prototype.
        var child = hasOwn.call(node, parts[i]) ? node[parts[i]] : undefined;
        if (child === true)
          return; // an ancestor is already wholly included
        if (child === undefined) {
          child = {};
          node[parts[i]] = child;
        }
        node = child;
      }

      // Marking the field as wholly needed also replaces any sub-field
      // entries recorded earlier, so the result does not depend on key order.
      node[parts[leaf]] = true;
    });

    return parsed;
  };
  /**
   * Copy out of `bson` only the fields described by `neededFields`.
   *
   * `neededFields` is a tree in which `true` means "take the whole field" and
   * a nested object means "take only these sub-fields". For a nested entry,
   * an embedded document is filtered recursively and an array is filtered
   * element by element: embedded documents and nested arrays are filtered in
   * turn, every other kind of array element is dropped. A field with a
   * nested entry whose value is neither a document nor an array is left out.
   *
   * @method documentFromJsonWithDeps
   * @static
   * @param {Object} bson the document to copy from
   * @param {Object} neededFields tree of needed fields; a falsy value selects nothing
   * @returns {Object} new object holding only the needed fields
   * @throws {Error} when an entry of neededFields is truthy but neither a boolean nor an object
   **/
  klass.documentFromJsonWithDeps = function documentFromJsonWithDeps(bson, neededFields) {
    var hasOwn = Object.prototype.hasOwnProperty,
      toString = Object.prototype.toString;

    // True for embedded documents only. Arrays, Dates, RegExps, functions and
    // null are not documents, although most of them are `instanceof Object`.
    function isDocument(value) {
      return toString.call(value) === "[object Object]";
    }

    // Filters the elements of an array with the same `needed` tree.
    function pickFromArray(elements, needed) {
      var values = [];
      for (var i = 0; i < elements.length; i++) {
        var element = elements[i];
        // Exactly one branch per element: testing arrays with
        // `instanceof Object` used to push a bogus {} in front of every
        // nested array.
        if (Array.isArray(element)) {
          values.push(pickFromArray(element, needed));
        } else if (isDocument(element)) {
          values.push(pickFromDocument(element, needed));
        }
      }
      return values;
    }

    // Filters one document level; mutually recursive with pickFromArray.
    function pickFromDocument(doc, needed) {
      var picked = {};
      if (!needed)
        return picked;

      var fieldNames = Object.keys(doc);
      for (var i = 0; i < fieldNames.length; i++) {
        var fieldName = fieldNames[i],
          element = doc[fieldName],
          // own properties only: a field named e.g. "toString" must not pick
          // up the inherited method as if it were a dependency
          isNeeded = hasOwn.call(needed, fieldName) ? needed[fieldName] : undefined;

        if (!isNeeded)
          continue;

        if (typeof isNeeded === "boolean") {
          picked[fieldName] = element;
          continue;
        }

        // The original `!isNeeded instanceof Object` parsed as
        // `(!isNeeded) instanceof Object` and could never be true.
        if (typeof isNeeded !== "object")
          throw new Error("isNeeded should be an instance of Object");

        if (Array.isArray(element)) {
          picked[fieldName] = pickFromArray(element, isNeeded);
        } else if (isDocument(element)) {
          picked[fieldName] = pickFromDocument(element, isNeeded);
        }
      }
      return picked;
    }

    return pickFromDocument(bson, neededFields);
  };