SortDocumentSource.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. "use strict";
  2. var async = require("async"),
  3. DocumentSource = require("./DocumentSource"),
  4. LimitDocumentSource = require("./LimitDocumentSource"),
  5. Document = require('../Document');
  6. /**
  7. * A document source sorter
  8. *
  9. * //NOTE: DEVIATION FROM THE MONGO: We don't have shards, this inherits from DocumentSource, instead of SplittableDocumentSource
  10. *
  11. * @class SortDocumentSource
  12. * @namespace mungedb-aggregate.pipeline.documentSources
  13. * @module mungedb-aggregate
  14. * @constructor
  15. * @param [ctx] {ExpressionContext}
  16. **/
  17. var SortDocumentSource = module.exports = function SortDocumentSource(ctx){
  18. if (arguments.length > 1) throw new Error("up to one arg expected");
  19. base.call(this, ctx);
  20. /*
  21. * Before returning anything, this source must fetch everything from
  22. * the underlying source and group it. populate() is used to do that
  23. * on the first call to any method on this source. The populated
  24. * boolean indicates that this has been done
  25. **/
  26. this.populated = false;
  27. this.docIterator = null; // a number tracking our position in the documents array
  28. this.documents = []; // an array of documents
  29. this.vSortKey = [];
  30. this.vAscending = [];
  31. }, klass = SortDocumentSource, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  32. // DEPENDENCIES
  33. var FieldPathExpression = require("../expressions/FieldPathExpression"),
  34. VariablesIdGenerator = require("../expressions/VariablesIdGenerator"),
  35. VariablesParseState = require("../expressions/VariablesParseState"),
  36. Variables = require("../expressions/Variables"),
  37. Value = require("../Value");
  38. klass.sortName = "$sort";
  39. proto.getSourceName = function getSourceName(){
  40. return klass.sortName;
  41. };
  42. proto.getFactory = function getFactory(){
  43. return klass; // using the ctor rather than a separate .create() method
  44. };
  45. proto.dispose = function dispose() {
  46. this.docIterator = 0;
  47. this.documents = [];
  48. this._output.reset();
  49. this.source.dispose();
  50. };
  51. proto.getLimit = function getLimit() {
  52. return this.limitSrc ? this.limitSrc.getLimit() : -1;
  53. };
  54. proto.getDependencies = function getDependencies(deps) {
  55. for(var i = 0; i < this.vSortKey.length; ++i) {
  56. this.vSortKey[i].addDependencies(deps);
  57. }
  58. return DocumentSource.GetDepsReturn.SEE_NEXT;
  59. };
  60. proto.coalesce = function coalesce(nextSource) {
  61. if (!this.limitSrc) {
  62. if (nextSource instanceof LimitDocumentSource) {
  63. this.limitSrc = nextSource;
  64. return nextSource;
  65. }
  66. return false;
  67. } else {
  68. return this.limitSrc.coalesce(nextSource);
  69. }
  70. };
  71. proto.getNext = function getNext(callback) {
  72. if (!callback) throw new Error(this.getSourceName() + ' #getNext() requires callback');
  73. if (this.expCtx instanceof Object && this.expCtx.checkForInterrupt && this.expCtx.checkForInterrupt() === false)
  74. return callback(new Error("Interrupted"));
  75. var self = this,
  76. out;
  77. async.series(
  78. [
  79. function(next) {
  80. if (!self.populated)
  81. {
  82. self.populate(function(err) {
  83. return next(err);
  84. });
  85. } else {
  86. return next();
  87. }
  88. },
  89. function(next) {
  90. if (self.docIterator >= self.documents.length) {
  91. out = null;
  92. return next(null, null);
  93. }
  94. var output = self.documents[self.docIterator++];
  95. if (!output || output === null) {
  96. out = null;
  97. return next(null, null);
  98. }
  99. out = output;
  100. return next(null, output);
  101. }
  102. ],
  103. function(err, results) {
  104. return callback(err, out);
  105. }
  106. );
  107. return out;
  108. };
  109. /**
  110. * Serialize to Array.
  111. *
  112. * @param {Array} array
  113. * @param {bool} explain
  114. **/
  115. proto.serializeToArray = function serializeToArray(array, explain) {
  116. var doc = {};
  117. if (explain) { // always one obj for combined $sort + $limit
  118. doc.sortKey = this.serializeSortKey(explain);
  119. doc.mergePresorted = this._mergePresorted;
  120. doc.limit = this.limitSrc ? this.limitSrc.getLimit() : undefined;
  121. array.push(doc);
  122. } else { // one Value for $sort and maybe a Value for $limit
  123. var inner = {};
  124. inner = this.serializeSortKey(explain);
  125. if (this._mergePresorted)
  126. inner.$mergePresorted = true;
  127. doc[this.getSourceName()] = inner;
  128. array.push(doc);
  129. if (this.limitSrc)
  130. this.limitSrc.serializeToArray(array);
  131. }
  132. };
  133. proto.serialize = function serialize(explain) {
  134. throw new Error("should call serializeToArray instead");
  135. };
  136. /**
  137. * Add sort key field.
  138. *
  139. * Adds a sort key field to the key being built up. A concatenated
  140. * key is built up by calling this repeatedly.
  141. *
  142. * @param {String} fieldPath the field path to the key component
  143. * @param {bool} ascending if true, use the key for an ascending sort, otherwise, use it for descending
  144. **/
  145. proto.addKey = function addKey(fieldPath, ascending) {
  146. var idGenerator = new VariablesIdGenerator(),
  147. vps = new VariablesParseState(idGenerator);
  148. var pathExpr = FieldPathExpression.parse("$$ROOT." + fieldPath, vps);
  149. this.vSortKey.push(pathExpr);
  150. if (ascending === true || ascending === false) {
  151. this.vAscending.push(ascending);
  152. } else {
  153. // This doesn't appear to be an error in real mongo?
  154. throw new Error("ascending must be true or false");
  155. }
  156. };
  157. proto.makeSortOptions = function makeSortOptions(){
  158. /* make sure we've got a sort key */
  159. if (!this.vSortKey.length) throw new Error("no sort key for " + this.getSourceName());
  160. // Skipping memory checks
  161. var opts;
  162. if ( this.limitSrc)
  163. opts.limit = limitSrc.getLimt();
  164. return opts;
  165. }
  166. proto.populate = function populate(callback) {
  167. if ( this._mergePresorted ){
  168. // Skipping stuff about mergeCursors and commandShards
  169. if ( this.source instanceof MergeCursorDocumentSouce ){
  170. populateFromCursors( this.source);
  171. } else if ( this.source instanceof CommandShardsDocumentSource){
  172. populateFromJsonArrays(this.source);
  173. } else {
  174. throw new Error("code 17196; the " + klass.sortName + "can only mergePresorted from MergeCursors and CommandShards");
  175. }
  176. } else {
  177. /* pull everything from the underlying source */
  178. var self = this,
  179. next;
  180. async.doWhilst(
  181. function (cb) {
  182. self.source.getNext(function(err, doc) {
  183. next = doc;
  184. // Don't add EOF; it doesn't sort well.
  185. if (doc !== null)
  186. self.documents.push(doc);
  187. return cb();
  188. });
  189. },
  190. function() {
  191. return next !== null;
  192. },
  193. function(err) {
  194. /* sort the list */
  195. self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
  196. /* start the sort iterator */
  197. self.docIterator = 0;
  198. self.populated = true;
  199. //self._output.reset(true);
  200. return callback();
  201. }
  202. );
  203. }
  204. this.populated = true;
  205. };
  206. klass.IteratorFromCursor = (function(){
  207. /**
  208. * Helper class to unwind arrays within a series of documents.
  209. * @param {String} unwindPath is the field path to the array to unwind.
  210. **/
  211. var klass = function IteratorFromCursor(sorter, cursor){
  212. this._sorter = new SortDocumentSource(sorter);
  213. //this._cursor = new DBClientCursor(cursor);
  214. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  215. proto.more = function more() {
  216. return this._cursor.more();
  217. };
  218. proto.next = function next() {
  219. var doc = DocumentSourceMergeCursors(this._cursor);
  220. // TODO: make_pair for return
  221. //return {this._sorter.extractKey(doc): doc};
  222. };
  223. return klass;
  224. })();
  225. proto.populateFromCursors = function populateFromCursors(cursors){
  226. for (var i = 0; i < cursors.length; i++) {
  227. // TODO Create class
  228. //this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, cursors[i]));
  229. }
  230. this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
  231. }
  232. klass.IteratorFromBsonArray = (function(){
  233. /**
  234. * Helper class to unwind arrays within a series of documents.
  235. * @param {String} unwindPath is the field path to the array to unwind.
  236. **/
  237. var klass = function IteratorFromBsonArray(sorter, array){
  238. this._sorter = new SortDocumentSource(sorter);
  239. //this._iterator = new BSONObjIterator(array);
  240. }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  241. proto.next = function next() {
  242. var doc = DocumentSourceMergeCursors(this._cursor);
  243. // TODO: make_pair for return
  244. //return {this._sorter.extractKey(doc): doc};
  245. };
  246. proto.more = function more() {
  247. return this._cursor.more();
  248. };
  249. return klass;
  250. })();
  251. proto.populateFromBsonArrays = function populateFromBsonArrays(arrays){
  252. for (var i = 0; i < arrays.lenth; i++) {
  253. // TODO Create class
  254. //this.iterators.push(boost::make_shared<IteratorFromBsonArray>(this, arrays[i]));
  255. }
  256. this._output.reset( ); // TODO: MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
  257. }
  258. /**
  259. * Extract the key
  260. *
  261. * @param {d} document
  262. * @returns {keys} extracted key
  263. **/
  264. proto.extractKey = function extractKey(d){
  265. var vars = new Variables(0,d);
  266. if ( this.vSortKey.length == 1)
  267. return this.vSortKey[0].evaluate(vars);
  268. var keys;
  269. for (var i=0; i < this.vSortKey.length; i++) {
  270. keys.push(this.vSortKey[i].evaluate(vars));
  271. }
  272. return keys;
  273. }
  274. /**
  275. * Compare two documents according to the specified sort key.
  276. *
  277. * @param {Object} lhs the left side doc
  278. * @param {Object} rhs the right side doc
  279. * @returns {Number} a number less than, equal to, or greater than zero, indicating pL < pR, pL == pR, or pL > pR, respectively
  280. **/
  281. proto.compare = function compare(lhs,rhs) {
  282. /*
  283. populate() already checked that there is a non-empty sort key,
  284. so we shouldn't have to worry about that here.
  285. However, the tricky part is what to do is none of the sort keys are
  286. present. In this case, consider the document less.
  287. */
  288. for(var i = 0, n = this.vSortKey.length; i < n; ++i) {
  289. var pathExpr = FieldPathExpression.create(this.vSortKey[i].getFieldPath(false).fieldNames.slice(1).join('.'));
  290. /* evaluate the sort keys */
  291. var left = pathExpr.evaluate(lhs), right = pathExpr.evaluate(rhs);
  292. /*
  293. Compare the two values; if they differ, return. If they are
  294. the same, move on to the next key.
  295. */
  296. var cmp = Value.compare(left, right);
  297. if (cmp) {
  298. /* if necessary, adjust the return value by the key ordering */
  299. if (!this.vAscending[i])
  300. cmp = -cmp;
  301. return cmp;
  302. }
  303. }
  304. /**
  305. * If we got here, everything matched (or didn't exist), so we'll
  306. * consider the documents equal for purposes of this sort
  307. **/
  308. return 0;
  309. };
  310. /**
  311. * Write out an object whose contents are the sort key.
  312. *
  313. * @param {bool} explain
  314. * @return {Object} key
  315. **/
  316. proto.serializeSortKey = function serializeSortKey(explain) {
  317. var keyObj = {};
  318. // add the key fields
  319. var n = this.vSortKey.length;
  320. for (var i = 0; i < n; i++) {
  321. if ( this.vSortKey[i] instanceof FieldPathExpression ) {
  322. var fieldPath = this.vSortKey[i].getFieldPath(false).fieldNames.slice(1).join('.');
  323. // append a named integer based on the sort order
  324. keyObj[fieldPath] = this.vAscending[i] ? 1 : -1;
  325. } else {
  326. // other expressions use a made-up field name
  327. keyObj[{"$computed":i}] = this.vSortKey[i].serialize(explain);
  328. }
  329. }
  330. return keyObj;
  331. };
  332. /**
  333. * Creates a new SortDocumentSource from Json
  334. *
  335. * @param {Object} elem
  336. * @param {Object} expCtx
  337. *
  338. **/
  339. klass.createFromJson = function createFromJson(elem, expCtx) {
  340. if (typeof elem !== "object") throw new Error("code 15973; the " + klass.sortName + " key specification must be an object");
  341. return this.create(expCtx, elem);
  342. };
  343. /**
  344. * Creates a new SortDocumentSource
  345. *
  346. * @param {Object} expCtx
  347. * @param {object} sortorder
  348. * @param {int} limit
  349. *
  350. **/
  351. klass.create = function create(expCtx, sortOrder, limit) {
  352. var Sort = proto.getFactory(),
  353. nextSort = new Sort(expCtx);
  354. /* check for then iterate over the sort object */
  355. var sortKeys = 0;
  356. for(var keyField in sortOrder) {
  357. var fieldName = keyField.fieldName;
  358. if ( fieldName === "$mergePresorted" ){
  359. Sort._mergePresorted = true;
  360. continue;
  361. }
  362. if ( keyField instanceof Object) {
  363. // this restriction is due to needing to figure out sort direction
  364. throw new Error("code 17312; " + klass.sortName + "the only expression supported by $sort right now is {$meta: 'textScore'}");
  365. nextSort.vSortKey.push(new ExpressionMeta());
  366. nextSort.vAscending.push(false); // best scoring documents first
  367. continue;
  368. }
  369. if (typeof sortOrder[keyField] !== "number") throw new Error("code 15974; " + klass.sortName + "$sort key ordering must be specified using a number or {$meta: 'text'}");
  370. // RedBeard0531 can the thanked.
  371. var sortDirection = 0;
  372. sortDirection = sortOrder[keyField];
  373. if ((sortDirection != 1) && (sortDirection !== -1)) throw new Error("code 15975; " + klass.sortName + " $sort key ordering must be 1 (for ascending) or -1 (for descending)");
  374. nextSort.addKey(keyField, (sortDirection > 0));
  375. ++sortKeys;
  376. }
  377. if (sortKeys <= 0) throw new Error("code 15976; " + klass.sortName + " must have at least one sort key");
  378. if ( limit > 0) {
  379. var coalesced = nextSort.coalesce( create(expCtx, limit));
  380. // should always coalesce
  381. }
  382. return nextSort;
  383. };
  384. // SplittableDocumentSource implementation.
  385. klass.isSplittableDocumentSource = true;
  386. /**
  387. * Get dependencies.
  388. *
  389. * @param deps
  390. * @returns {number}
  391. */
  392. proto.getDependencies = function getDependencies(deps) {
  393. for(var i = 0; i < this.vSortKey.length; i++) {
  394. this.vSortKey[i].addDependencies(deps);
  395. }
  396. return DocumentSource.GetDepsReturn.SEE_NEXT;
  397. };
  398. /**
  399. * Get shard source.
  400. *
  401. * @returns {this}
  402. */
  403. proto.getShardSource = function getShardSource() {
  404. if (this._mergePresorted) throw new Error("getShardSource", + klass.sortName + " should not be merging presorted");
  405. return this;
  406. };
  407. /**
  408. * Get merge source.
  409. *
  410. * @returns {SortDocumentSource}
  411. */
  412. proto.getMergeSource = function getMergeSource() {
  413. if ( this._mergingPresorted) throw new Error("getMergeSource", + klass.sortName + " should not be merging presorted");
  414. var other = new SortDocumentSource();
  415. other.vAscending = this.vAscending;
  416. other.vSortKey = this.vSortKey;
  417. other.limitSrc = this.limitSrc;
  418. other._mergingPresorted = true;
  419. return other;
  420. };