SkipDocumentSource.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. "use strict";
  2. var async = require('async'),
  3. DocumentSource = require('./DocumentSource');
  /**
   * Pipeline stage that skips documents coming from its source.
   *
   * A new instance starts with `skip` and `count` at zero and with
   * `needToSkip` set, i.e. the skipping pass has not run yet.
   *
   * @class SkipDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param [ctx] {ExpressionContext} forwarded unchanged to the base constructor
   **/
  function SkipDocumentSource(ctx) {
    // At most one argument (the expression context) is accepted.
    if (arguments.length >= 2) {
      throw new Error('Up to one argument expected.');
    }
    base.call(this, ctx);
    this.skip = 0;           // number of documents to discard
    this.count = 0;          // number of documents fetched while skipping
    this.needToSkip = true;  // cleared once the skipping pass has started
  }
  // Export first, then resolve the base class (same order as the chained form).
  module.exports = SkipDocumentSource;
  var klass = SkipDocumentSource;
  var base = require('./DocumentSource');
  // Inherit from the base prototype; `constructor` is defined through a
  // descriptor so it stays non-enumerable and non-writable.
  var proto = Object.create(base.prototype);
  Object.defineProperty(proto, 'constructor', {value: klass});
  klass.prototype = proto;
  // Operator name reported by getSourceName() and used as the serialize() key.
  klass.skipName = '$skip';
  /**
   * Report the operator name of this stage.
   *
   * @method getSourceName
   * @returns {string} the value stored in `klass.skipName`
   */
  proto.getSourceName = function getSourceName() {
    var sourceName = klass.skipName;
    return sourceName;
  };
  31. /**
  32. * Coalesce skips together.
  33. *
  34. * @param nextSource
  35. * @returns {boolean}
  36. */
  37. proto.coalesce = function coalesce(nextSource) {
  38. var nextSkip = nextSource.constructor === SkipDocumentSource ? nextSource : null;
  39. // If it's not another $skip, we can't coalesce.
  40. if (!nextSkip) {
  41. return false;
  42. }
  43. // We need to skip over the sum of the two consecutive $skips.
  44. this.skip += nextSkip.skip;
  45. return true;
  46. };
  /**
   * Deliver the next document from the upstream source, discarding the first
   * `this.skip` documents the first time it is called.
   *
   * The skipping pass runs only once (guarded by `needToSkip`). Documents are
   * discarded one at a time and every following fetch is issued from inside
   * the previous fetch's callback, so:
   *   - asynchronous sources are fully skipped before a document is delivered;
   *   - `callback` is invoked exactly once per call, also on errors;
   *   - a skip of 0 discards nothing;
   *   - reaching `DocumentSource.EOF` while skipping ends the pass and reports
   *     EOF instead of looping forever.
   *
   * NOTE: with a source that calls back synchronously the skipping pass
   * recurses once per discarded document.
   *
   * @method getNext
   * @param callback {Function} invoked as `callback(err, doc)`; `doc` is
   *   `DocumentSource.EOF` when the upstream source is exhausted
   * @returns {*} whatever the first upstream `getNext()` call (or the
   *   `callback` on interruption) returns
   * @throws {Error} when no callback is supplied
   */
  proto.getNext = function getNext(callback) {
    if (!callback) {
      throw new Error(this.getSourceName() + ' #getNext() requires callback.');
    }
    if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false) {
      return callback(new Error('Interrupted'));
    }

    var self = this;

    // Skipping already done (or started): just pass the request through.
    if (!self.needToSkip) {
      return self.source.getNext(callback);
    }
    self.needToSkip = false;

    // Discard one document per step; once enough have been discarded the next
    // fetch is handed straight to the caller's callback.
    function skipNext() {
      if (self.count >= self.skip) {
        return self.source.getNext(callback);
      }
      return self.source.getNext(function (err, doc) {
        if (err) { return callback(err); }
        // Source ran dry before the skip was satisfied: report EOF.
        if (doc === DocumentSource.EOF) { return callback(null, doc); }
        ++self.count;
        return skipNext();
      });
    }

    return skipNext();
  };
  /**
   * Build the plain-object representation of this stage.
   *
   * The result has a single key, the source name, whose value is the current
   * skip amount.
   *
   * @method serialize
   * @param explain not used by this stage
   * @returns {Object} an object of the form `{<sourceName>: <skip>}`
   */
  proto.serialize = function serialize(explain) {
    var operatorName = this.getSourceName();
    var serialized = {};
    serialized[operatorName] = this.skip;
    return serialized;
  };
  /**
   * Read the number of documents this stage is configured to skip.
   *
   * @method getSkip
   * @returns {number} the current value of `this.skip`
   */
  proto.getSkip = function getSkip() {
    var currentSkip = this.skip;
    return currentSkip;
  };
  /**
   * Replace the number of documents this stage skips.
   *
   * The value is stored as given; no validation is performed here.
   *
   * @method setSkip
   * @param newSkip the new value for `this.skip`
   */
  proto.setSkip = function setSkip(newSkip) {
    var stage = this;
    stage.skip = newSkip;
  };
  /**
   * Factory: build a SkipDocumentSource with its default state.
   *
   * @method create
   * @static
   * @param expCtx passed to the SkipDocumentSource constructor
   * @returns {SkipDocumentSource} the newly constructed stage
   */
  klass.create = function create(expCtx) {
    var source = new SkipDocumentSource(expCtx);
    return source;
  };
  /**
   * Factory: build a SkipDocumentSource whose skip amount is `jsonElement`.
   *
   * Despite the parameter's name, the value must already be a number.
   *
   * @method createFromJson
   * @static
   * @param jsonElement {Number} the number of documents to skip
   * @param ctx passed to the SkipDocumentSource constructor
   * @returns {SkipDocumentSource} the configured stage
   * @throws {Error} code 15972 when `jsonElement` is not of type number
   * @throws {Error} code 15956 when `jsonElement` is negative or NaN
   **/
  klass.createFromJson = function createFromJson(jsonElement, ctx) {
    var isNumeric = typeof jsonElement === 'number';
    if (!isNumeric) {
      throw new Error('code 15972; the value to skip must be a number');
    }
    var source = new SkipDocumentSource(ctx);
    source.skip = jsonElement;
    // For a number, "not >= 0" is true exactly for negative values and NaN.
    if (!(source.skip >= 0)) {
      throw new Error('code 15956; the number to skip cannot be negative');
    }
    return source;
  };
  // SplittableDocumentSource implementation: flag on the constructor marking
  // this stage as splittable.
  klass.isSplittableDocumentSource = true;
  /**
   * Report this stage's field dependencies.
   *
   * Nothing is added to `deps`; the stage always answers with the `SEE_NEXT`
   * member of `DocumentSource.GetDepsReturn`.
   *
   * @method getDependencies
   * @param deps not used by this stage
   * @returns {number} `DocumentSource.GetDepsReturn.SEE_NEXT`
   */
  proto.getDependencies = function getDependencies(deps) {
    var depsReturn = DocumentSource.GetDepsReturn;
    return depsReturn.SEE_NEXT;
  };
  /**
   * Shard-side half of this stage: there is none, so this is always null.
   *
   * @method getShardSource
   * @returns {null} always null
   */
  proto.getShardSource = function getShardSource() {
    var shardSource = null;
    return shardSource;
  };
  /**
   * Router-side half of this stage: the stage itself.
   *
   * @method getRouterSource
   * @returns {SkipDocumentSource} this same instance
   */
  proto.getRouterSource = function getRouterSource() {
    var routerSource = this;
    return routerSource;
  };