SkipDocumentSource.js 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. "use strict";
  2. var async = require('async'),
  3. DocumentSource = require('./DocumentSource');
  4. /**
  5. * A document source skipper
  6. * @class SkipDocumentSource
  7. * @namespace mungedb-aggregate.pipeline.documentSources
  8. * @module mungedb-aggregate
  9. * @constructor
  10. * @param [ctx] {ExpressionContext}
  11. **/
  12. var SkipDocumentSource = module.exports = function SkipDocumentSource(ctx){
  13. if (arguments.length > 1) throw new Error("up to one arg expected");
  14. base.call(this, ctx);
  15. this.skip = 0;
  16. this.count = 0;
  17. }, klass = SkipDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  18. klass.skipName = "$skip";
  19. proto.getSourceName = function getSourceName(){
  20. return klass.skipName;
  21. };
  22. /**
  23. * Coalesce skips together
  24. * @param {Object} nextSource the next source
  25. * @return {bool} return whether we can coalese together
  26. **/
  27. proto.coalesce = function coalesce(nextSource) {
  28. var nextSkip = nextSource.constructor === SkipDocumentSource?nextSource:null;
  29. // if it's not another $skip, we can't coalesce
  30. if (!nextSkip) return false;
  31. // we need to skip over the sum of the two consecutive $skips
  32. this.skip += nextSkip.skip;
  33. return true;
  34. };
  35. proto.getNext = function getNext(callback) {
  36. if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
  37. var self = this,
  38. next;
  39. if (this.count < this.skip) {
  40. async.doWhilst(
  41. function(cb) {
  42. self.source.getNext(function(err, val) {
  43. if(err) return cb(err);
  44. self.count++;
  45. next = val;
  46. return cb();
  47. });
  48. },
  49. function() {
  50. return self.count < self.skip || next === DocumentSource.EOF;
  51. },
  52. function(err) {
  53. if (err)
  54. return callback(err);
  55. }
  56. );
  57. }
  58. return this.source.getNext(callback);
  59. };
  60. proto.serialize = function serialize(explain) {
  61. var out = {};
  62. out[this.getSourceName()] = this.skip;
  63. return out;
  64. };
  65. proto.getSkip = function getSkip() {
  66. return this.skip;
  67. };
  68. /**
  69. * Creates a new SkipDocumentSource with the input number as the skip
  70. *
  71. * @param {Number} JsonElement this thing is *called* Json, but it expects a number
  72. **/
  73. klass.createFromJson = function createFromJson(jsonElement, ctx) {
  74. if (typeof jsonElement !== "number") throw new Error("code 15972; the value to skip must be a number");
  75. var nextSkip = new SkipDocumentSource(ctx);
  76. nextSkip.skip = jsonElement;
  77. if (nextSkip.skip < 0 || isNaN(nextSkip.skip)) throw new Error("code 15956; the number to skip cannot be negative");
  78. return nextSkip;
  79. };