UnwindDocumentSource.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. "use strict";
  2. var async = require('async'),
  3. DocumentSource = require('./DocumentSource'),
  4. Expression = require('../expressions/Expression'),
  5. FieldPath = require('../FieldPath'),
  6. Value = require('../Value'),
  7. Document = require('../Document');
  8. /**
  9. * A document source unwinder
  10. * @class UnwindDocumentSource
  11. * @namespace mungedb-aggregate.pipeline.documentSources
  12. * @module mungedb-aggregate
  13. * @constructor
  14. * @param [ctx] {ExpressionContext}
  15. **/
  16. var UnwindDocumentSource = module.exports = function UnwindDocumentSource(ctx){
  17. if (arguments.length > 1) {
  18. throw new Error('Up to one argument expected.');
  19. }
  20. base.call(this, ctx);
  21. this._unwindPath = null; // Configuration state.
  22. this._unwinder = null; // Iteration state.
  23. }, klass = UnwindDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  24. klass.unwindName = '$unwind';
  25. klass.Unwinder = (function() {
  26. /**
  27. * Construct a new Unwinder instance. Used as a parent class for UnwindDocumentSource.
  28. *
  29. * @param unwindPath
  30. * @constructor
  31. */
  32. var klass = function Unwinder(unwindPath) {
  33. this._unwindPath = new FieldPath(unwindPath);
  34. this._inputArray = undefined;
  35. this._document = undefined;
  36. this._index = undefined;
  37. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  38. proto.resetDocument = function resetDocument(document) {
  39. if (!document) throw new Error('Document is required!');
  40. this._inputArray = [];
  41. this._document = document;
  42. this._index = 0;
  43. var pathValue = Document.getNestedField(this._document, this._unwindPath);
  44. if (!pathValue || pathValue.length === 0) {
  45. return;
  46. }
  47. if (!(pathValue instanceof Array)) {
  48. throw new Error(UnwindDocumentSource.unwindName + ': value at end of field path must be an array; code 15978');
  49. }
  50. this._inputArray = pathValue;
  51. };
  52. /**
  53. * getNext
  54. *
  55. * This is just wrapping the old functions because they are somewhat different
  56. * than the original mongo implementation, but should get updated to follow the current API.
  57. **/
  58. proto.getNext = function getNext() {
  59. if (this._inputArray === undefined || this._index === this._inputArray.length) {
  60. return null;
  61. }
  62. this._document = Document.cloneDeep(this._document);
  63. Document.setNestedField(this._document, this._unwindPath, this._inputArray[this._index++]);
  64. return this._document;
  65. };
  66. return klass;
  67. })();
  68. /**
  69. * Get the document source name.
  70. *
  71. * @method getSourceName
  72. * @returns {string}
  73. */
  74. proto.getSourceName = function getSourceName() {
  75. return klass.unwindName;
  76. };
  77. /**
  78. * Get the next source.
  79. *
  80. * @method getNext
  81. * @param callback
  82. * @returns {*}
  83. */
  84. proto.getNext = function getNext(callback) {
  85. if (!callback) {
  86. throw new Error(this.getSourceName() + ' #getNext() requires callback.');
  87. }
  88. if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false) {
  89. return callback(new Error('Interrupted'));
  90. }
  91. var self = this,
  92. out,
  93. exhausted = false;
  94. try {
  95. out = this._unwinder.getNext();
  96. } catch (ex) {
  97. return callback(ex);
  98. }
  99. async.until(
  100. function () {
  101. if (out !== null || exhausted) {
  102. return true;
  103. }
  104. return false;
  105. },
  106. function (cb) {
  107. self.source.getNext(function (err, doc) {
  108. if (err) {
  109. return cb(err);
  110. }
  111. try {
  112. if (doc === null) {
  113. exhausted = true;
  114. } else {
  115. self._unwinder.resetDocument(doc);
  116. out = self._unwinder.getNext();
  117. }
  118. } catch (ex) {
  119. return cb(ex);
  120. }
  121. return cb();
  122. });
  123. },
  124. function(err) {
  125. if (err) {
  126. return callback(err);
  127. }
  128. return callback(null, out);
  129. }
  130. );
  131. return out;
  132. };
  133. /**
  134. * Serialize the data.
  135. *
  136. * @method serialize
  137. * @param explain
  138. * @returns {{}}
  139. */
  140. proto.serialize = function serialize(explain) {
  141. if (!this._unwindPath) {
  142. throw new Error('unwind path does not exist!');
  143. }
  144. var doc = {};
  145. doc[this.getSourceName()] = this._unwindPath.getPath(true);
  146. return doc;
  147. };
  148. /**
  149. * Get the fields this operation needs to do its job.
  150. *
  151. * @method getDependencies
  152. * @param deps
  153. * @returns {DocumentSource.GetDepsReturn.SEE_NEXT|*}
  154. */
  155. proto.getDependencies = function getDependencies(deps) {
  156. if (!this._unwindPath) {
  157. throw new Error('unwind path does not exist!');
  158. }
  159. deps.fields[this._unwindPath.getPath(false)] = 1;
  160. return DocumentSource.GetDepsReturn.SEE_NEXT;
  161. };
  162. /**
  163. * Unwind path.
  164. *
  165. * @method unwindPath
  166. * @param fieldPath
  167. */
  168. proto.unwindPath = function unwindPath(fieldPath) {
  169. if (this._unwindPath) {
  170. throw new Error(this.getSourceName() + ' can\'t unwind more than one path; code 15979');
  171. }
  172. // Record the unwind path.
  173. this._unwindPath = new FieldPath(fieldPath);
  174. this._unwinder = new klass.Unwinder(fieldPath);
  175. };
  176. /**
  177. * Creates a new UnwindDocumentSource with the input path as the path to unwind
  178. * @method createFromJson
  179. * @param {String} JsonElement this thing is *called* Json, but it expects a string
  180. **/
  181. klass.createFromJson = function createFromJson(jsonElement, ctx) {
  182. if (jsonElement.constructor !== String) {
  183. throw new Error('the ' + klass.unwindName + ' field path must be specified as a string; code 15981');
  184. }
  185. var pathString = Expression.removeFieldPrefix(jsonElement),
  186. unwind = new UnwindDocumentSource(ctx);
  187. unwind.unwindPath(pathString);
  188. return unwind;
  189. };