UnwindDocumentSource.js 8.7 KB


  1. "use strict";
  2. var async = require("async");
  /**
   * Document source for the "$unwind" pipeline stage.
   *
   * Holds the configured unwind path and the Unwinder helper used while iterating;
   * both start out as null until unwindPath() is called.
   *
   * @class UnwindDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param [ctx] {ExpressionContext} forwarded to the base DocumentSource constructor
   * @throws {Error} if more than one argument is supplied
   **/
  var UnwindDocumentSource = module.exports = function UnwindDocumentSource(ctx){
    if (arguments.length > 1) {
      throw new Error("up to one arg expected");
    }
    base.call(this, ctx);

    // Configuration state: the field path to unwind.
    this._unwindPath = null;

    // Iteration state: the helper that walks the array of the current document.
    this._unwinder = null;
  };
  // NOTE: module.exports is assigned before the base class is required, as in the original ordering.
  var klass = UnwindDocumentSource;
  var base = require('./DocumentSource');
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  19. var DocumentSource = base,
  20. FieldPath = require('../FieldPath'),
  21. Document = require('../Document'),
  22. Expression = require('../expressions/Expression');
  23. klass.Unwinder = (function(){
  /**
   * Helper class that unwinds an array located at a field path within a series of documents.
   *
   * @param {FieldPath} unwindPath the field path to the array to unwind; it is queried through
   *   getPathLength() and getFieldName(i) when a document is inspected.
   **/
  var klass = function Unwinder(unwindPath){
    // Field path of the array that gets unwound.
    this._unwindPath = unwindPath;

    // Source document currently being unwound (none yet).
    this._document = null;

    // Field names visited while walking the unwind path in the current document.
    this._unwindPathFieldIndexes = [];

    // Remaining array values still to be emitted for the current document.
    this._unwindArrayIterator = null;

    // NOTE: _unwindArrayIteratorCurrent is intentionally NOT created here; whether the
    // instance owns that property is how eof() and getCurrent() detect a current value.
  };
  var base = Object;
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  /**
   * Reset the unwinder to unwind a new document.
   *
   * Clears all per-document state, looks up the value at the unwind path and, when that
   * value is a non-empty array, positions the unwinder on its first element. When the path
   * is missing, null/undefined, or an empty array, the unwinder is left with no current
   * value (eof() is true).
   *
   * @param {Object} document the document to unwind
   * @throws {Error} if no document is given
   * @throws {Error} (code 15978) if the value at the unwind path exists but is not an array;
   *   this includes falsy scalars such as 0, false and "" as well as array-like objects.
   **/
  proto.resetDocument = function resetDocument(document){
    if (!document) throw new Error("document is required!");

    // Reset document specific attributes.
    this._document = document;
    this._unwindPathFieldIndexes.length = 0;
    this._unwindArrayIterator = null;
    // The property is deleted (not set to undefined) because eof()/getCurrent() test for
    // its presence with hasOwnProperty.
    delete this._unwindArrayIteratorCurrent;

    var pathValue = this.extractUnwindValue(); // sets _unwindPathFieldIndexes

    // Only a missing or null value means "nothing to unwind"; any other non-array is an error.
    if (pathValue === null || pathValue === undefined) return;
    if (!Array.isArray(pathValue)) throw new Error(UnwindDocumentSource.unwindName + ": value at end of field path must be an array; code 15978");

    // An empty array produces no output documents.
    if (pathValue.length === 0) return;

    // Work on a shallow copy so the source document's array is never mutated, then take
    // the first element as the current value.
    this._unwindArrayIterator = pathValue.slice(0);
    this._unwindArrayIteratorCurrent = this._unwindArrayIterator.shift();
  };
  /**
   * Return the next unwound document for the document given to resetDocument(), or
   * DocumentSource.EOF once there is nothing (more) to unwind.
   *
   * This simply combines eof(), getCurrent() and advance(): the current unwound document
   * is captured first and the unwinder is then moved forward.
   *
   * @returns {Object} the unwound document, or DocumentSource.EOF
   **/
  proto.getNext = function getNext() {
    var result;
    if (this.eof()) {
      result = DocumentSource.EOF;
    } else {
      result = this.getCurrent();
      this.advance();
    }
    return result;
  };
  /**
   * Tell whether the unwinder has run out of values.
   *
   * The unwinder has a current value exactly when the instance owns the
   * "_unwindArrayIteratorCurrent" property (the value itself may be anything).
   *
   * @returns {Boolean} true if done unwinding the last document passed to resetDocument().
   **/
  proto.eof = function eof(){
    var hasCurrent = this.hasOwnProperty("_unwindArrayIteratorCurrent");
    return hasCurrent ? false : true;
  };
  /**
   * Try to advance to the next value unwound from the document passed to resetDocument().
   *
   * When resetDocument() has not been called, the document had nothing to unwind, or all
   * values have been consumed, the current value is removed so that eof() reports true.
   * Otherwise the next pending value becomes the current one.
   *
   * Does not return a value.
   **/
  proto.advance = function advance(){
    var pending = this._unwindArrayIterator;
    var hasMore = Boolean(pending) && pending.length > 0;
    if (hasMore) {
      // Take the next value off the front of the pending list.
      this._unwindArrayIteratorCurrent = pending.shift();
      return;
    }
    // Nothing left (or nothing to begin with): drop the current value entirely.
    delete this._unwindArrayIteratorCurrent;
  };
  /**
   * Build the current unwound document: a copy of the document provided to resetDocument()
   * in which the array at the unwind path is replaced by the current array value.
   *
   * Every object along the unwind path is cloned (a partial deep clone) so that the
   * substituted value is not shared with the original document or with other documents
   * produced by this operator.
   *
   * @returns {Object} the unwound document, or null if resetDocument() has not been called
   *   or the values to unwind have been exhausted.
   * @throws {Error} if no field path components were recorded for the current document
   **/
  proto.getCurrent = function getCurrent(){
    if (!this.hasOwnProperty("_unwindArrayIteratorCurrent")) return null;

    var result = Document.clone(this._document),
      pathKeys = this._unwindPathFieldIndexes,
      lastDepth = pathKeys.length - 1;
    if (lastDepth < 0) throw new Error("unwindFieldPathIndexes are empty");

    // Walk down to the parent of the final path component, cloning each nested object
    // on the way so the copy owns everything it is about to modify.
    var cursor = result;
    for (var depth = 0; depth < lastDepth; depth++) {
      var key = pathKeys[depth];
      var copied = Document.clone(cursor[key]);
      cursor[key] = copied;
      cursor = copied;
    }

    // In the innermost object, substitute the current unwound value for the array.
    cursor[pathKeys[lastDepth]] = this._unwindArrayIteratorCurrent;
    return result;
  };
  /**
   * Get the value at the unwind path of the current document.
   *
   * The _unwindPathFieldIndexes attribute is filled in as the field path is traversed, so
   * that getCurrent() can later replace the objects along the path.
   *
   * @returns {Object} the value found at the end of the unwind path, or null when a field
   *   along the path is missing or an intermediate value is not an object (including null).
   **/
  proto.extractUnwindValue = function extractUnwindValue() {
    var hasOwn = Object.prototype.hasOwnProperty;
    var current = this._document;
    var pathValue;
    var pathLength = this._unwindPath.getPathLength();
    for (var i = 0; i < pathLength; ++i) {
      var idx = this._unwindPath.getFieldName(i);
      // Borrow hasOwnProperty so documents without Object.prototype still work.
      if (!hasOwn.call(current, idx)) return null; // The target field is missing.
      // Record the field names down the field path in order to quickly replace them
      // as the documents along the field path are cloned.
      this._unwindPathFieldIndexes.push(idx);
      pathValue = current[idx];
      if (i < pathLength - 1) {
        // typeof null is 'object', so null must be rejected explicitly; otherwise the
        // next iteration would dereference it.
        if (pathValue === null || typeof pathValue !== 'object') return null; // The next field in the path cannot exist (inside a non object).
        current = pathValue; // Move down the object tree.
      }
    }
    return pathValue;
  };
  155. return klass;
  156. })();
  /**
   * Specify the field to unwind. May only be called once per instance.
   *
   * @param {String} fieldPath passed to the FieldPath constructor
   * @throws {Error} (code 15979) if an unwind path has already been set
   **/
  proto.unwindPath = function unwindPath(fieldPath){
    // Can't set more than one unwind path.
    if (this._unwindPath) {
      throw new Error(this.getSourceName() + " can't unwind more than one path; code 15979");
    }

    // Record the unwind path and create the helper that will walk it.
    var parsedPath = new FieldPath(fieldPath);
    this._unwindPath = parsedPath;
    this._unwinder = new klass.Unwinder(parsedPath);
  };
  // Name of this pipeline stage; also used as a prefix in error messages.
  klass.unwindName = "$unwind";

  /**
   * @returns {String} the stage name stored in klass.unwindName
   **/
  proto.getSourceName = function getSourceName(){
    var sourceName = klass.unwindName;
    return sourceName;
  };
  /**
   * Report the fields this operation needs to do its job.
   *
   * The unwind path is recorded as a key on deps (with value 1).
   *
   * @method getDependencies
   * @param {Object} deps object used as a set of field path strings; modified in place
   * @returns DocumentSource.GetDepsReturn.SEE_NEXT
   * @throws {Error} if no unwind path has been configured
   **/
  proto.getDependencies = function getDependencies(deps) {
    var path = this._unwindPath;
    if (!path) {
      throw new Error("unwind path does not exist!");
    }
    var dependencyKey = path.getPath(false);
    deps[dependencyKey] = 1;
    return DocumentSource.GetDepsReturn.SEE_NEXT;
  };
  /**
   * Produce the next unwound document.
   *
   * First asks the unwinder for the next value of the document it is already working on.
   * While that yields EOF, further documents are pulled from this.source and handed to the
   * unwinder, until either an unwound document is produced or the source itself reports EOF.
   *
   * @param {Function} callback called as callback(err) on failure or callback(null, out),
   *   where out is the unwound document or DocumentSource.EOF
   * @returns the value of out at the time the function returns (for sync mode)
   * @throws {Error} if no callback is given
   **/
  proto.getNext = function getNext(callback) {
    if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');

    var self = this,
      out = this._unwinder.getNext(),
      exhausted = false;

    // Done when we hold a real document, or when the EOF we hold comes from the source
    // itself rather than from an unwinder that merely ran dry.
    function isFinished() {
      return out !== DocumentSource.EOF || exhausted;
    }

    // Pull one more document from the source and feed it to the unwinder.
    function pullFromSource(next) {
      self.source.getNext(function(err, doc) {
        if (err) return next(err);
        out = doc;
        if (doc === DocumentSource.EOF) {
          // Our source is out of documents, we're done.
          exhausted = true;
        } else {
          self._unwinder.resetDocument(doc);
          out = self._unwinder.getNext();
        }
        return next();
      });
    }

    function deliver(err) {
      if (err) return callback(err);
      return callback(null, out);
    }

    async.until(isFinished, pullFromSource, deliver);
    return out; //For sync mode
  };
  /**
   * Serialize this stage as a single-key object: the source name mapped to the result of
   * getPath(true) on the unwind path.
   *
   * @param explain not used by this implementation
   * @returns {Object} the serialized stage
   * @throws {Error} if no unwind path has been configured
   **/
  proto.serialize = function serialize(explain) {
    var path = this._unwindPath;
    if (!path) {
      throw new Error("unwind path does not exist!");
    }
    var serialized = {};
    var stageName = this.getSourceName();
    serialized[stageName] = path.getPath(true);
    return serialized;
  };
  /**
   * Creates a new UnwindDocumentSource with the input path as the path to unwind.
   *
   * @param {String} jsonElement the field path to unwind; despite the name this must be a
   *   string (primitive or String object), not a JSON object
   * @param [ctx] passed through to the UnwindDocumentSource constructor
   * @returns {UnwindDocumentSource} the configured source
   * @throws {Error} (code 15981) if jsonElement is not a string, including null/undefined
   **/
  klass.createFromJson = function createFromJson(jsonElement, ctx) {
    // The value of $unwind should just be a field path. Use typeof/instanceof rather than
    // reading .constructor so that null and undefined get the coded error, not a TypeError.
    var isString = typeof jsonElement === "string" || jsonElement instanceof String;
    if (!isString) throw new Error("the " + klass.unwindName + " field path must be specified as a string; code 15981");

    var pathString = Expression.removeFieldPrefix(jsonElement);
    var unwind = new UnwindDocumentSource(ctx);
    unwind.unwindPath(pathString);
    return unwind;
  };
  233. };