SortDocumentSource.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. "use strict";
  2. var async = require("async"),
  3. DocumentSource = require("./DocumentSource"),
  4. LimitDocumentSource = require("./LimitDocumentSource"),
  5. Document = require('../Document');
  6. /**
  7. * A document source sorter
  8. *
  9. * //NOTE: DEVIATION FROM THE MONGO: We don't have shards, this inherits from DocumentSource, instead of SplittableDocumentSource
  10. *
  11. * @class SortDocumentSource
  12. * @namespace mungedb-aggregate.pipeline.documentSources
  13. * @module mungedb-aggregate
  14. * @constructor
  15. * @param [ctx] {ExpressionContext}
  16. **/
  17. var SortDocumentSource = module.exports = function SortDocumentSource(ctx){
  18. if (arguments.length > 1) throw new Error("up to one arg expected");
  19. base.call(this, ctx);
  20. /*
  21. * Before returning anything, this source must fetch everything from
  22. * the underlying source and group it. populate() is used to do that
  23. * on the first call to any method on this source. The populated
  24. * boolean indicates that this has been done
  25. **/
  26. this.populated = false;
  27. this.docIterator = null; // a number tracking our position in the documents array
  28. this.documents = []; // an array of documents
  29. this.vSortKey = [];
  30. this.vAscending = [];
  31. }, klass = SortDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  32. // DEPENDENCIES
  33. var FieldPathExpression = require("../expressions/FieldPathExpression"),
  34. VariablesIdGenerator = require("../expressions/VariablesIdGenerator"),
  35. VariablesParseState = require("../expressions/VariablesParseState"),
  36. Variables = require("../expressions/Variables"),
  37. Value = require("../Value");
  38. klass.sortName = "$sort";
  39. proto.getSourceName = function getSourceName(){
  40. return klass.sortName;
  41. };
  42. proto.getFactory = function getFactory(){
  43. return klass; // using the ctor rather than a separate .create() method
  44. };
  45. proto.dispose = function dispose() {
  46. this.docIterator = 0;
  47. this.documents = [];
  48. this._output.reset();
  49. this.source.dispose();
  50. };
  51. proto.getLimit = function getLimit() {
  52. return this.limitSrc ? this.limitSrc.getLimit() : -1;
  53. };
  54. proto.coalesce = function coalesce(nextSource) {
  55. if (!this.limitSrc) {
  56. if (nextSource instanceof LimitDocumentSource) {
  57. this.limitSrc = nextSource;
  58. return nextSource;
  59. }
  60. return false;
  61. } else {
  62. return this.limitSrc.coalesce(nextSource);
  63. }
  64. };
  65. proto.getNext = function getNext(callback) {
  66. if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
  67. if (this.expCtx instanceof Object && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false)
  68. return callback(new Error("Interrupted"));
  69. var self = this,
  70. out;
  71. async.series(
  72. [
  73. function(next) {
  74. if (!self.populated)
  75. {
  76. self.populate(function(err) {
  77. return next(err);
  78. });
  79. } else {
  80. return next();
  81. }
  82. },
  83. function(next) {
  84. if (self.docIterator >= self.documents.length) {
  85. out = null;
  86. return next(null, null);
  87. }
  88. var output = self.documents[self.docIterator++];
  89. if (!output || output === null) {
  90. out = null;
  91. return next(null, null);
  92. }
  93. out = output;
  94. return next(null, output);
  95. }
  96. ],
  97. function(err, results) {
  98. return callback(err, out);
  99. }
  100. );
  101. return out;
  102. };
  103. /**
  104. * Serialize to Array.
  105. *
  106. * @param {Array} array
  107. * @param {bool} explain
  108. **/
  109. proto.serializeToArray = function serializeToArray(array, explain) {
  110. var doc = {};
  111. if (explain) { // always one obj for combined $sort + $limit
  112. doc.sortKey = this.serializeSortKey(explain);
  113. doc.mergePresorted = this._mergePresorted;
  114. doc.limit = this.limitSrc ? this.limitSrc.getLimit() : undefined;
  115. array.push(doc);
  116. } else { // one Value for $sort and maybe a Value for $limit
  117. var inner = {};
  118. inner = this.serializeSortKey(explain);
  119. if (this._mergePresorted)
  120. inner.$mergePresorted = true;
  121. doc[this.getSourceName()] = inner;
  122. array.push(doc);
  123. if (this.limitSrc)
  124. this.limitSrc.serializeToArray(array);
  125. }
  126. };
  127. proto.serialize = function serialize(explain) {
  128. throw new Error("should call serializeToArray instead");
  129. };
  130. /**
  131. * Add sort key field.
  132. *
  133. * Adds a sort key field to the key being built up. A concatenated
  134. * key is built up by calling this repeatedly.
  135. *
  136. * @param {String} fieldPath the field path to the key component
  137. * @param {bool} ascending if true, use the key for an ascending sort, otherwise, use it for descending
  138. **/
  139. proto.addKey = function addKey(fieldPath, ascending) {
  140. var idGenerator = new VariablesIdGenerator(),
  141. vps = new VariablesParseState(idGenerator);
  142. var pathExpr = FieldPathExpression.parse("$$ROOT." + fieldPath, vps);
  143. this.vSortKey.push(pathExpr);
  144. if (ascending === true || ascending === false) {
  145. this.vAscending.push(ascending);
  146. } else {
  147. // This doesn't appear to be an error in real mongo?
  148. throw new Error("ascending must be true or false");
  149. }
  150. };
  151. proto.makeSortOptions = function makeSortOptions(){
  152. /* make sure we've got a sort key */
  153. if (!this.vSortKey.length) throw new Error("no sort key for " + this.getSourceName());
  154. // Skipping memory checks
  155. var opts;
  156. if ( this.limitSrc)
  157. opts.limit = this.limitSrc.getLimt();
  158. return opts;
  159. };
  160. proto.populate = function populate(callback) {
  161. if ( this._mergePresorted ){
  162. // Skipping stuff about mergeCursors and commandShards
  163. throw new Error("Merge presorted not implemented.");
  164. } else {
  165. /* pull everything from the underlying source */
  166. var self = this,
  167. next;
  168. async.doWhilst(
  169. function (cb) {
  170. self.source.getNext(function(err, doc) {
  171. next = doc;
  172. // Don't add EOF; it doesn't sort well.
  173. if (doc !== null)
  174. self.documents.push(doc);
  175. return cb();
  176. });
  177. },
  178. function() {
  179. return next !== null;
  180. },
  181. function(err) {
  182. try {
  183. /* sort the list */
  184. self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
  185. } catch (ex) {
  186. return callback(ex);
  187. }
  188. /* start the sort iterator */
  189. self.docIterator = 0;
  190. self.populated = true;
  191. //self._output.reset(true);
  192. return callback();
  193. }
  194. );
  195. }
  196. this.populated = true;
  197. };
  198. klass.IteratorFromCursor = (function(){
  199. /**
  200. * Helper class to unwind arrays within a series of documents.
  201. * @param {String} unwindPath is the field path to the array to unwind.
  202. **/
  203. var klass = function IteratorFromCursor(sorter, cursor){
  204. this._sorter = new SortDocumentSource(sorter);
  205. //this._cursor = new DBClientCursor(cursor);
  206. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  207. proto.more = function more() {
  208. return this._cursor.more();
  209. };
  210. proto.next = function next() {
  211. // var doc = new DocumentSourceMergeCursors(this._cursor);
  212. // TODO: make_pair for return
  213. //return {this._sorter.extractKey(doc): doc};
  214. };
  215. return klass;
  216. })();
  217. proto.populateFromCursors = function populateFromCursors(cursors){
  218. for (var i = 0; i < cursors.length; i++) {
  219. // TODO Create class
  220. //this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, cursors[i]));
  221. }
  222. this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
  223. };
  224. klass.IteratorFromBsonArray = (function(){
  225. /**
  226. * Helper class to unwind arrays within a series of documents.
  227. * @param {String} unwindPath is the field path to the array to unwind.
  228. **/
  229. var klass = function IteratorFromBsonArray(sorter, array){
  230. this._sorter = new SortDocumentSource(sorter);
  231. //this._iterator = new BSONObjIterator(array);
  232. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  233. proto.next = function next() {
  234. // var doc = new DocumentSourceMergeCursors(this._cursor);
  235. // TODO: make_pair for return
  236. //return {this._sorter.extractKey(doc): doc};
  237. };
  238. proto.more = function more() {
  239. return this._cursor.more();
  240. };
  241. return klass;
  242. })();
  243. proto.populateFromBsonArrays = function populateFromBsonArrays(arrays){
  244. for (var i = 0; i < arrays.lenth; i++) {
  245. // TODO Create class
  246. //this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, arrays[i]));
  247. }
  248. this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
  249. };
  250. /**
  251. * Extract the key
  252. *
  253. * @param {d} document
  254. * @returns {keys} extracted key
  255. **/
  256. proto.extractKey = function extractKey(d){
  257. var vars = new Variables(0,d);
  258. if ( this.vSortKey.length == 1)
  259. return this.vSortKey[0].evaluate(vars);
  260. var keys;
  261. for (var i=0; i < this.vSortKey.length; i++) {
  262. keys.push(this.vSortKey[i].evaluate(vars));
  263. }
  264. return keys;
  265. };
  266. /**
  267. * Compare two documents according to the specified sort key.
  268. *
  269. * @param {Object} lhs the left side doc
  270. * @param {Object} rhs the right side doc
  271. * @returns {Number} a number less than, equal to, or greater than zero, indicating pL < pR, pL == pR, or pL > pR, respectively
  272. **/
  273. proto.compare = function compare(lhs,rhs) {
  274. /*
  275. populate() already checked that there is a non-empty sort key,
  276. so we shouldn't have to worry about that here.
  277. However, the tricky part is what to do is none of the sort keys are
  278. present. In this case, consider the document less.
  279. */
  280. for(var i = 0, n = this.vSortKey.length; i < n; ++i) {
  281. var pathExpr = FieldPathExpression.create(this.vSortKey[i].getFieldPath(false).fieldNames.slice(1).join('.'));
  282. /* evaluate the sort keys */
  283. var left = pathExpr.evaluate(lhs), right = pathExpr.evaluate(rhs);
  284. /*
  285. Compare the two values; if they differ, return. If they are
  286. the same, move on to the next key.
  287. */
  288. var cmp = Value.compare(left, right);
  289. if (cmp) {
  290. /* if necessary, adjust the return value by the key ordering */
  291. if (!this.vAscending[i])
  292. cmp = -cmp;
  293. return cmp;
  294. }
  295. }
  296. /**
  297. * If we got here, everything matched (or didn't exist), so we'll
  298. * consider the documents equal for purposes of this sort
  299. **/
  300. return 0;
  301. };
  302. /**
  303. * Write out an object whose contents are the sort key.
  304. *
  305. * @param {bool} explain
  306. * @return {Object} key
  307. **/
  308. proto.serializeSortKey = function serializeSortKey(explain) {
  309. var keyObj = {};
  310. // add the key fields
  311. var n = this.vSortKey.length;
  312. for (var i = 0; i < n; i++) {
  313. if ( this.vSortKey[i] instanceof FieldPathExpression ) {
  314. var fieldPath = this.vSortKey[i].getFieldPath(false).fieldNames.slice(1).join('.');
  315. // append a named integer based on the sort order
  316. keyObj[fieldPath] = this.vAscending[i] ? 1 : -1;
  317. } else {
  318. // other expressions use a made-up field name
  319. keyObj[{"$computed":i}] = this.vSortKey[i].serialize(explain);
  320. }
  321. }
  322. return keyObj;
  323. };
  324. /**
  325. * Creates a new SortDocumentSource from Json
  326. *
  327. * @param {Object} elem
  328. * @param {Object} expCtx
  329. *
  330. **/
  331. klass.createFromJson = function createFromJson(elem, expCtx) {
  332. if (typeof elem !== "object") throw new Error("code 15973; the " + klass.sortName + " key specification must be an object");
  333. return klass.create(expCtx, elem);
  334. };
  335. /**
  336. * Creates a new SortDocumentSource
  337. *
  338. * @param {Object} expCtx
  339. * @param {object} sortorder
  340. * @param {int} limit
  341. *
  342. **/
  343. klass.create = function create(expCtx, sortOrder, limit) {
  344. var Sort = proto.getFactory(),
  345. nextSort = new Sort(expCtx);
  346. /* check for then iterate over the sort object */
  347. var sortKeys = 0;
  348. for(var keyField in sortOrder) {
  349. var fieldName = keyField.fieldName;
  350. if ( fieldName === "$mergePresorted" ){
  351. Sort._mergePresorted = true;
  352. continue;
  353. }
  354. if ( keyField instanceof Object) {
  355. // this restriction is due to needing to figure out sort direction
  356. throw new Error("code 17312; " + klass.sortName + "the only expression supported by $sort right now is {$meta: 'textScore'}");
  357. }
  358. if (typeof sortOrder[keyField] !== "number") throw new Error("code 15974; " + klass.sortName + "$sort key ordering must be specified using a number or {$meta: 'text'}");
  359. // RedBeard0531 can the thanked.
  360. var sortDirection = 0;
  361. sortDirection = sortOrder[keyField];
  362. if ((sortDirection != 1) && (sortDirection !== -1)) throw new Error("code 15975; " + klass.sortName + " $sort key ordering must be 1 (for ascending) or -1 (for descending)");
  363. nextSort.addKey(keyField, (sortDirection > 0));
  364. ++sortKeys;
  365. }
  366. if (sortKeys <= 0) throw new Error("code 15976; " + klass.sortName + " must have at least one sort key");
  367. if ( limit > 0) {
  368. var coalesced = nextSort.coalesce( create(expCtx, limit));
  369. // should always coalesce
  370. }
  371. return nextSort;
  372. };
  373. // SplittableDocumentSource implementation.
  374. klass.isSplittableDocumentSource = true;
  375. /**
  376. * Get dependencies.
  377. *
  378. * @param deps
  379. * @returns {number}
  380. */
  381. proto.getDependencies = function getDependencies(deps) {
  382. for(var i = 0; i < this.vSortKey.length; i++) {
  383. this.vSortKey[i].addDependencies(deps);
  384. }
  385. return DocumentSource.GetDepsReturn.SEE_NEXT;
  386. };
  387. /**
  388. * Get shard source.
  389. *
  390. * @returns {this}
  391. */
  392. proto.getShardSource = function getShardSource() {
  393. if (this._mergePresorted) throw new Error("getShardSource", + klass.sortName + " should not be merging presorted");
  394. return this;
  395. };
  396. /**
  397. * Get merge source.
  398. *
  399. * @returns {SortDocumentSource}
  400. */
  401. proto.getMergeSource = function getMergeSource() {
  402. if ( this._mergingPresorted) throw new Error("getMergeSource", + klass.sortName + " should not be merging presorted");
  403. var other = new SortDocumentSource();
  404. other.vAscending = this.vAscending;
  405. other.vSortKey = this.vSortKey;
  406. other.limitSrc = this.limitSrc;
  407. other._mergingPresorted = true;
  408. return other;
  409. };