|
|
@@ -4,64 +4,95 @@ var async = require('async'),
|
|
|
DocumentSource = require('./DocumentSource');
|
|
|
|
|
|
/**
|
|
|
- * A document source skipper
|
|
|
+ * A document source skipper.
|
|
|
+ *
|
|
|
* @class SkipDocumentSource
|
|
|
* @namespace mungedb-aggregate.pipeline.documentSources
|
|
|
* @module mungedb-aggregate
|
|
|
* @constructor
|
|
|
* @param [ctx] {ExpressionContext}
|
|
|
**/
|
|
|
-var SkipDocumentSource = module.exports = function SkipDocumentSource(ctx){
|
|
|
- if (arguments.length > 1) throw new Error("up to one arg expected");
|
|
|
+var SkipDocumentSource = module.exports = function SkipDocumentSource(ctx) {
|
|
|
+ if (arguments.length > 1) {
|
|
|
+ throw new Error('Up to one argument expected.');
|
|
|
+ }
|
|
|
+
|
|
|
base.call(this, ctx);
|
|
|
+
|
|
|
this.skip = 0;
|
|
|
this.count = 0;
|
|
|
-}, klass = SkipDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
|
|
|
|
|
|
-klass.skipName = "$skip";
|
|
|
-proto.getSourceName = function getSourceName(){
|
|
|
+ this.needToSkip = true;
|
|
|
+}, klass = SkipDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
|
|
|
+
|
|
|
+klass.skipName = '$skip';
|
|
|
+
|
|
|
+/**
|
|
|
+ * Return the source name.
|
|
|
+ *
|
|
|
+ * @returns {string}
|
|
|
+ */
|
|
|
+proto.getSourceName = function getSourceName() {
|
|
|
return klass.skipName;
|
|
|
};
|
|
|
|
|
|
/**
|
|
|
- * Coalesce skips together
|
|
|
- * @param {Object} nextSource the next source
|
|
|
- * @return {bool} return whether we can coalese together
|
|
|
- **/
|
|
|
+ * Coalesce skips together.
|
|
|
+ *
|
|
|
+ * @param nextSource
|
|
|
+ * @returns {boolean}
|
|
|
+ */
|
|
|
proto.coalesce = function coalesce(nextSource) {
|
|
|
- var nextSkip = nextSource.constructor === SkipDocumentSource?nextSource:null;
|
|
|
+ var nextSkip = nextSource.constructor === SkipDocumentSource ? nextSource : null;
|
|
|
|
|
|
- // if it's not another $skip, we can't coalesce
|
|
|
- if (!nextSkip) return false;
|
|
|
+ // If it's not another $skip, we can't coalesce.
|
|
|
+ if (!nextSkip) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
|
|
|
- // we need to skip over the sum of the two consecutive $skips
|
|
|
+ // We need to skip over the sum of the two consecutive $skips.
|
|
|
this.skip += nextSkip.skip;
|
|
|
+
|
|
|
return true;
|
|
|
};
|
|
|
|
|
|
+/**
|
|
|
+ * Get next source.
|
|
|
+ *
|
|
|
+ * @param callback
|
|
|
+ * @returns {*}
|
|
|
+ */
|
|
|
proto.getNext = function getNext(callback) {
|
|
|
- if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
|
|
|
+ if (!callback) {
|
|
|
+ throw new Error(this.getSourceName() + ' #getNext() requires callback.');
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false) {
|
|
|
+ return callback(new Error('Interrupted'));
|
|
|
+ }
|
|
|
|
|
|
var self = this,
|
|
|
next;
|
|
|
|
|
|
- if (this.count < this.skip) {
|
|
|
+ if (this.needToSkip) { // May be unnecessary.
|
|
|
+ this.needToSkip = false;
|
|
|
|
|
|
async.doWhilst(
|
|
|
- function(cb) {
|
|
|
- self.source.getNext(function(err, val) {
|
|
|
- if(err) return cb(err);
|
|
|
- self.count++;
|
|
|
+ function (cb) {
|
|
|
+ self.source.getNext(function (err, val) {
|
|
|
+ if (err) { return cb(err); }
|
|
|
+
|
|
|
+ ++self.count;
|
|
|
next = val;
|
|
|
+
|
|
|
return cb();
|
|
|
});
|
|
|
},
|
|
|
function() {
|
|
|
return self.count < self.skip || next === DocumentSource.EOF;
|
|
|
},
|
|
|
- function(err) {
|
|
|
- if (err)
|
|
|
- return callback(err);
|
|
|
+ function (err) {
|
|
|
+ if (err) { return callback(err); }
|
|
|
}
|
|
|
);
|
|
|
}
|
|
|
@@ -69,28 +100,96 @@ proto.getNext = function getNext(callback) {
|
|
|
return this.source.getNext(callback);
|
|
|
};
|
|
|
|
|
|
+/**
|
|
|
+ * Serialize the source.
|
|
|
+ *
|
|
|
+ * @param explain
|
|
|
+ * @returns {{}}
|
|
|
+ */
|
|
|
proto.serialize = function serialize(explain) {
|
|
|
var out = {};
|
|
|
+
|
|
|
out[this.getSourceName()] = this.skip;
|
|
|
+
|
|
|
return out;
|
|
|
};
|
|
|
|
|
|
+/**
|
|
|
+ * Get skip value.
|
|
|
+ *
|
|
|
+ * @returns {number}
|
|
|
+ */
|
|
|
proto.getSkip = function getSkip() {
|
|
|
return this.skip;
|
|
|
};
|
|
|
|
|
|
/**
|
|
|
- * Creates a new SkipDocumentSource with the input number as the skip
|
|
|
+ * Set skip value.
|
|
|
*
|
|
|
- * @param {Number} JsonElement this thing is *called* Json, but it expects a number
|
|
|
+ * @param newSkip
|
|
|
+ */
|
|
|
+proto.setSkip = function setSkip(newSkip) {
|
|
|
+ this.skip = newSkip;
|
|
|
+};
|
|
|
+
|
|
|
+/**
|
|
|
+ * Create a new SkipDocumentSource.
|
|
|
+ *
|
|
|
+ * @param expCtx
|
|
|
+ * @returns {SkipDocumentSource}
|
|
|
+ */
|
|
|
+klass.create = function create(expCtx) {
|
|
|
+ return new SkipDocumentSource(expCtx);
|
|
|
+};
|
|
|
+
|
|
|
+/**
|
|
|
+ * Creates a new SkipDocumentSource with the input number as the skip.
|
|
|
+ *
|
|
|
+ * @param {Number} JsonElement this thing is *called* JSON, but it expects a number.
|
|
|
**/
|
|
|
klass.createFromJson = function createFromJson(jsonElement, ctx) {
|
|
|
- if (typeof jsonElement !== "number") throw new Error("code 15972; the value to skip must be a number");
|
|
|
+ if (typeof jsonElement !== 'number') {
|
|
|
+ throw new Error('code 15972; the value to skip must be a number');
|
|
|
+ }
|
|
|
|
|
|
var nextSkip = new SkipDocumentSource(ctx);
|
|
|
|
|
|
nextSkip.skip = jsonElement;
|
|
|
- if (nextSkip.skip < 0 || isNaN(nextSkip.skip)) throw new Error("code 15956; the number to skip cannot be negative");
|
|
|
+
|
|
|
+ if (nextSkip.skip < 0 || isNaN(nextSkip.skip)) {
|
|
|
+ throw new Error('code 15956; the number to skip cannot be negative');
|
|
|
+ }
|
|
|
|
|
|
return nextSkip;
|
|
|
};
|
|
|
+
|
|
|
+// SplittableDocumentSource implementation.
|
|
|
+klass.isSplittableDocumentSource = true;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Get dependencies.
|
|
|
+ *
|
|
|
+ * @param deps
|
|
|
+ * @returns {number}
|
|
|
+ */
|
|
|
+proto.getDependencies = function getDependencies(deps) {
|
|
|
+ return DocumentSource.GetDepsReturn.SEE_NEXT;
|
|
|
+};
|
|
|
+
|
|
|
+/**
|
|
|
+ * Get shard source.
|
|
|
+ *
|
|
|
+ * @returns {null}
|
|
|
+ */
|
|
|
+proto.getShardSource = function getShardSource() {
|
|
|
+ return null;
|
|
|
+};
|
|
|
+
|
|
|
+/**
|
|
|
+ * Get router source.
|
|
|
+ *
|
|
|
+ * @returns {SkipDocumentSource}
|
|
|
+ */
|
|
|
+proto.getRouterSource = function getRouterSource() {
|
|
|
+ return this;
|
|
|
+};
|