// SortDocumentSource.js
  1. "use strict";
  2. var async = require("async"),
  3. DocumentSource = require("./DocumentSource"),
  4. LimitDocumentSource = require("./LimitDocumentSource");
  /**
   * A document source that returns the documents of its underlying source in sorted order.
   *
   * //NOTE: DEVIATION FROM THE MONGO: there are no shards here, so this inherits from DocumentSource rather than SplittableDocumentSource
   *
   * @class SortDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param [ctx] {ExpressionContext} forwarded unchanged to the base constructor
   * @throws {Error} if more than one argument is supplied
   **/
  var SortDocumentSource = module.exports = function SortDocumentSource(ctx){
    if (arguments.length > 1) throw new Error("up to one arg expected");
    base.call(this, ctx);

    // Nothing can be handed out until every upstream document has been
    // buffered and sorted. populate() does that work the first time
    // getNext() runs; `populated` records whether it has happened yet.
    this.populated = false;
    this.docIterator = null; // index into `documents` of the next document to hand out
    this.documents = []; // buffer holding the upstream documents

    // Parallel arrays filled by addKey(): the expression for each sort key
    // and whether that key sorts ascending (true) or descending (false).
    this.vSortKey = [];
    this.vAscending = [];
  };
  var klass = SortDocumentSource;
  var base = require("./DocumentSource");
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}}); //jshint ignore:line
  31. // DEPENDENCIES
  32. var FieldPathExpression = require("../expressions/FieldPathExpression"),
  33. VariablesIdGenerator = require("../expressions/VariablesIdGenerator"),
  34. VariablesParseState = require("../expressions/VariablesParseState"),
  35. Variables = require("../expressions/Variables"),
  36. Value = require("../Value");
  /** The pipeline stage name this source reports and serializes under. */
  klass.sortName = "$sort";

  /**
   * Get the name of this source.
   *
   * @method getSourceName
   * @returns {String} the value of `klass.sortName`
   **/
  proto.getSourceName = function getSourceName(){
    var sourceName = klass.sortName;
    return sourceName;
  };
  /**
   * Get the factory used to build new sort sources.
   *
   * The constructor itself doubles as the factory; there is no separate
   * .create()-style factory object.
   *
   * @method getFactory
   * @returns {Function} the SortDocumentSource constructor
   **/
  proto.getFactory = function getFactory(){
    var factory = klass;
    return factory;
  };
  /**
   * Release the buffered documents and dispose of the underlying source.
   *
   * Resets the iteration position to 0, replaces the document buffer with
   * an empty array, clears `_output`, and then calls dispose() on
   * `this.source`.
   *
   * @method dispose
   **/
  proto.dispose = function dispose() {
    // drop local state first ...
    this.docIterator = 0;
    this.documents = [];
    this._output = undefined;

    // ... then let the upstream source clean up as well
    var upstream = this.source;
    upstream.dispose();
  };
  /**
   * Get the limit that has been coalesced into this sort.
   *
   * @method getLimit
   * @returns {Number} the coalesced limit source's getLimit() value, or -1 when no limit source has been absorbed
   **/
  proto.getLimit = function getLimit() {
    if (!this.limitSrc) return -1;
    return this.limitSrc.getLimit();
  };
  /**
   * Try to absorb the source that follows this one in the pipeline.
   *
   * The first LimitDocumentSource offered is remembered as `limitSrc`.
   * Once a limit has been absorbed, every later request is delegated to
   * that limit source's own coalesce().
   *
   * @method coalesce
   * @param {DocumentSource} nextSource the source that follows this one
   * @returns {Object|Boolean} `nextSource` when it was absorbed here, `false` when it was not, otherwise whatever `limitSrc.coalesce()` returns
   **/
  proto.coalesce = function coalesce(nextSource) {
    // already holding a limit: let it decide
    if (this.limitSrc) return this.limitSrc.coalesce(nextSource);

    // only a limit can be folded into a sort
    if (!(nextSource instanceof LimitDocumentSource)) return false;

    this.limitSrc = nextSource;
    return nextSource;
  };
  /**
   * Hand out the next document in sorted order.
   *
   * The first call triggers populate(); later calls walk the sorted buffer.
   * `null` is delivered once the buffer is exhausted (or when the buffered
   * entry is falsy).
   *
   * @method getNext
   * @param {Function} callback invoked as callback(err, doc)
   * @returns {Object|null|undefined} the same document given to the callback when the work completed synchronously; undefined otherwise
   * @throws {Error} if no callback is supplied
   **/
  proto.getNext = function getNext(callback) {
    if (!callback) throw new Error(this.getSourceName() + " #getNext() requires callback");

    var ctx = this.expCtx;
    if (ctx instanceof Object && ctx.checkForInterrupt && ctx.checkForInterrupt() === false)
      return callback(new Error("Interrupted"));

    var self = this,
      out;

    // step 1: make sure the sorted buffer exists
    function ensurePopulated(next) {
      if (self.populated) return next();
      self.populate(function(err) {
        return next(err);
      });
    }

    // step 2: take the next buffered document, or null at the end
    function takeNext(next) {
      var exhausted = self.docIterator >= self.documents.length,
        candidate = exhausted ? null : self.documents[self.docIterator++];
      out = candidate ? candidate : null;
      return next(null, out);
    }

    async.series([ensurePopulated, takeNext], function(err) {
      return callback(err, out);
    });

    // only meaningful when both steps finished synchronously
    return out;
  };
  /**
   * Serialize this source by appending to an array.
   *
   * When explaining, exactly one object describing the combined sort and
   * limit is appended. Otherwise one `{$sort: ...}` object is appended and
   * the absorbed limit source, if any, appends its own serialization.
   *
   * @method serializeToArray
   * @param {Array} array the array to append to
   * @param {bool} explain whether explain output is wanted
   **/
  proto.serializeToArray = function serializeToArray(array, explain) {
    if (explain) {
      // always a single object for the combined $sort + $limit
      array.push({
        sortKey: this.serializeSortKey(explain),
        mergePresorted: this._mergePresorted,
        limit: this.limitSrc ? this.limitSrc.getLimit() : undefined
      });
      return;
    }

    // one entry for $sort ...
    var spec = this.serializeSortKey(explain);
    if (this._mergePresorted)
      spec.$mergePresorted = true;
    var stage = {};
    stage[this.getSourceName()] = spec;
    array.push(stage);

    // ... and possibly one for $limit
    if (this.limitSrc)
      this.limitSrc.serializeToArray(array);
  };
  /**
   * Unsupported for this source; it serializes through serializeToArray().
   *
   * @method serialize
   * @param {bool} explain unused
   * @throws {Error} always
   **/
  proto.serialize = function serialize(explain) {
    var message = "should call serializeToArray instead";
    throw new Error(message);
  };
  /**
   * Add sort key field.
   *
   * Adds a sort key field to the key being built up. A concatenated
   * key is built up by calling this repeatedly.
   *
   * The direction is validated before anything is stored, so a rejected
   * call leaves `vSortKey` and `vAscending` the same length.
   *
   * @method addKey
   * @param {String} fieldPath the field path to the key component
   * @param {bool} ascending if true, use the key for an ascending sort, otherwise, use it for descending
   * @throws {Error} if `ascending` is not exactly `true` or `false`
   **/
  proto.addKey = function addKey(fieldPath, ascending) {
    if (ascending !== true && ascending !== false) {
      // This doesn't appear to be an error in real mongo?
      throw new Error("ascending must be true or false");
    }

    var idGenerator = new VariablesIdGenerator(),
      vps = new VariablesParseState(idGenerator);
    // the path is parsed relative to $$ROOT; compare() and
    // serializeSortKey() strip that leading component again
    var pathExpr = FieldPathExpression.parse("$$ROOT." + fieldPath, vps);

    this.vSortKey.push(pathExpr);
    this.vAscending.push(ascending);
  };
  /**
   * Build the options object describing this sort.
   *
   * @method makeSortOptions
   * @returns {Object} an options object; it has a `limit` property only when a limit source has been coalesced into this sort
   * @throws {Error} if no sort key has been added
   **/
  proto.makeSortOptions = function makeSortOptions(){
    /* make sure we've got a sort key */
    if (!this.vSortKey.length) throw new Error("no sort key for " + this.getSourceName());

    // Skipping memory checks

    // must start as an object so the limit can be attached to it
    var opts = {};
    if (this.limitSrc)
      opts.limit = this.limitSrc.getLimit();
    return opts;
  };
  /**
   * Pull every document from the underlying source, sort the buffer and
   * reset the iteration position.
   *
   * `populated` is only set once the buffer has been filled and sorted
   * successfully. An error reported by the underlying source, or thrown
   * while sorting, is passed to `callback` instead.
   *
   * @method populate
   * @param {Function} callback invoked as callback(err) when population has finished
   * @throws {Error} if this source is flagged as merging presorted input, which is not implemented
   **/
  proto.populate = function populate(callback) {
    if (this._mergePresorted) {
      // Skipping stuff about mergeCursors and commandShards
      throw new Error("Merge presorted not implemented.");
    }

    /* pull everything from the underlying source */
    var self = this,
      next;
    async.doWhilst(
      function (cb) {
        self.source.getNext(function(err, doc) {
          // stop pulling as soon as the upstream source fails
          if (err) return cb(err);
          // a missing document is treated like EOF so the loop always terminates
          next = (doc === undefined) ? null : doc;
          // Don't add EOF; it doesn't sort well.
          if (next !== null)
            self.documents.push(next);
          return cb();
        });
      },
      function() {
        return next !== null;
      },
      function(err) {
        if (err) return callback(err);
        try {
          /* sort the list */
          self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
        } catch (ex) {
          return callback(ex);
        }
        // A limit absorbed by coalesce() is applied here; nothing else in
        // this source consults it while iterating.
        var limit = self.getLimit();
        if (limit > 0 && self.documents.length > limit)
          self.documents = self.documents.slice(0, limit);
        /* start the sort iterator */
        self.docIterator = 0;
        self.populated = true;
        return callback();
      }
    );
  };
  klass.IteratorFromCursor = (function(){
    /**
     * Helper that pairs a sorter with a cursor so that cursor output can be
     * fed into a sort. Only more() is functional; next() is still a stub.
     *
     * @class IteratorFromCursor
     * @constructor
     * @param {SortDocumentSource} sorter the sorter whose key extraction is to be used
     * @param {Object} cursor the cursor to read from; it must provide a more() method
     **/
    var klass = function IteratorFromCursor(sorter, cursor){
      // keep the sorter that was handed in rather than constructing a new one from it
      this._sorter = sorter;
      // kept as given so more() has something to delegate to
      // NOTE(review): the upstream implementation wraps this in a DBClientCursor, which is not available here
      this._cursor = cursor;
    }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});

    /**
     * @method more
     * @returns whatever the cursor's more() returns
     **/
    proto.more = function more() {
      return this._cursor.more();
    };

    /**
     * Not implemented yet; always returns undefined.
     *
     * @method next
     **/
    proto.next = function next() {
      // TODO: read the next document from this._cursor and return the
      // pair [this._sorter.extractKey(doc), doc]
    };

    return klass;
  })();
  /**
   * Placeholder for populating the sorter from a set of cursors.
   *
   * Merging from cursors has not been ported; the call always fails.
   *
   * @method populateFromCursors
   * @param {Array} cursors unused
   * @throws {Error} always
   **/
  proto.populateFromCursors = function populateFromCursors(cursors){
    // TODO: wrap each cursor in an iterator and merge the iterators using
    // makeSortOptions() and this source's comparator
    var message = "this implementation is broken"; //TODO: fix?
    throw new Error(message);
  };
  klass.IteratorFromBsonArray = (function(){
    /**
     * Helper that walks an array of documents and yields each one together
     * with its sort key.
     *
     * @class IteratorFromBsonArray
     * @constructor
     * @param {SortDocumentSource} sorter the sorter whose extractKey() is used
     * @param {Array} array the documents to iterate over
     **/
    var klass = function IteratorFromBsonArray(sorter, array){
      // keep the sorter that was handed in rather than constructing a new one from it
      this._sorter = sorter;
      this._array = array || [];
      this._index = 0; // position of the next document to return
    }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});

    /**
     * @method next
     * @returns {Array} the pair [sortKey, document] for the next document
     * @throws {Error} if there are no more documents
     **/
    proto.next = function next() {
      if (!this.more()) throw new Error("no more documents");
      var doc = this._array[this._index++];
      return [this._sorter.extractKey(doc), doc];
    };

    /**
     * @method more
     * @returns {Boolean} true while documents remain to be returned by next()
     **/
    proto.more = function more() {
      return this._index < this._array.length;
    };

    return klass;
  })();
  /**
   * Placeholder for populating the sorter from arrays of documents.
   *
   * Merging from arrays has not been ported; the call always fails.
   *
   * @method populateFromBsonArrays
   * @param {Array} arrays unused
   * @throws {Error} always
   **/
  proto.populateFromBsonArrays = function populateFromBsonArrays(arrays){
    // TODO: wrap each array in an iterator and merge the iterators using
    // makeSortOptions() and this source's comparator
    var message = "this implementation is broken"; //TODO: fix?
    throw new Error(message);
  };
  /**
   * Extract the sort key of a document.
   *
   * @method extractKey
   * @param {Object} d the document to extract the key from
   * @returns the single evaluated key value when exactly one sort key is configured, otherwise an array holding one evaluated value per sort key, in key order
   **/
  proto.extractKey = function extractKey(d){
    // NOTE(review): presumably binds `d` as the root document for evaluation; confirm against Variables
    var vars = new Variables(0,d);

    // a single key is returned bare rather than wrapped in an array
    if (this.vSortKey.length === 1)
      return this.vSortKey[0].evaluate(vars);

    return this.vSortKey.map(function(keyExpr) {
      return keyExpr.evaluate(vars);
    });
  };
  /**
   * Compare two documents according to the configured sort keys.
   *
   * Keys are consulted in the order they were added; the first key whose
   * values differ decides the result, negated when that key is descending.
   *
   * @method compare
   * @param {Object} lhs the left side doc
   * @param {Object} rhs the right side doc
   * @returns {Number} the (possibly negated) Value.compare() result for the first differing key, or 0 when no key differs or no key is configured
   **/
  proto.compare = function compare(lhs,rhs) {
    var keyCount = this.vSortKey.length;
    for (var k = 0; k < keyCount; ++k) {
      // rebuild the path without its leading component (addKey() prefixes "$$ROOT.")
      var names = this.vSortKey[k].getFieldPath(false).fieldNames,
        pathExpr = FieldPathExpression.create(names.slice(1).join("."));

      /* evaluate the sort key on both sides */
      var left = pathExpr.evaluate(lhs),
        right = pathExpr.evaluate(rhs);

      var cmp = Value.compare(left, right);

      // equal on this key: fall through to the next one
      if (!cmp) continue;

      /* adjust the result by the key ordering */
      return this.vAscending[k] ? cmp : -cmp;
    }

    // every key matched (or there were none), so the documents are
    // considered equal for the purposes of this sort
    return 0;
  };
  /**
   * Write out an object whose contents are the sort key.
   *
   * Field-path keys are written as `path: 1` (ascending) or `path: -1`
   * (descending). Any other expression is written under a made-up name,
   * "$computed" followed by the key's index, so each one gets its own entry.
   *
   * @method serializeSortKey
   * @param {bool} explain passed through to serialize() of non-field-path expressions
   * @return {Object} key
   **/
  proto.serializeSortKey = function serializeSortKey(explain) {
    var keyObj = {};
    // add the key fields
    var n = this.vSortKey.length;
    for (var i = 0; i < n; i++) {
      if ( this.vSortKey[i] instanceof FieldPathExpression ) {
        // drop the leading path component (addKey() prefixes "$$ROOT.")
        var fieldPath = this.vSortKey[i].getFieldPath(false).fieldNames.slice(1).join(".");
        // append a named integer based on the sort order
        keyObj[fieldPath] = this.vAscending[i] ? 1 : -1;
      } else {
        // other expressions use a made-up field name; it has to be a
        // string, since an object used as a property key collapses to
        // "[object Object]" and every such key would overwrite the last
        keyObj["$computed" + i] = this.vSortKey[i].serialize(explain);
      }
    }
    return keyObj;
  };
  /**
   * Creates a new SortDocumentSource from Json
   *
   * @method createFromJson
   * @param {Object} elem the sort key specification, e.g. {a: 1, b: -1}
   * @param {Object} expCtx passed on to klass.create()
   * @returns {SortDocumentSource} the source built by klass.create()
   * @throws {Error} code 15973 if `elem` is not an object
   **/
  klass.createFromJson = function createFromJson(elem, expCtx) {
    // typeof null is "object", so null has to be rejected explicitly
    if (typeof elem !== "object" || elem === null) throw new Error("code 15973; the " + klass.sortName + " key specification must be an object");
    return klass.create(expCtx, elem);
  };
  /**
   * Creates a new SortDocumentSource
   *
   * Each entry of `sortOrder` maps a field path to 1 (ascending) or -1
   * (descending). The special entry `$mergePresorted` does not add a key;
   * it flags the new source as merging presorted input.
   *
   * @method create
   * @param {Object} expCtx passed to the SortDocumentSource constructor
   * @param {object} sortorder the sort specification
   * @param {int} limit when greater than 0, a limit that is coalesced into the new sort
   * @returns {SortDocumentSource} the new source
   * @throws {Error} code 17312 if a key's ordering is an object
   * @throws {Error} code 15974 if a key's ordering is not a number
   * @throws {Error} code 15975 if a key's ordering is neither 1 nor -1
   * @throws {Error} code 15976 if no sort key was given
   **/
  klass.create = function create(expCtx, sortOrder, limit) {
    var Sort = proto.getFactory(),
      nextSort = new Sort(expCtx);

    /* check for then iterate over the sort object */
    var sortKeys = 0;
    for(var keyField in sortOrder) { //jshint ignore:line
      // for..in yields the field name itself; the ordering is the value stored under it
      var sortDirection = sortOrder[keyField];

      if (keyField === "$mergePresorted") {
        // the flag belongs on the instance being built, not on the constructor
        nextSort._mergePresorted = true;
        continue;
      }

      if (sortDirection instanceof Object) {
        // this restriction is due to needing to figure out sort direction
        throw new Error("code 17312; the only expression supported by " + klass.sortName + " right now is {$meta: 'textScore'}");
      }

      if (typeof sortDirection !== "number")
        throw new Error("code 15974; " + klass.sortName + " key ordering must be specified using a number or {$meta: 'text'}");

      if (sortDirection !== 1 && sortDirection !== -1)
        throw new Error("code 15975; " + klass.sortName + " key ordering must be 1 (for ascending) or -1 (for descending)");

      nextSort.addKey(keyField, (sortDirection > 0));
      ++sortKeys;
    }

    if (sortKeys <= 0)
      throw new Error("code 15976; " + klass.sortName + " must have at least one sort key");

    if ( limit > 0) {
      var coalesced = nextSort.coalesce(LimitDocumentSource.create(expCtx, limit));
      if (!coalesced) throw new Error("Assertion failure"); // should always coalesce
      // after coalescing, the sort must report exactly the requested limit
      if (nextSort.getLimit() !== limit) throw new Error("Assertion failure");
    }

    return nextSort;
  };
  // SplittableDocumentSource implementation.
  // Marker flag on the constructor advertising the splittable interface
  // (getShardSource / getMergeSource).
  klass.isSplittableDocumentSource = true;

  /**
   * Get dependencies.
   *
   * Lets every sort key expression add the fields it reads to `deps`.
   *
   * @method getDependencies
   * @param deps the dependency collection handed to each key expression's addDependencies()
   * @returns the DocumentSource.GetDepsReturn.SEE_NEXT constant
   */
  proto.getDependencies = function getDependencies(deps) {
    this.vSortKey.forEach(function(keyExpr) {
      keyExpr.addDependencies(deps);
    });
    return DocumentSource.GetDepsReturn.SEE_NEXT;
  };
  /**
   * Get shard source.
   *
   * @method getShardSource
   * @returns {this} this source itself
   * @throws {Error} if this source is flagged as merging presorted input
   */
  proto.getShardSource = function getShardSource() {
    // the message is built by concatenation; a stray comma here would turn
    // the rest into a second constructor argument and drop it from the message
    if (this._mergePresorted) throw new Error("getShardSource: " + klass.sortName + " should not be merging presorted");
    return this;
  };
  /**
   * Get merge source.
   *
   * Builds a second sort source that shares this one's sort keys,
   * directions and limit source, and flags it as merging presorted input.
   *
   * @method getMergeSource
   * @returns {SortDocumentSource} the new merging sort source
   * @throws {Error} if this source is itself already flagged via `_mergingPresorted`
   */
  proto.getMergeSource = function getMergeSource() {
    // the message is built by concatenation; a stray comma here would turn
    // the rest into a second constructor argument and drop it from the message
    if ( this._mergingPresorted) throw new Error("getMergeSource: " + klass.sortName + " should not be merging presorted");

    var other = new SortDocumentSource();
    // the arrays and the limit source are shared by reference, not copied
    other.vAscending = this.vAscending;
    other.vSortKey = this.vSortKey;
    other.limitSrc = this.limitSrc;
    // NOTE(review): the flag used here is `_mergingPresorted`, while the
    // rest of this file reads `_mergePresorted`; confirm which is intended
    other._mergingPresorted = true;
    return other;
  };
  416. };