UnwindDocumentSource.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. "use strict";
  2. var async = require("async"),
  3. DocumentSource = require("./DocumentSource"),
  4. Expression = require("../expressions/Expression"),
  5. FieldPath = require("../FieldPath"),
  6. Document = require("../Document");
  /**
   * Document source for the `$unwind` pipeline stage.
   *
   * A new instance has neither an unwind path nor an unwinder; both fields
   * start out as null.
   *
   * @class UnwindDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param [ctx] {ExpressionContext} handed through to the DocumentSource constructor
   * @throws {Error} when more than one argument is supplied
   **/
  var base = DocumentSource;
  var UnwindDocumentSource = function UnwindDocumentSource(ctx) {
    if (arguments.length > 1) {
      throw new Error("Up to one argument expected.");
    }
    base.call(this, ctx);
    // Configuration state.
    this._unwindPath = null;
    // Iteration state.
    this._unwinder = null;
  };
  module.exports = UnwindDocumentSource;
  var klass = UnwindDocumentSource;
  // Inherit from DocumentSource while keeping `constructor` pointing at this class.
  klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  var proto = klass.prototype;
  // Name under which this stage appears in a pipeline specification.
  klass.unwindName = "$unwind";
  klass.Unwinder = (function() {
    /**
     * Helper that turns one document into a series of documents, one per
     * element of the array stored at the unwind path. UnwindDocumentSource
     * holds an instance of it; it is not a base class of UnwindDocumentSource.
     *
     * @class Unwinder
     * @constructor
     * @param unwindPath field path specification, passed to the FieldPath constructor
     */
    var klass = function Unwinder(unwindPath) {
      this._unwindPath = new FieldPath(unwindPath);
      // All three stay undefined until resetDocument() is called.
      this._inputArray = undefined;
      this._document = undefined;
      this._index = undefined;
    }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});

    /**
     * Load a new document and restart iteration at its first array element.
     *
     * A missing (undefined) or null value at the unwind path, or an empty
     * array, produces no output. Every other non-array value is rejected,
     * including falsy ones such as 0, false and "".
     *
     * @method resetDocument
     * @param document {Object} the document to unwind
     * @throws {Error} when `document` is falsy
     * @throws {Error} when the value at the unwind path is neither nullish nor an array (code 15978)
     */
    proto.resetDocument = function resetDocument(document) {
      if (!document) throw new Error("Document is required!");

      // Reset before validating: if the document is rejected below, getNext()
      // reports exhaustion instead of replaying the previous document's array.
      this._inputArray = [];
      this._document = document;
      this._index = 0;

      var pathValue = Document.getNestedField(this._document, this._unwindPath);
      if (pathValue === undefined || pathValue === null) {
        return;
      }
      if (!Array.isArray(pathValue)) {
        throw new Error(UnwindDocumentSource.unwindName + ": value at end of field path must be an array; code 15978");
      }
      this._inputArray = pathValue;
    };

    /**
     * Produce the next unwound document.
     *
     * @method getNext
     * @returns {Object|null} a copy of the current document (made with
     *   Document.cloneDeep) whose unwind path holds the next array element, or
     *   null when no document has been loaded or the array is used up
     **/
    proto.getNext = function getNext() {
      // `>=` rather than `===` so a shrunken input array cannot run past its end.
      if (this._inputArray === undefined || this._index >= this._inputArray.length) {
        return null;
      }
      // Work on a fresh copy so documents handed out earlier are not overwritten.
      this._document = Document.cloneDeep(this._document);
      Document.setNestedField(this._document, this._unwindPath, this._inputArray[this._index++]);
      return this._document;
    };

    return klass;
  })();
  /**
   * Report the name of this pipeline stage.
   *
   * @method getSourceName
   * @returns {string} the class-level `unwindName` value
   */
  proto.getSourceName = function getSourceName() {
    var sourceName = klass.unwindName;
    return sourceName;
  };
  /**
   * Deliver the next unwound document through `callback`.
   *
   * The unwinder is asked first. While it has nothing to offer, documents are
   * pulled from `this.source` one at a time and loaded into the unwinder until
   * either an unwound document is available or the source yields null.
   *
   * @method getNext
   * @param callback {Function} called as `callback(err)` on failure, or as
   *   `callback(null, doc)` where `doc` is null once the source is exhausted
   * @returns {*} the value of the local result at the moment `async.until`
   *   returns; callers should rely on `callback` rather than this value
   * @throws {Error} when `callback` is missing
   */
  proto.getNext = function getNext(callback) {
    if (!callback) {
      throw new Error(this.getSourceName() + " #getNext() requires callback.");
    }

    // An interrupt is signalled by checkForInterrupt() returning exactly false.
    var ctx = this.expCtx;
    if (ctx.checkForInterrupt && ctx.checkForInterrupt() === false) {
      return callback(new Error("Interrupted"));
    }

    var self = this,
      sourceDone = false,
      result;

    try {
      result = self._unwinder.getNext();
    } catch (ex) {
      return callback(ex);
    }

    // Loop condition: stop once a document is available or the source ran dry.
    var isFinished = function isFinished() {
      return result !== null || sourceDone;
    };

    // Loop body: fetch one upstream document and feed it to the unwinder.
    var pullFromSource = function pullFromSource(cb) {
      self.source.getNext(function (err, doc) {
        if (err) {
          return cb(err);
        }
        try {
          if (doc !== null) {
            self._unwinder.resetDocument(doc);
            result = self._unwinder.getNext();
          } else {
            sourceDone = true;
          }
        } catch (ex) {
          return cb(ex);
        }
        // Kept outside the try block so errors raised by cb itself are not caught here.
        return cb();
      });
    };

    var finish = function finish(err) {
      return err ? callback(err) : callback(null, result);
    };

    async.until(isFinished, pullFromSource, finish);
    return result;
  };
  /**
   * Build the object form of this stage: a single key, the source name,
   * mapped to the result of `getPath(true)` on the unwind path.
   *
   * @method serialize
   * @param explain not used by this implementation
   * @returns {Object} e.g. `{"$unwind": <path>}`
   * @throws {Error} when no unwind path has been set
   */
  proto.serialize = function serialize(explain) {
    var path = this._unwindPath;
    if (!path) {
      throw new Error("unwind path does not exist!");
    }
    var serialized = {};
    // NOTE(review): the `true` flag presumably asks FieldPath to include the
    // field prefix; confirm against FieldPath#getPath.
    serialized[this.getSourceName()] = path.getPath(true);
    return serialized;
  };
  /**
   * Record the field this stage reads by setting `deps.fields[<path>]` to 1,
   * where `<path>` is the result of `getPath(false)` on the unwind path.
   *
   * @method getDependencies
   * @param deps {Object} dependency tracker; its `fields` map is modified in place
   * @returns {DocumentSource.GetDepsReturn.SEE_NEXT|*} always SEE_NEXT
   * @throws {Error} when no unwind path has been set
   */
  proto.getDependencies = function getDependencies(deps) {
    var path = this._unwindPath;
    if (!path) {
      throw new Error("unwind path does not exist!");
    }
    var fields = deps.fields;
    fields[path.getPath(false)] = 1;
    return DocumentSource.GetDepsReturn.SEE_NEXT;
  };
  /**
   * Set the path to unwind. This may be done only once per instance.
   *
   * Stores a FieldPath built from `fieldPath` and creates the Unwinder that
   * performs the iteration.
   *
   * @method unwindPath
   * @param fieldPath field path specification, passed to both the FieldPath and Unwinder constructors
   * @throws {Error} when a path has already been set (code 15979)
   */
  proto.unwindPath = function unwindPath(fieldPath) {
    var alreadyConfigured = Boolean(this._unwindPath);
    if (alreadyConfigured) {
      throw new Error(this.getSourceName() + " can't unwind more than one path; code 15979");
    }

    // Remember the path, then build the helper that walks the array.
    this._unwindPath = new FieldPath(fieldPath);
    this._unwinder = new klass.Unwinder(fieldPath);
  };
  /**
   * Create a new UnwindDocumentSource that unwinds the given path.
   *
   * Despite the "Json" in the name, the specification must be a string (a
   * primitive or a String object). Every other value, including null and
   * undefined, is rejected with error 15981.
   *
   * @method createFromJson
   * @param jsonElement {String} the field path; it is run through
   *   Expression.removeFieldPrefix before being used as the unwind path
   * @param [ctx] {ExpressionContext} passed to the UnwindDocumentSource constructor
   * @returns {UnwindDocumentSource} the configured document source
   * @throws {Error} when `jsonElement` is not a string (code 15981)
   **/
  klass.createFromJson = function createFromJson(jsonElement, ctx) {
    // Check the type directly rather than reading `.constructor`, which throws
    // a TypeError on null/undefined and can be spoofed by a plain object.
    var isString = typeof jsonElement === "string" || jsonElement instanceof String;
    if (!isString) {
      throw new Error("the " + klass.unwindName + " field path must be specified as a string; code 15981");
    }
    var pathString = Expression.removeFieldPrefix(jsonElement),
      unwind = new UnwindDocumentSource(ctx);
    unwind.unwindPath(pathString);
    return unwind;
  };