UnwindDocumentSource.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. "use strict";
  2. /**
  3. * A document source unwinder
  4. * @class UnwindDocumentSource
  5. * @namespace mungedb-aggregate.pipeline.documentSources
  6. * @module mungedb-aggregate
  7. * @constructor
  8. * @param [ctx] {ExpressionContext}
  9. **/
  10. var UnwindDocumentSource = module.exports = function UnwindDocumentSource(ctx){
  11. if (arguments.length > 1) throw new Error("up to one arg expected");
  12. base.call(this, ctx);
  13. // Configuration state.
  14. this._unwindPath = null;
  15. // Iteration state.
  16. this._unwinder = null;
  17. }, klass = UnwindDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  18. var DocumentSource = base,
  19. FieldPath = require('../FieldPath'),
  20. Document = require('../Document'),
  21. Expression = require('../expressions/Expression');
  22. klass.Unwinder = (function(){
  23. /**
  24. * Helper class to unwind arrays within a series of documents.
  25. * @param {String} unwindPath is the field path to the array to unwind.
  26. **/
  27. var klass = function Unwinder(unwindPath){
  28. // Path to the array to unwind.
  29. this._unwindPath = unwindPath;
  30. // The souce document to unwind.
  31. this._document = null;
  32. // Document indexes of the field path components.
  33. this._unwindPathFieldIndexes = [];
  34. // Iterator over the array within _document to unwind.
  35. this._unwindArrayIterator = null;
  36. // The last value returned from _unwindArrayIterator.
  37. //this._unwindArrayIteratorCurrent = undefined; //dont define this yet
  38. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  39. /**
  40. * Reset the unwinder to unwind a new document.
  41. * @param {Object} document
  42. **/
  43. proto.resetDocument = function resetDocument(document){
  44. if (!document) throw new Error("document is required!");
  45. // Reset document specific attributes.
  46. this._document = document;
  47. this._unwindPathFieldIndexes.length = 0;
  48. this._unwindArrayIterator = null;
  49. delete this._unwindArrayIteratorCurrent;
  50. var pathValue = this.extractUnwindValue(); // sets _unwindPathFieldIndexes
  51. if (!pathValue || pathValue.length === 0) return; // The path does not exist.
  52. if (!(pathValue instanceof Array)) throw new Error(UnwindDocumentSource.unwindName + ": value at end of field path must be an array; code 15978");
  53. // Start the iterator used to unwind the array.
  54. this._unwindArrayIterator = pathValue.slice(0);
  55. this._unwindArrayIteratorCurrent = this._unwindArrayIterator.splice(0,1)[0];
  56. };
  57. /**
  58. * eof
  59. * @returns {Boolean} true if done unwinding the last document passed to resetDocument().
  60. **/
  61. proto.eof = function eof(){
  62. return !this.hasOwnProperty("_unwindArrayIteratorCurrent");
  63. };
  64. /**
  65. * Try to advance to the next document unwound from the document passed to resetDocument().
  66. * @returns {Boolean} true if advanced to a new unwound document, but false if done advancing.
  67. **/
  68. proto.advance = function advance(){
  69. if (!this._unwindArrayIterator) {
  70. // resetDocument() has not been called or the supplied document had no results to
  71. // unwind.
  72. delete this._unwindArrayIteratorCurrent;
  73. } else if (!this._unwindArrayIterator.length) {
  74. // There are no more results to unwind.
  75. delete this._unwindArrayIteratorCurrent;
  76. } else {
  77. this._unwindArrayIteratorCurrent = this._unwindArrayIterator.splice(0, 1)[0];
  78. }
  79. };
  80. /**
  81. * Get the current document unwound from the document provided to resetDocument(), using
  82. * the current value in the array located at the provided unwindPath. But return
  83. * intrusive_ptr<Document>() if resetDocument() has not been called or the results to unwind
  84. * have been exhausted.
  85. *
  86. * @returns {Object}
  87. **/
  88. proto.getCurrent = function getCurrent(){
  89. if (!this.hasOwnProperty("_unwindArrayIteratorCurrent")) {
  90. return null;
  91. }
  92. // Clone all the documents along the field path so that the end values are not shared across
  93. // documents that have come out of this pipeline operator. This is a partial deep clone.
  94. // Because the value at the end will be replaced, everything along the path leading to that
  95. // will be replaced in order not to share that change with any other clones (or the
  96. // original).
  97. var clone = Document.clone(this._document);
  98. var current = clone;
  99. var n = this._unwindPathFieldIndexes.length;
  100. if (!n) throw new Error("unwindFieldPathIndexes are empty");
  101. for (var i = 0; i < n; ++i) {
  102. var fi = this._unwindPathFieldIndexes[i];
  103. var fp = current[fi];
  104. if (i + 1 < n) {
  105. // For every object in the path but the last, clone it and continue on down.
  106. var next = Document.clone(fp);
  107. current[fi] = next;
  108. current = next;
  109. } else {
  110. // In the last nested document, subsitute the current unwound value.
  111. current[fi] = this._unwindArrayIteratorCurrent;
  112. }
  113. }
  114. return clone;
  115. };
  116. /**
  117. * Get the value at the unwind path, otherwise an empty pointer if no such value
  118. * exists. The _unwindPathFieldIndexes attribute will be set as the field path is traversed
  119. * to find the value to unwind.
  120. *
  121. * @returns {Object}
  122. **/
  123. proto.extractUnwindValue = function extractUnwindValue() {
  124. var current = this._document;
  125. var pathValue;
  126. var pathLength = this._unwindPath.getPathLength();
  127. for (var i = 0; i < pathLength; ++i) {
  128. var idx = this._unwindPath.getFieldName(i);
  129. if (!current.hasOwnProperty(idx)) return null; // The target field is missing.
  130. // Record the indexes of the fields down the field path in order to quickly replace them
  131. // as the documents along the field path are cloned.
  132. this._unwindPathFieldIndexes.push(idx);
  133. pathValue = current[idx];
  134. if (i < pathLength - 1) {
  135. if (typeof pathValue !== 'object') return null; // The next field in the path cannot exist (inside a non object).
  136. current = pathValue; // Move down the object tree.
  137. }
  138. }
  139. return pathValue;
  140. };
  141. return klass;
  142. })();
  143. /**
  144. * Lazily construct the _unwinder and initialize the iterator state of this DocumentSource.
  145. * To be called by all members that depend on the iterator state.
  146. **/
  147. proto.lazyInit = function lazyInit(){
  148. if (!this._unwinder) {
  149. if (!this._unwindPath){
  150. throw new Error("unwind path does not exist!");
  151. }
  152. this._unwinder = new klass.Unwinder(this._unwindPath);
  153. if (!this.source.eof()) {
  154. // Set up the first source document for unwinding.
  155. this._unwinder.resetDocument(this.source.getCurrent());
  156. }
  157. this.mayAdvanceSource();
  158. }
  159. };
  160. /**
  161. * If the _unwinder is exhausted and the source may be advanced, advance the source and
  162. * reset the _unwinder's source document.
  163. **/
  164. proto.mayAdvanceSource = function mayAdvanceSource(){
  165. while(this._unwinder.eof()) {
  166. // The _unwinder is exhausted.
  167. if (this.source.eof()) return; // The source is exhausted.
  168. if (!this.source.advance()) return; // The source is exhausted.
  169. // Reset the _unwinder with source's next document.
  170. this._unwinder.resetDocument(this.source.getCurrent());
  171. }
  172. };
  173. /**
  174. * Specify the field to unwind.
  175. **/
  176. proto.unwindPath = function unwindPath(fieldPath){
  177. // Can't set more than one unwind path.
  178. if (this._unwindPath) throw new Error(this.getSourceName() + " can't unwind more than one path; code 15979");
  179. // Record the unwind path.
  180. this._unwindPath = new FieldPath(fieldPath);
  181. };
  182. klass.unwindName = "$unwind";
  183. proto.getSourceName = function getSourceName(){
  184. return klass.unwindName;
  185. };
  186. /**
  187. * Get the fields this operation needs to do its job.
  188. * Deps should be in "a.b.c" notation
  189. *
  190. * @method getDependencies
  191. * @param {Object} deps set (unique array) of strings
  192. * @returns DocumentSource.GetDepsReturn
  193. **/
  194. proto.getDependencies = function getDependencies(deps) {
  195. if (!this._unwindPath) throw new Error("unwind path does not exist!");
  196. deps[this._unwindPath.getPath(false)] = 1;
  197. return DocumentSource.GetDepsReturn.SEE_NEXT;
  198. };
  199. /**
  200. * Is the source at EOF?
  201. * @method eof
  202. **/
  203. proto.eof = function eof() {
  204. this.lazyInit();
  205. return this._unwinder.eof();
  206. };
  207. /**
  208. * some implementations do the equivalent of verify(!eof()) so check eof() first
  209. * @method getCurrent
  210. * @returns {Document} the current Document without advancing
  211. **/
  212. proto.getCurrent = function getCurrent() {
  213. this.lazyInit();
  214. return this._unwinder.getCurrent();
  215. };
  216. /**
  217. * Advance the state of the DocumentSource so that it will return the next Document.
  218. * The default implementation returns false, after checking for interrupts.
  219. * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
  220. *
  221. * @method advance
  222. * @returns {Boolean} whether there is another document to fetch, i.e., whether or not getCurrent() will succeed. This default implementation always returns false.
  223. **/
  224. proto.advance = function advance() {
  225. base.prototype.advance.call(this); // check for interrupts
  226. this.lazyInit();
  227. this._unwinder.advance();
  228. this.mayAdvanceSource();
  229. return !this._unwinder.eof();
  230. };
  231. /**
  232. * Create an object that represents the document source. The object
  233. * will have a single field whose name is the source's name. This
  234. * will be used by the default implementation of addToJsonArray()
  235. * to add this object to a pipeline being represented in JSON.
  236. *
  237. * @method sourceToJson
  238. * @param {Object} builder JSONObjBuilder: a blank object builder to write to
  239. * @param {Boolean} explain create explain output
  240. **/
  241. proto.sourceToJson = function sourceToJson(builder, explain) {
  242. if (!this._unwindPath) throw new Error("unwind path does not exist!");
  243. builder[this.getSourceName()] = this._unwindPath.getPath(true);
  244. };
  245. /**
  246. * Creates a new UnwindDocumentSource with the input path as the path to unwind
  247. * @param {String} JsonElement this thing is *called* Json, but it expects a string
  248. **/
  249. klass.createFromJson = function createFromJson(jsonElement, ctx) {
  250. // The value of $unwind should just be a field path.
  251. if (jsonElement.constructor !== String) throw new Error("the " + klass.unwindName + " field path must be specified as a string; code 15981");
  252. var pathString = Expression.removeFieldPrefix(jsonElement);
  253. var unwind = new UnwindDocumentSource(ctx);
  254. unwind.unwindPath(pathString);
  255. return unwind;
  256. };