SortDocumentSource.js 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. "use strict";
  2. var async = require("async"),
  3. DocumentSource = require("./DocumentSource"),
  4. LimitDocumentSource = require("./LimitDocumentSource");
  /**
   * Pipeline stage that buffers every document from its underlying source and
   * hands them back in sorted order.
   *
   * //NOTE: DEVIATION FROM MONGO: there are no shards here, so this inherits from DocumentSource rather than SplittableDocumentSource
   *
   * @class SortDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param [ctx] {ExpressionContext}
   **/
  var SortDocumentSource = module.exports = function SortDocumentSource(ctx){
    if (arguments.length > 1) throw new Error("up to one arg expected");
    base.call(this, ctx);

    /*
     * Nothing can be returned until the whole input has been read and
     * sorted. getNext() calls populate() the first time it runs, and
     * populate() flips this flag once the sorted buffer is ready.
     */
    this.populated = false;

    // Index of the next entry of `documents` to hand out; populate() sets it to 0.
    this.docIterator = null;

    // Buffered input documents (sorted in place by populate()).
    this.documents = [];

    // Sort key, stored as two parallel arrays filled by addKey():
    // the field path expressions and their ascending/descending flags.
    this.vSortKey = [];
    this.vAscending = [];
  };

  // Short aliases used by the rest of this file. `base` is referenced inside
  // the constructor above, which is fine because it is only read when the
  // constructor is actually invoked.
  var klass = SortDocumentSource;
  var base = require('./DocumentSource');
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  31. // DEPENDENCIES
  32. var FieldPathExpression = require("../expressions/FieldPathExpression"),
  33. Value = require("../Value");
  // Stage name; also used as the key when this stage is serialized and in error messages.
  klass.sortName = "$sort";

  /**
   * Report the name of this stage.
   *
   * @method getSourceName
   * @returns {String} the value of klass.sortName
   **/
  proto.getSourceName = function getSourceName(){
    var stageName = klass.sortName;
    return stageName;
  };
  /**
   * Return the constructor used to build new sort stages.
   * The constructor itself plays the role of a factory; there is no separate .create() method.
   *
   * @method getFactory
   * @returns {Function} the SortDocumentSource constructor
   **/
  proto.getFactory = function getFactory(){
    var factory = klass;
    return factory;
  };
  // Possible return values of getDependencies().
  klass.GetDepsReturn = {};
  // SEE_NEXT: add the next source's dependencies to the set as well.
  klass.GetDepsReturn.SEE_NEXT = "SEE_NEXT";
  /**
   * Drop the buffered documents, rewind the iterator and dispose the
   * underlying source.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
    var upstream = this.source;
    this.docIterator = 0;
    this.documents = [];
    upstream.dispose();
  };
  /**
   * Report the limit of the limit stage absorbed by coalesce(), if any.
   *
   * @method getLimit
   * @returns {Number} the absorbed limit stage's getLimit() value, or -1 when no limit stage has been absorbed
   **/
  proto.getLimit = function getLimit() {
    if (!this.limitSrc) return -1;
    return this.limitSrc.getLimit();
  };
  /**
   * Let every sort key expression register its dependencies.
   *
   * @method getDependencies
   * @param deps the dependency collection handed to each key expression's addDependencies()
   * @returns {String} klass.GetDepsReturn.SEE_NEXT
   **/
  proto.getDependencies = function getDependencies(deps) {
    this.vSortKey.forEach(function(keyExpr) {
      keyExpr.addDependencies(deps);
    });
    return klass.GetDepsReturn.SEE_NEXT;
  };
  /**
   * Try to absorb the stage that follows this one.
   *
   * The first LimitDocumentSource seen is remembered as this.limitSrc. Once a
   * limit has been absorbed, further candidates are delegated to that limit
   * stage's own coalesce().
   *
   * @method coalesce
   * @param nextSource the stage that follows this one
   * @returns the absorbed limit stage, false when nothing was absorbed, or whatever the absorbed limit's coalesce() returns
   **/
  proto.coalesce = function coalesce(nextSource) {
    // A limit is already attached: let it decide.
    if (this.limitSrc) return this.limitSrc.coalesce(nextSource);

    // Only limit stages can be absorbed.
    if (!(nextSource instanceof LimitDocumentSource)) return false;

    this.limitSrc = nextSource;
    return nextSource;
  };
  /**
   * Hand out the next document in sort order.
   *
   * On first use the whole input is read and sorted through populate().
   * A falsy buffer entry is reported as DocumentSource.EOF, as is running off
   * the end of the buffer.
   *
   * @method getNext
   * @param {Function} callback called as callback(err, doc) where doc is the next document or DocumentSource.EOF
   * @returns the same value that is passed to the callback if it was already known when the call returned, otherwise undefined
   **/
  proto.getNext = function getNext(callback) {
    if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
    var self = this,
      result;

    // Step 1: make sure the sorted buffer exists.
    function ensurePopulated(done) {
      if (self.populated) return done();
      self.populate(function(err) {
        return done(err);
      });
    }

    // Step 2: take the next entry; the iterator only advances while entries remain.
    function takeNext(done) {
      var exhausted = self.docIterator >= self.documents.length,
        candidate = exhausted ? undefined : self.documents[self.docIterator++];
      result = (!candidate || candidate === DocumentSource.EOF) ? DocumentSource.EOF : candidate;
      return done(null, result);
    }

    async.series([ensurePopulated, takeNext], function(err) {
      return callback(err, result);
    });
    return result;
  };
  /**
   * Append the serialized form of this stage to an array.
   *
   * In explain mode a single object with sortKey, mergePresorted and limit
   * is appended. Otherwise an object keyed by the stage name is appended,
   * followed by the serialization of the absorbed limit stage if there is one.
   *
   * @method serializeToArray
   * @param {Array} array the array to append to
   * @param {Boolean} [explain] truthy to produce the explain form
   **/
  proto.serializeToArray = function serializeToArray(array, explain) {
    var entry = {};

    if (!explain) {
      var sortSpec = this.serializeSortKey();
      if (this._mergePresorted) sortSpec.$mergePresorted = true;
      entry[this.getSourceName()] = sortSpec;
      array.push(entry);
      // The absorbed limit is emitted as its own stage right after the sort.
      if (this.limitSrc) this.limitSrc.serializeToArray(array);
      return;
    }

    entry.sortKey = this.serializeSortKey();
    entry.mergePresorted = this._mergePresorted;
    entry.limit = this.limitSrc ? this.limitSrc.getLimit() : undefined;
    array.push(entry);
  };
  /**
   * Not supported on this stage; serializeToArray() must be used instead.
   *
   * @method serialize
   * @param explain unused
   * @throws {Error} always
   **/
  proto.serialize = function serialize(explain) {
    var reason = "should call serializeToArray instead";
    throw new Error(reason);
  };
  /**
   * Add sort key field.
   *
   * Adds a sort key field to the key being built up. A concatenated
   * key is built up by calling this repeatedly.
   *
   * The direction is validated before anything is stored, so a rejected call
   * leaves vSortKey and vAscending untouched and still the same length.
   *
   * @param {String} fieldPath the field path to the key component
   * @param {bool} ascending if true, use the key for an ascending sort, otherwise, use it for descending
   * @throws {Error} when ascending is not strictly true or false
   **/
  proto.addKey = function addKey(fieldPath, ascending) {
    if (ascending !== true && ascending !== false) {
      // This doesn't appear to be an error in real mongo?
      throw new Error("ascending must be true or false");
    }
    // The two arrays are parallel: entry i of one describes entry i of the other.
    this.vSortKey.push(new FieldPathExpression(fieldPath));
    this.vAscending.push(ascending);
  };
  /**
   * Read every document from the underlying source, sort the buffer with
   * compare() and reset the iterator.
   *
   * An error reported by the underlying source stops the read loop and is
   * handed to the callback; in that case the buffer is not sorted and the
   * stage is not marked as populated.
   *
   * @method populate
   * @param {Function} callback called with no arguments on success, or with the error reported by the underlying source
   * @throws {Error} synchronously, when no sort key has been added
   **/
  proto.populate = function populate(callback) {
    /* make sure we've got a sort key */
    if (!this.vSortKey.length) throw new Error("no sort key for " + this.getSourceName());

    // Skipping stuff about mergeCursors and commandShards

    /* pull everything from the underlying source */
    var self = this,
      lastSeen;
    async.doWhilst(
      function pullOne(cb) {
        self.source.getNext(function(err, doc) {
          // Stop right away on failure; otherwise `doc` would be buffered as
          // if it were a document and the loop would keep going.
          if (err) return cb(err);
          lastSeen = doc;
          // Don't add EOF; it doesn't sort well.
          if (doc !== DocumentSource.EOF)
            self.documents.push(doc);
          return cb();
        });
      },
      function hasMore() {
        return lastSeen !== DocumentSource.EOF;
      },
      function finished(err) {
        if (err) return callback(err);
        /* sort the list */
        self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
        /* start the sort iterator */
        self.docIterator = 0;
        self.populated = true;
        return callback();
      }
    );
  };
  /**
   * Order two documents by the configured sort key.
   *
   * Keys are checked in the order they were added. The first key whose values
   * differ according to Value.compare() decides the result, negated when that
   * key is descending. populate() has already verified that the key is not
   * empty. When no key tells the documents apart (including the case where
   * none of the key fields are present), they are treated as equal.
   *
   * @param {Object} pL the left side doc
   * @param {Object} pR the right side doc
   * @returns {Number} a number less than, equal to, or greater than zero, indicating pL < pR, pL == pR, or pL > pR, respectively
   **/
  proto.compare = function compare(pL,pR) {
    var keyCount = this.vSortKey.length,
      idx = 0;

    while (idx < keyCount) {
      /* evaluate this key component against both documents */
      var keyExpr = new FieldPathExpression(this.vSortKey[idx].getFieldPath(false)),
        lhs = keyExpr.evaluate(pL),
        rhs = keyExpr.evaluate(pR),
        order = Value.compare(lhs, rhs);

      /* a non-zero result settles it, after adjusting for the key's direction */
      if (order) return this.vAscending[idx] ? order : -order;

      idx++;
    }

    return 0;
  };
  /**
   * Build a plain object describing the sort key: one property per key
   * component, named by its field path, with value 1 for ascending and -1 for
   * descending.
   *
   * @returns {Object} the sort key specification
   **/
  proto.serializeSortKey = function sortKeyToJson() {
    var directions = this.vAscending;
    return this.vSortKey.reduce(function(spec, keyExpr, idx) {
      spec[keyExpr.getFieldPath()] = directions[idx] ? 1 : -1;
      return spec;
    }, {});
  };
  /**
   * Creates a new SortDocumentSource from a sort specification such as
   * {a: 1, "b.c": -1}, where 1 means ascending and -1 means descending.
   *
   * @param {Object} jsonElement the sort specification
   * @param [ctx] passed through to the constructor
   * @returns {SortDocumentSource} a stage with one key per property of the specification
   * @throws {Error} code 15973 when the specification is not an object (null included)
   * @throws {Error} code 15974 when an ordering is not a number
   * @throws {Error} code 15975 when an ordering is neither 1 nor -1
   * @throws {Error} code 15976 when the specification has no keys
   **/
  klass.createFromJson = function createFromJson(jsonElement, ctx) {
    // typeof null is "object", so null has to be rejected explicitly.
    if (jsonElement === null || typeof jsonElement !== "object") throw new Error("code 15973; the " + klass.sortName + " key specification must be an object");

    var Sort = proto.getFactory(),
      nextSort = new Sort(ctx);

    /* check for then iterate over the sort object */
    var sortKeys = 0;
    for (var key in jsonElement) {
      var sortOrder = jsonElement[key];
      if (typeof sortOrder !== "number") throw new Error("code 15974; " + klass.sortName + " key ordering must be specified using a number");
      if (sortOrder !== 1 && sortOrder !== -1) throw new Error("code 15975; " + klass.sortName + " key ordering must be 1 (for ascending) or -1 (for descending)");
      nextSort.addKey(key, sortOrder > 0);
      ++sortKeys;
    }

    if (sortKeys <= 0) throw new Error("code 15976; " + klass.sortName + " must have at least one sort key");
    return nextSort;
  };
  242. };