DocumentSource.js 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. "use strict";
  2. /**
  3. * A base class for all document sources
  4. * @class DocumentSource
  5. * @namespace mungedb-aggregate.pipeline.documentSources
  6. * @module mungedb-aggregate
  7. * @constructor
  8. * @param expCtx {ExpressionContext}
  9. **/
  10. var DocumentSource = module.exports = function DocumentSource(expCtx){
  11. if(arguments.length !== 1) throw new Error("one arg expected");
  12. /*
  13. * Most DocumentSources have an underlying source they get their data
  14. * from. This is a convenience for them.
  15. * The default implementation of setSource() sets this; if you don't
  16. * need a source, override that to verify(). The default is to
  17. * verify() if this has already been set.
  18. */
  19. this.source = null;
  20. /*
  21. * The zero-based user-specified pipeline step. Used for diagnostics.
  22. * Will be set to -1 for artificial pipeline steps that were not part
  23. * of the original user specification.
  24. */
  25. this.step = -1;
  26. this.expCtx = expCtx || {};
  27. /*
  28. * for explain: # of rows returned by this source
  29. * This is *not* unsigned so it can be passed to JSONObjBuilder.append().
  30. */
  31. this.nRowsOut = 0;
  32. }, klass = DocumentSource, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  33. /*
  34. class DocumentSource :
  35. public IntrusiveCounterUnsigned,
  36. public StringWriter {
  37. public:
  38. virtual ~DocumentSource();
  39. // virtuals from StringWriter
  40. virtual void writeString(stringstream &ss) const;
  41. */
  42. /**
  43. * Set the step for a user-specified pipeline step.
  44. * @method setPipelineStep
  45. * @param {Number} step number 0 to n.
  46. **/
  47. proto.setPipelineStep = function setPipelineStep(step) {
  48. this.step = step;
  49. };
  50. /**
  51. * Get the user-specified pipeline step.
  52. * @method getPipelineStep
  53. * @returns {Number} step
  54. **/
  55. proto.getPipelineStep = function getPipelineStep() {
  56. return this.step;
  57. };
  58. /**
  59. * some implementations do the equivalent of verify(!eof()) so check eof() first
  60. * @method getNExt
  61. * @returns {Document} the current Document without advancing
  62. **/
  63. proto.getNext = function getNext(callback) {
  64. throw new Error("not implemented");
  65. };
  66. /**
  67. * Inform the source that it is no longer needed and may release its resources. After
  68. * dispose() is called the source must still be able to handle iteration requests, but may
  69. * become eof().
  70. * NOTE: For proper mutex yielding, dispose() must be called on any DocumentSource that will
  71. * not be advanced until eof(), see SERVER-6123.
  72. *
  73. * @method dispose
  74. **/
  75. proto.dispose = function dispose() {
  76. if ( this.source ) {
  77. // This is required for the DocumentSourceCursor to release its read lock, see
  78. // SERVER-6123.
  79. this.source.dispose();
  80. }
  81. };
  82. /**
  83. * Get the source's name.
  84. * @method getSourceName
  85. * @returns {String} the string name of the source as a constant string; this is static, and there's no need to worry about adopting it
  86. **/
  87. proto.getSourceName = function getSourceName() {
  88. return "[UNKNOWN]";
  89. };
  90. /**
  91. * Set the underlying source this source should use to get Documents
  92. * from.
  93. * It is an error to set the source more than once. This is to
  94. * prevent changing sources once the original source has been started;
  95. * this could break the state maintained by the DocumentSource.
  96. * This pointer is not reference counted because that has led to
  97. * some circular references. As a result, this doesn't keep
  98. * sources alive, and is only intended to be used temporarily for
  99. * the lifetime of a Pipeline::run().
  100. *
  101. * @method setSource
  102. * @param {DocumentSource} source the underlying source to use
  103. **/
  104. proto.setSource = function setSource(theSource) {
  105. if (this.source) throw new Error("It is an error to set the source more than once");
  106. this.source = theSource;
  107. };
  108. /**
  109. * Attempt to coalesce this DocumentSource with its successor in the
  110. * document processing pipeline. If successful, the successor
  111. * DocumentSource should be removed from the pipeline and discarded.
  112. * If successful, this operation can be applied repeatedly, in an
  113. * attempt to coalesce several sources together.
  114. * The default implementation is to do nothing, and return false.
  115. *
  116. * @method coalesce
  117. * @param {DocumentSource} nextSource the next source in the document processing chain.
  118. * @returns {Boolean} whether or not the attempt to coalesce was successful or not; if the attempt was not successful, nothing has been changed
  119. **/
  120. proto.coalesce = function coalesce(nextSource) {
  121. return false;
  122. };
  123. /**
  124. * Optimize the pipeline operation, if possible. This is a local
  125. * optimization that only looks within this DocumentSource. For best
  126. * results, first coalesce compatible sources using coalesce().
  127. * This is intended for any operations that include expressions, and
  128. * provides a hook for those to optimize those operations.
  129. * The default implementation is to do nothing.
  130. *
  131. * @method optimize
  132. **/
  133. proto.optimize = function optimize() {
  134. };
  135. klass.GetDepsReturn = {
  136. NOT_SUPPORTED: "NOT_SUPPORTED", // This means the set should be ignored
  137. EXHAUSTIVE: "EXHAUSTIVE", // This means that everything needed should be in the set
  138. SEE_NEXT: "SEE_NEXT" // Add the next Source's deps to the set
  139. };
  140. /**
  141. * Get the fields this operation needs to do its job.
  142. * Deps should be in "a.b.c" notation
  143. *
  144. * @method getDependencies
  145. * @param {Object} deps set (unique array) of strings
  146. * @returns DocumentSource.GetDepsReturn
  147. **/
  148. proto.getDependencies = function getDependencies(deps) {
  149. return klass.GetDepsReturn.NOT_SUPPORTED;
  150. };
  151. /**
  152. * This takes dependencies from getDependencies and
  153. * returns a projection that includes all of them
  154. *
  155. * @method depsToProjection
  156. * @param {Object} deps set (unique array) of strings
  157. * @returns {Object} JSONObj
  158. **/
  159. klass.depsToProjection = function depsToProjection(deps) {
  160. var needId = false,
  161. bb = {};
  162. if (deps._id === undefined)
  163. bb._id = 0;
  164. var last = "";
  165. Object.keys(deps).sort().forEach(function(it){
  166. if (it.indexOf('_id') === 0 && (it.length === 3 || it[3] === '.')) {
  167. needId = true;
  168. return;
  169. } else {
  170. if (last !== "" && it.slice(0, last.length) === last){
  171. // we are including a parent of *it so we don't need to
  172. // include this field explicitly. In fact, due to
  173. // SERVER-6527 if we included this field, the parent
  174. // wouldn't be fully included.
  175. return;
  176. }
  177. }
  178. last = it + ".";
  179. bb[it] = 1;
  180. });
  181. if (needId) // we are explicit either way
  182. bb._id = 1;
  183. else
  184. bb._id = 0;
  185. return bb;
  186. };
  187. proto._serialize = function _serialize(explain) {
  188. throw new Error("not implemented");
  189. };
  190. proto.serializeToArray = function serializeToArray(array, explain) {
  191. var entry = this.serialize(explain);
  192. if (!entry) {
  193. array.push(entry);
  194. }
  195. };
  196. // Taken as a whole, these three functions should produce the same output document given the
  197. // same deps set as mongo::Projection::transform would on the output of depsToProjection. The
  198. // only exceptions are that we correctly handle the case where no fields are needed and we don't
  199. // need to work around the above mentioned bug with subfields of _id (SERVER-7502). This is
  200. // tested in a DEV block in DocumentSourceCursor::findNext().
  201. //
  202. // Output from this function is input for the next two
  203. //
  204. // ParsedDeps is a simple recursive look-up table. For each field in a ParsedDeps:
  205. // If the value has type==Bool, the whole field is needed
  206. // If the value has type==Object, the fields in the subobject are needed
  207. // All other fields should be missing which means not needed
  208. // DocumentSource::ParsedDeps DocumentSource::parseDeps(const set<string>& deps) {
  209. // MutableDocument md;
  210. proto.parseDeps = function parseDeps(deps) {
  211. var doc,
  212. last;
  213. for (var i = 0; i < deps.length; i++) {
  214. var it = deps[i];
  215. if (!last && it.startsWith(last)) {
  216. // we are including a parent of *it so we don't need to include this field
  217. // explicitly. In fact, if we included this field, the parent wouldn't be fully
  218. // included. This logic relies on on set iterators going in lexicographic order so
  219. // that a string is always directly before of all fields it prefixes.
  220. continue;
  221. }
  222. last = it + '.';
  223. Object.setAtPath(doc, it, true);
  224. }
  225. return doc;
  226. };
  227. proto.documentFromBsonWithDeps = function documentFromBsonWithDeps(obj, deps) {
  228. var doc = {},
  229. self = this;
  230. var arrayHelper = function(field, isNeeded) {
  231. return field.map(function(f) {
  232. self.documentFromBsonWithDeps(f, isNeeded);
  233. });
  234. };
  235. for (var i = 0; i < obj.keys().length; i++) {
  236. var fieldName = obj.keys()[i],
  237. field = obj[fieldName],
  238. isNeeded = deps[fieldName];
  239. if (!isNeeded)
  240. continue;
  241. if (typeof isNeeded === Boolean) {
  242. Object.setAtPath(doc, fieldName, field);
  243. continue;
  244. }
  245. if (!(isNeeded instanceof Object))
  246. throw new Error("dependencies missing for object");
  247. if (field instanceof Array)
  248. Object.setAtPath(doc, fieldName, arrayHelper(field, isNeeded));
  249. if (field instanceof Object) { // everything is...
  250. var sub = this.documentFromBsonWithDeps(field, isNeeded);
  251. Object.setAtPath(doc, fieldName, sub);
  252. }
  253. }
  254. return doc;
  255. };