| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488 | 
							- "use strict";
 
- var async = require("async"),
 
- 	DocumentSource = require("./DocumentSource"),
 
- 	LimitDocumentSource = require("./LimitDocumentSource");
 
/**
 * A document source that sorts the documents it receives.
 *
 *	//NOTE: DEVIATION FROM THE MONGO: We don't have shards, this inherits from DocumentSource, instead of SplittableDocumentSource
 *
 * @class SortDocumentSource
 * @namespace mungedb-aggregate.pipeline.documentSources
 * @module mungedb-aggregate
 * @constructor
 * @param [ctx] {ExpressionContext} forwarded unchanged to the base constructor
 * @throws {Error} when more than one argument is supplied
 **/
var SortDocumentSource = function SortDocumentSource(ctx){
	if (arguments.length > 1) throw new Error("up to one arg expected");
	base.call(this, ctx); //jshint ignore:line

	// Sort specification: parallel arrays, one entry per key added through addKey().
	this.vSortKey = [];
	this.vAscending = [];

	/*
	* Nothing can be returned until every document of the underlying source
	* has been fetched and sorted.  populate() does that work the first time
	* getNext() is called; `populated` records that it has been done.
	**/
	this.populated = false;
	this.documents = []; // an array of documents
	this.docIterator = null; // a number tracking our position in the documents array
};
module.exports = SortDocumentSource;
var klass = SortDocumentSource;
var base = require("./DocumentSource");
var proto = Object.create(base.prototype, {constructor:{value:klass}});
klass.prototype = proto;
 
- // DEPENDENCIES
 
- var FieldPathExpression = require("../expressions/FieldPathExpression"),
 
- 	VariablesIdGenerator = require("../expressions/VariablesIdGenerator"),
 
- 	VariablesParseState = require("../expressions/VariablesParseState"),
 
- 	Variables = require("../expressions/Variables"),
 
- 	Value = require("../Value");
 
/**
 * Name of the pipeline stage handled by this document source.
 *
 * @static
 * @property sortName
 **/
klass.sortName = "$sort";

/**
 * Get the stage name.
 *
 * @method getSourceName
 * @returns {String} the value of `klass.sortName`
 **/
proto.getSourceName = function getSourceName(){
	var stageName = klass.sortName;
	return stageName;
};
 
/**
 * Get the factory for this kind of source.
 *
 * @method getFactory
 * @returns {Function} the SortDocumentSource constructor
 **/
proto.getFactory = function getFactory(){
	// the ctor doubles as the factory rather than a separate .create() method
	var factory = klass;
	return factory;
};
 
/**
 * Release the buffered documents and dispose of the underlying source.
 *
 * Resets the iteration position, drops the buffered documents, clears
 * `_output`, and then forwards the call to `this.source`.  The forward is
 * skipped when no source has been attached, so disposing an unconnected
 * sorter no longer throws a TypeError.
 *
 * @method dispose
 **/
proto.dispose = function dispose() {
	this.docIterator = 0;
	this.documents = [];
	this._output = undefined;
	if (this.source && typeof this.source.dispose === "function") {
		this.source.dispose();
	}
};
 
/**
 * Get the limit absorbed by this sort, if any.
 *
 * @method getLimit
 * @returns {Number} the limit reported by `this.limitSrc`, or -1 when no limit source is attached
 **/
proto.getLimit = function getLimit() {
	if (!this.limitSrc) return -1;
	return this.limitSrc.getLimit();
};
 
/**
 * Try to absorb the source that follows this one.
 *
 * When no limit has been absorbed yet, a LimitDocumentSource is stored as
 * `this.limitSrc` and returned; any other source is refused.  Once a limit is
 * held, the decision is delegated to that limit source.
 *
 * @method coalesce
 * @param {DocumentSource} nextSource the source following this one
 * @returns {DocumentSource|Boolean} `nextSource` when it was absorbed here, false when it
 *	was refused, or whatever the held limit source's own coalesce() returns
 **/
proto.coalesce = function coalesce(nextSource) {
	// a limit is already held: let it decide
	if (this.limitSrc) return this.limitSrc.coalesce(nextSource);

	// only a limit can be folded into the sort
	if (!(nextSource instanceof LimitDocumentSource)) return false;

	this.limitSrc = nextSource;
	return nextSource;
};
 
/**
 * Hand the next sorted document to `callback`.
 *
 * The first call triggers populate(); later calls walk the sorted buffer.
 * `null` is delivered once the buffer is exhausted (or when the current slot
 * holds a falsy value).
 *
 * @method getNext
 * @param {Function} callback invoked as callback(err, doc)
 * @returns {Object|null|undefined} the delivered document if the callback chain
 *	completed before this function returned, otherwise undefined
 * @throws {Error} when no callback is given
 **/
proto.getNext = function getNext(callback) {
	if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback");

	var ctx = this.expCtx;
	if (ctx instanceof Object && ctx.checkForInterrupt && ctx.checkForInterrupt() === false)
		return callback(new Error("Interrupted"));

	var self = this,
		out;

	// step 1: make sure the buffer has been filled and sorted
	function ensurePopulated(done) {
		if (self.populated) return done();
		self.populate(function(err) {
			return done(err);
		});
	}

	// step 2: take the document at the current position, or null at the end
	function takeCurrent(done) {
		var doc = null;
		if (self.docIterator < self.documents.length) {
			doc = self.documents[self.docIterator++] || null;
		}
		out = doc;
		return done(null, doc);
	}

	async.series([ensurePopulated, takeCurrent], function(err, results) {
		return callback(err, out);
	});

	return out;
};
 
/**
 * Append the serialized form of this source to `array`.
 *
 * In explain mode a single object describing the sort key, the merge flag
 * and the limit is appended.  Otherwise one `{$sort: ...}` object is appended
 * and, if a limit source is held, it is asked to append itself as well.
 *
 * @method serializeToArray
 * @param {Array} array receives the serialized object(s)
 * @param {bool} explain whether to produce the explain form
 **/
proto.serializeToArray = function serializeToArray(array, explain) {
	var entry = {};

	if (!explain) { // one Value for $sort and maybe a Value for $limit
		var sortSpec = this.serializeSortKey(explain);
		if (this._mergePresorted) sortSpec.$mergePresorted = true;
		entry[this.getSourceName()] = sortSpec;
		array.push(entry);
		if (this.limitSrc) this.limitSrc.serializeToArray(array);
		return;
	}

	// always one obj for combined $sort + $limit
	entry.sortKey = this.serializeSortKey(explain);
	entry.mergePresorted = this._mergePresorted;
	entry.limit = this.limitSrc ? this.limitSrc.getLimit() : undefined;
	array.push(entry);
};
 
/**
 * Not supported for this source; serializeToArray() must be used because a
 * sort may serialize to more than one object.
 *
 * @method serialize
 * @param {bool} explain unused
 * @throws {Error} always
 **/
proto.serialize = function serialize(explain) {
	var reason = "should call serializeToArray instead";
	throw new Error(reason);
};
 
/**
 * Add sort key field.
 *
 * Adds a sort key field to the key being built up.  A concatenated
 * key is built up by calling this repeatedly.
 *
 * The direction is validated before anything is stored, so a rejected call
 * leaves `vSortKey` and `vAscending` untouched and of equal length.
 *
 * @method addKey
 * @param {String} fieldPath the field path to the key component
 * @param {bool} ascending if true, use the key for an ascending sort, otherwise, use it for descending
 * @throws {Error} when `ascending` is not strictly true or false
 **/
proto.addKey = function addKey(fieldPath, ascending) {
	if (ascending !== true && ascending !== false) {
		// This doesn't appear to be an error in real mongo?
		throw new Error("ascending must be true or false");
	}

	var idGenerator = new VariablesIdGenerator(),
		vps = new VariablesParseState(idGenerator),
		// keys are stored as paths rooted at $$ROOT
		pathExpr = FieldPathExpression.parse("$$ROOT." + fieldPath, vps);

	this.vSortKey.push(pathExpr);
	this.vAscending.push(ascending);
};
 
/**
 * Build the options object describing this sort.
 *
 * @method makeSortOptions
 * @returns {Object} an object carrying `limit` when a limit source is held, otherwise an empty object
 * @throws {Error} when no sort key has been added
 **/
proto.makeSortOptions = function makeSortOptions(){
	/* make sure we've got a sort key */
	if (!this.vSortKey.length) throw new Error("no sort key for " + this.getSourceName());

	// Skipping memory checks
	var opts = {};
	if (this.limitSrc)
		opts.limit = this.limitSrc.getLimit();
	return opts;
};
 
/**
 * Pull every document from the underlying source, sort them, and reset the
 * iteration position.
 *
 * Differences from the previous revision:
 *  - an error reported by the underlying source stops the pull and is passed
 *    to `callback` instead of being dropped;
 *  - `populated` is set only once the sorted buffer is ready, not right after
 *    the asynchronous pull has been started;
 *  - when a limit source has been absorbed through coalesce(), the sorted
 *    buffer is truncated to that limit.
 *    NOTE(review): this assumes the pipeline drops a coalesced $limit stage;
 *    if it keeps the stage, truncating here is redundant but harmless.
 *
 * @method populate
 * @param {Function} callback invoked as callback(err) when the buffer is ready
 * @throws {Error} synchronously, when merging presorted input was requested
 **/
proto.populate = function populate(callback) {
	if (this._mergePresorted) {
		// Skipping stuff about mergeCursors and commandShards
		throw new Error("Merge presorted not implemented.");
	}

	/* pull everything from the underlying source */
	var self = this,
		lastDoc;
	async.doWhilst(
		function pullOne(cb) {
			self.source.getNext(function(err, doc) {
				if (err) return cb(err);
				lastDoc = doc;
				// Don't add EOF; it doesn't sort well.
				if (doc !== null)
					self.documents.push(doc);
				return cb();
			});
		},
		function hasMore() {
			return lastDoc !== null;
		},
		function finished(err) {
			if (err) return callback(err);
			try {
				/* sort the list */
				self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
			} catch (ex) {
				return callback(ex);
			}
			/* keep only the leading documents when a limit was absorbed */
			var limit = self.limitSrc ? self.limitSrc.getLimit() : -1;
			if (limit > 0 && self.documents.length > limit)
				self.documents = self.documents.slice(0, limit);
			/* start the sort iterator */
			self.docIterator = 0;
			self.populated = true;
			return callback();
		}
	);
};
 
/**
 * Unfinished helper meant to iterate the documents of a cursor on behalf of a sorter.
 *
 * NOTE(review): the assignment of `_cursor` is commented out, so more() has
 * nothing to delegate to, and next() has no implementation yet.
 *
 * @class IteratorFromCursor
 * @constructor
 * @param sorter passed as the only argument to a new SortDocumentSource
 * @param cursor currently unused
 **/
klass.IteratorFromCursor = (function(){
	function IteratorFromCursor(sorter, cursor){
		this._sorter = new SortDocumentSource(sorter);
		//this._cursor = new DBClientCursor(cursor);
	}

	var iterProto = Object.create(Object.prototype, {constructor:{value:IteratorFromCursor}});
	IteratorFromCursor.prototype = iterProto;

	// Reports whether the wrapped cursor has more documents.
	iterProto.more = function more() {
		return this._cursor.more();
	};

	// Not implemented: always returns undefined.
	iterProto.next = function next() {
		// var doc = new DocumentSourceMergeCursors(this._cursor);
		// TODO: make_pair for return
		//return {this._sorter.extractKey(doc): doc};
	};

	return IteratorFromCursor;
})();
 
/**
 * Placeholder for populating the sorter from a set of cursors.
 *
 * @method populateFromCursors
 * @param {Array} cursors unused
 * @throws {Error} always; the merge of cursor iterators has not been ported
 **/
proto.populateFromCursors = function populateFromCursors(cursors){
	//TODO: umm, probably broken...
	// for (var i = 0; i < cursors.length; i++) {
	// 	// TODO Create class
	// 	//this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, cursors[i]));
	// }
	//this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
	var reason = "this implementation is broken"; //TODO: fix?
	throw new Error(reason);
};
 
/**
 * Unfinished helper meant to iterate the documents of an array on behalf of a sorter.
 *
 * NOTE(review): neither `_iterator` nor `_cursor` is ever assigned, so more()
 * has nothing to delegate to, and next() has no implementation yet.
 *
 * @class IteratorFromBsonArray
 * @constructor
 * @param sorter passed as the only argument to a new SortDocumentSource
 * @param array currently unused
 **/
klass.IteratorFromBsonArray = (function(){
	function IteratorFromBsonArray(sorter, array){
		this._sorter = new SortDocumentSource(sorter);
		//this._iterator = new BSONObjIterator(array);
	}

	var iterProto = Object.create(Object.prototype, {constructor:{value:IteratorFromBsonArray}});
	IteratorFromBsonArray.prototype = iterProto;

	// Not implemented: always returns undefined.
	iterProto.next = function next() {
		// var doc = new DocumentSourceMergeCursors(this._cursor);
		// TODO: make_pair for return
		//return {this._sorter.extractKey(doc): doc};
	};

	// Delegates to `this._cursor`.
	iterProto.more = function more() {
		return this._cursor.more();
	};

	return IteratorFromBsonArray;
})();
 
/**
 * Placeholder for populating the sorter from a set of arrays.
 *
 * @method populateFromBsonArrays
 * @param {Array} arrays unused
 * @throws {Error} always; the merge of array iterators has not been ported
 **/
proto.populateFromBsonArrays = function populateFromBsonArrays(arrays){
	//TODO: umm, probably broken...
	// for (var i = 0; i < arrays.lenth; i++) {
	// 	// TODO Create class
	// 	//this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, arrays[i]));
	// }
	// this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
	var reason = "this implementation is broken"; //TODO: fix?
	throw new Error(reason);
};
 
/**
 * Extract the sort key of a document.
 *
 * With exactly one sort key the evaluated value is returned directly;
 * otherwise an array holding one evaluated value per sort key, in key order,
 * is returned (the array was previously never initialised, so this path
 * threw a TypeError).
 *
 * @method extractKey
 * @param {Object} d the document to evaluate the sort keys against
 * @returns {*|Array} the single key value, or an array of key values
 **/
proto.extractKey = function extractKey(d){
	var vars = new Variables(0,d);
	if (this.vSortKey.length === 1)
		return this.vSortKey[0].evaluate(vars);

	var keys = [];
	for (var i = 0; i < this.vSortKey.length; i++) {
		keys.push(this.vSortKey[i].evaluate(vars));
	}
	return keys;
};
 
/**
 * Order two documents by the configured sort keys.
 *
 * Keys are consulted in the order they were added; the first key whose
 * values differ decides the result, negated when that key is descending.
 *
 * @method compare
 * @param {Object} lhs the left side doc
 * @param {Object} rhs the right side doc
 * @returns {Number} a number less than, equal to, or greater than zero, indicating lhs < rhs, lhs == rhs, or lhs > rhs, respectively
 **/
proto.compare = function compare(lhs,rhs) {
	var keyCount = this.vSortKey.length;

	for (var idx = 0; idx < keyCount; ++idx) {
		// rebuild the key's path without its leading component
		var names = this.vSortKey[idx].getFieldPath(false).fieldNames,
			keyExpr = FieldPathExpression.create(names.slice(1).join("."));

		/* evaluate the sort key on both sides and compare the values */
		var order = Value.compare(keyExpr.evaluate(lhs), keyExpr.evaluate(rhs));

		/* same value: the next key has to decide */
		if (!order) continue;

		/* adjust the result by the key ordering */
		return this.vAscending[idx] ? order : -order;
	}

	/*
	 * Every key matched (or there were no keys to compare), so the documents
	 * are considered equal for purposes of this sort.
	 */
	return 0;
};
 
/**
 * Write out an object whose contents are the sort key.
 *
 * Field path keys are written as `path: 1` (ascending) or `path: -1`
 * (descending), with the leading path component removed.  Any other
 * expression is written under a made-up name of the form `$computed<index>`;
 * previously an object literal was used as the property name, which
 * stringified to "[object Object]" and made such keys overwrite each other.
 *
 * @method serializeSortKey
 * @param {bool} explain forwarded to the serialize() of non field path expressions
 * @return {Object} key
 **/
proto.serializeSortKey = function serializeSortKey(explain) {
	var keyObj = {};
	// add the key fields
	var n = this.vSortKey.length;
	for (var i = 0; i < n; i++) {
		if (this.vSortKey[i] instanceof FieldPathExpression) {
			var fieldPath = this.vSortKey[i].getFieldPath(false).fieldNames.slice(1).join(".");
			// append a named integer based on the sort order
			keyObj[fieldPath] = this.vAscending[i] ? 1 : -1;
		} else {
			// other expressions use a made-up field name
			keyObj["$computed" + i] = this.vSortKey[i].serialize(explain);
		}
	}
	return keyObj;
};
 
/**
 * Creates a new SortDocumentSource from Json
 *
 * The specification must be a plain key object.  `null` and arrays are
 * rejected here as well, since `typeof` reports "object" for both and they
 * would otherwise slip through to create().
 *
 * @method createFromJson
 * @static
 * @param {Object} elem the sort specification, e.g. `{a: 1, b: -1}`
 * @param {Object} expCtx passed through to create()
 * @returns {SortDocumentSource} the source built by create()
 * @throws {Error} code 15973 when `elem` is not an object
 **/
klass.createFromJson = function createFromJson(elem, expCtx) {
	if (elem === null || typeof elem !== "object" || Array.isArray(elem))
		throw new Error("code 15973; the " + klass.sortName + " key specification must be an object");

	return klass.create(expCtx, elem);
};
 
/**
 * Creates a new SortDocumentSource
 *
 * Each property of `sortOrder` names a field path and must map to 1
 * (ascending) or -1 (descending).  The special property `$mergePresorted`
 * marks the new source as merging presorted input instead of adding a key.
 *
 * Fixes over the previous revision:
 *  - the property name itself is compared with "$mergePresorted" (the old
 *    code read `.fieldName` off the name string, which is always undefined);
 *  - the merge flag is set on the new instance rather than on the constructor;
 *  - the "expression" check inspects the property value, not the name;
 *  - the post-coalesce assertion fails when the limit does NOT match (it used
 *    to fail whenever a limit was supplied).
 *
 * @method create
 * @static
 * @param {Object} expCtx expression context handed to the new source and to the limit source
 * @param {object} sortOrder the sort specification
 * @param {int} [limit] when greater than zero, a limit coalesced into the sort
 * @returns {SortDocumentSource} the configured source
 * @throws {Error} codes 17312, 15974, 15975, 15976 for invalid specifications
 **/
klass.create = function create(expCtx, sortOrder, limit) {
	var Sort = proto.getFactory(),
		nextSort = new Sort(expCtx);

	/* check for then iterate over the sort object */
	var sortKeys = 0;
	for (var fieldName in sortOrder) { //jshint ignore:line
		var keyValue = sortOrder[fieldName];

		if (fieldName === "$mergePresorted") {
			nextSort._mergePresorted = true;
			continue;
		}

		if (keyValue instanceof Object) {
			// this restriction is due to needing to figure out sort direction
			throw new Error("code 17312; " + klass.sortName + "the only expression supported by $sort right now is {$meta: 'textScore'}");
		}

		if (typeof keyValue !== "number")
			throw new Error("code 15974; " + klass.sortName + "$sort key ordering must be specified using a number or {$meta: 'text'}");

		if (keyValue !== 1 && keyValue !== -1)
			throw new Error("code 15975; " + klass.sortName + " $sort key ordering must be 1 (for ascending) or -1 (for descending)");

		nextSort.addKey(fieldName, (keyValue > 0));
		++sortKeys;
	}

	if (sortKeys <= 0)
		throw new Error("code 15976; " + klass.sortName + " must have at least one sort key");

	if (limit > 0) {
		var coalesced = nextSort.coalesce(LimitDocumentSource.create(expCtx, limit));
		if (!coalesced) throw new Error("Assertion failure"); // should always coalesce
		if (nextSort.getLimit() !== limit) throw new Error("Assertion failure"); // the absorbed limit must be the requested one
	}

	return nextSort;
};
 
// SplittableDocumentSource implementation.
// Flag advertising that this source offers getShardSource()/getMergeSource().
klass.isSplittableDocumentSource = true;

/**
 * Collect the dependencies of every sort key.
 *
 * Each sort key expression is asked to add its dependencies to `deps`.
 *
 * @method getDependencies
 * @param deps the collector handed to each key expression's addDependencies()
 * @returns the `DocumentSource.GetDepsReturn.SEE_NEXT` constant
 */
proto.getDependencies = function getDependencies(deps) {
	this.vSortKey.forEach(function(keyExpr) {
		keyExpr.addDependencies(deps);
	});
	return DocumentSource.GetDepsReturn.SEE_NEXT;
};
 
/**
 * Get shard source.
 *
 * The error text is now built by concatenation.  The old `"...", + x`
 * form passed the tail as a second argument to Error, so the message was
 * only "getShardSource".
 *
 * @method getShardSource
 * @returns {this}
 * @throws {Error} when this source is flagged as merging presorted input
 */
proto.getShardSource = function getShardSource() {
	if (this._mergePresorted) throw new Error("getShardSource " + klass.sortName + " should not be merging presorted");
	return this;
};
 
/**
 * Get merge source.
 *
 * Builds a second SortDocumentSource with the same keys, directions and
 * limit source, flagged as merging presorted input.
 *
 * Fixes over the previous revision:
 *  - the error text is built by concatenation (the old `"...", + x` form
 *    left the message as just "getMergeSource");
 *  - the key and direction arrays are copied, so a later addKey() on either
 *    source no longer changes the other;
 *  - this source's expression context is handed to the new source.
 *
 * NOTE(review): this method reads and writes `_mergingPresorted`, while
 * populate(), serializeToArray() and getShardSource() read `_mergePresorted`.
 * The property name is left as it was so existing behaviour is unchanged —
 * confirm which spelling is intended.
 *
 * @method getMergeSource
 * @returns {SortDocumentSource}
 * @throws {Error} when this source is itself flagged as merging presorted input
 */
proto.getMergeSource = function getMergeSource() {
	if (this._mergingPresorted) throw new Error("getMergeSource " + klass.sortName + " should not be merging presorted");

	var other = new SortDocumentSource(this.expCtx);
	other.vAscending = this.vAscending.slice();
	other.vSortKey = this.vSortKey.slice();
	other.limitSrc = this.limitSrc;
	other._mergingPresorted = true;
	return other;
};
 
 
  |