UnwindDocumentSource.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. var UnwindDocumentSource = module.exports = (function(){
  2. // CONSTRUCTOR
  3. /**
  4. * A document source unwindper
  5. *
  6. * @class UnwindDocumentSource
  7. * @namespace munge.pipeline.documentsource
  8. * @module munge
  9. * @constructor
  10. * @param {Object} query the match query to use
  11. **/
  12. var klass = module.exports = UnwindDocumentSource = function UnwindDocumentSource(/* pCtx*/){
  13. if(arguments.length !== 0) throw new Error("zero args expected");
  14. base.call(this);
  15. // Configuration state.
  16. this._unwindPath = null;
  17. // Iteration state.
  18. this._unwinder = null;
  19. }, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  20. var DocumentSource = base,
  21. FieldPath = require('../FieldPath'),
  22. Document = require('../Document'),
  23. Expression = require('../expressions/Expression');
  24. klass.Unwinder = (function(){
  25. /**
  26. * Helper class to unwind arrays within a series of documents.
  27. *
  28. * @param {String} unwindPath is the field path to the array to unwind.
  29. **/
  30. var klass = function Unwinder(unwindPath){
  31. // Path to the array to unwind.
  32. this._unwindPath = unwindPath;
  33. // The souce document to unwind.
  34. this._document = null;
  35. // Document indexes of the field path components.
  36. this._unwindPathFieldIndexes = [];
  37. // Iterator over the array within _document to unwind.
  38. this._unwindArrayIterator = null;
  39. // The last value returned from _unwindArrayIterator.
  40. //this._unwindArrayIteratorCurrent = undefined; //dont define this yet
  41. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  42. /**
  43. * Reset the unwinder to unwind a new document.
  44. *
  45. * @param {Object} document
  46. **/
  47. proto.resetDocument = function resetDocument(document){
  48. if (!document){
  49. throw new Error("document is required!");
  50. }
  51. // Reset document specific attributes.
  52. this._document = document;
  53. this._unwindPathFieldIndexes.length = 0;
  54. this._unwindArrayIterator = null;
  55. delete this._unwindArrayIteratorCurrent;
  56. var pathValue = this.extractUnwindValue(); // sets _unwindPathFieldIndexes
  57. if (!pathValue || pathValue.length === 0) {
  58. // The path does not exist.
  59. return;
  60. }
  61. if (pathValue.constructor !== Array){
  62. throw new Error(UnwindDocumentSource.unwindName + ": value at end of field path must be an array; code 15978");
  63. }
  64. // Start the iterator used to unwind the array.
  65. this._unwindArrayIterator = pathValue.slice(0);
  66. this._unwindArrayIteratorCurrent = this._unwindArrayIterator.splice(0,1)[0];
  67. };
  68. /**
  69. * eof
  70. *
  71. * @returns {Boolean} true if done unwinding the last document passed to resetDocument().
  72. **/
  73. proto.eof = function eof(){
  74. return !this.hasOwnProperty("_unwindArrayIteratorCurrent");
  75. };
  76. /**
  77. * Try to advance to the next document unwound from the document passed to resetDocument().
  78. *
  79. * @returns {Boolean} true if advanced to a new unwound document, but false if done advancing.
  80. **/
  81. proto.advance = function advance(){
  82. if (!this._unwindArrayIterator) {
  83. // resetDocument() has not been called or the supplied document had no results to
  84. // unwind.
  85. delete this._unwindArrayIteratorCurrent;
  86. } else if (!this._unwindArrayIterator.length) {
  87. // There are no more results to unwind.
  88. delete this._unwindArrayIteratorCurrent;
  89. } else {
  90. this._unwindArrayIteratorCurrent = this._unwindArrayIterator.splice(0, 1)[0];
  91. }
  92. };
  93. /**
  94. * Get the current document unwound from the document provided to resetDocument(), using
  95. * the current value in the array located at the provided unwindPath. But return
  96. * intrusive_ptr<Document>() if resetDocument() has not been called or the results to unwind
  97. * have been exhausted.
  98. *
  99. * @returns {Object}
  100. **/
  101. proto.getCurrent = function getCurrent(){
  102. if (!this.hasOwnProperty("_unwindArrayIteratorCurrent")) {
  103. return null;
  104. }
  105. // Clone all the documents along the field path so that the end values are not shared across
  106. // documents that have come out of this pipeline operator. This is a partial deep clone.
  107. // Because the value at the end will be replaced, everything along the path leading to that
  108. // will be replaced in order not to share that change with any other clones (or the
  109. // original).
  110. var clone = Document.clone(this._document);
  111. var current = clone;
  112. var n = this._unwindPathFieldIndexes.length;
  113. if (!n) {
  114. throw new Error("unwindFieldPathIndexes are empty");
  115. }
  116. for (var i = 0; i < n; ++i) {
  117. var fi = this._unwindPathFieldIndexes[i];
  118. var fp = current[fi];
  119. if (i + 1 < n) {
  120. // For every object in the path but the last, clone it and continue on down.
  121. var next = Document.clone(fp);
  122. current[fi] = next;
  123. current = next;
  124. } else {
  125. // In the last nested document, subsitute the current unwound value.
  126. current[fi] = this._unwindArrayIteratorCurrent;
  127. }
  128. }
  129. return clone;
  130. };
  131. /**
  132. * Get the value at the unwind path, otherwise an empty pointer if no such value
  133. * exists. The _unwindPathFieldIndexes attribute will be set as the field path is traversed
  134. * to find the value to unwind.
  135. *
  136. * @returns {Object}
  137. **/
  138. proto.extractUnwindValue = function extractUnwindValue(){
  139. var current = this._document;
  140. var pathValue;
  141. var pathLength = this._unwindPath.getPathLength();
  142. for (var i = 0; i < pathLength; ++i) {
  143. var idx = this._unwindPath.getFieldName(i);
  144. if (!current.hasOwnProperty(idx)) {
  145. // The target field is missing.
  146. return null;
  147. }
  148. // Record the indexes of the fields down the field path in order to quickly replace them
  149. // as the documents along the field path are cloned.
  150. this._unwindPathFieldIndexes.push(idx);
  151. pathValue = current[idx];
  152. if (i < pathLength - 1) {
  153. if (typeof pathValue !== 'object') {
  154. // The next field in the path cannot exist (inside a non object).
  155. return null;
  156. }
  157. // Move down the object tree.
  158. current = pathValue;
  159. }
  160. }
  161. return pathValue;
  162. };
  163. return klass;
  164. })();
  165. /**
  166. * Lazily construct the _unwinder and initialize the iterator state of this DocumentSource.
  167. * To be called by all members that depend on the iterator state.
  168. **/
  169. proto.lazyInit = function lazyInit(){
  170. if (!this._unwinder) {
  171. if (!this._unwindPath){
  172. throw new Error("unwind path does not exist!");
  173. }
  174. this._unwinder = new klass.Unwinder(this._unwindPath);
  175. if (!this.pSource.eof()) {
  176. // Set up the first source document for unwinding.
  177. this._unwinder.resetDocument(this.pSource.getCurrent());
  178. }
  179. this.mayAdvanceSource();
  180. }
  181. };
  182. /**
  183. * If the _unwinder is exhausted and the source may be advanced, advance the pSource and
  184. * reset the _unwinder's source document.
  185. **/
  186. proto.mayAdvanceSource = function mayAdvanceSource(){
  187. while(this._unwinder.eof()) {
  188. // The _unwinder is exhausted.
  189. if (this.pSource.eof()) {
  190. // The source is exhausted.
  191. return;
  192. }
  193. if (!this.pSource.advance()) {
  194. // The source is exhausted.
  195. return;
  196. }
  197. // Reset the _unwinder with pSource's next document.
  198. this._unwinder.resetDocument(this.pSource.getCurrent());
  199. }
  200. };
  201. /**
  202. * Specify the field to unwind.
  203. **/
  204. proto.unwindPath = function unwindPath(fieldPath){
  205. // Can't set more than one unwind path.
  206. if (this._unwindPath){
  207. throw new Error(this.getSourceName() + " can't unwind more than one path; code 15979");
  208. }
  209. // Record the unwind path.
  210. this._unwindPath = new FieldPath(fieldPath);
  211. };
  212. klass.unwindName = "$unwind";
  213. proto.getSourceName = function getSourceName(){
  214. return klass.unwindName;
  215. };
  216. /**
  217. * Get the fields this operation needs to do its job.
  218. * Deps should be in "a.b.c" notation
  219. *
  220. * @method getDependencies
  221. * @param {Object} deps set (unique array) of strings
  222. * @returns DocumentSource.GetDepsReturn
  223. **/
  224. proto.getDependencies = function getDependencies(deps) {
  225. if (!this._unwindPath){
  226. throw new Error("unwind path does not exist!");
  227. }
  228. deps[this._unwindPath.getPath(false)] = 1;
  229. return DocumentSource.GetDepsReturn.SEE_NEXT;
  230. };
  231. /**
  232. * Is the source at EOF?
  233. *
  234. * @method eof
  235. **/
  236. proto.eof = function eof() {
  237. this.lazyInit();
  238. return this._unwinder.eof();
  239. };
  240. /**
  241. * some implementations do the equivalent of verify(!eof()) so check eof() first
  242. *
  243. * @method getCurrent
  244. * @returns {Document} the current Document without advancing
  245. **/
  246. proto.getCurrent = function getCurrent() {
  247. this.lazyInit();
  248. return this._unwinder.getCurrent();
  249. };
  250. /**
  251. * Advance the state of the DocumentSource so that it will return the next Document.
  252. * The default implementation returns false, after checking for interrupts.
  253. * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
  254. *
  255. * @method advance
  256. * @returns {Boolean} whether there is another document to fetch, i.e., whether or not getCurrent() will succeed. This default implementation always returns false.
  257. **/
  258. proto.advance = function advance() {
  259. base.prototype.advance.call(this); // check for interrupts
  260. this.lazyInit();
  261. this._unwinder.advance();
  262. this.mayAdvanceSource();
  263. return !this._unwinder.eof();
  264. };
  265. /**
  266. * Create an object that represents the document source. The object
  267. * will have a single field whose name is the source's name. This
  268. * will be used by the default implementation of addToJsonArray()
  269. * to add this object to a pipeline being represented in JSON.
  270. *
  271. * @method sourceToJson
  272. * @param {Object} builder JSONObjBuilder: a blank object builder to write to
  273. * @param {Boolean} explain create explain output
  274. **/
  275. proto.sourceToJson = function sourceToJson(builder, explain) {
  276. if (!this._unwindPath){
  277. throw new Error("unwind path does not exist!");
  278. }
  279. builder[this.getSourceName()] = this._unwindPath.getPath(true);
  280. };
  281. /**
  282. * Creates a new UnwindDocumentSource with the input path as the path to unwind
  283. *
  284. * @param {String} JsonElement this thing is *called* Json, but it expects a string
  285. **/
  286. klass.createFromJson = function createFromJson(JsonElement) {
  287. /*
  288. The value of $unwind should just be a field path.
  289. */
  290. if (JsonElement.constructor !== String){
  291. throw new Error("the " + klass.unwindName + " field path must be specified as a string; code 15981");
  292. }
  293. var pathString = Expression.removeFieldPrefix(JsonElement);
  294. var unwind = new UnwindDocumentSource();
  295. unwind.unwindPath(pathString);
  296. return unwind;
  297. };
  298. /**
  299. * Reset the document source so that it is ready for a new stream of data.
  300. * Note that this is a deviation from the mongo implementation.
  301. *
  302. * @method reset
  303. **/
  304. proto.reset = function reset(){
  305. this._unwinder = null;
  306. };
  307. return klass;
  308. })();