SortDocumentSource.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. "use strict";
  2. var async = require("async"),
  3. DocumentSource = require("./DocumentSource"),
  4. LimitDocumentSource = require("./LimitDocumentSource");
  /**
   * Pipeline stage that sorts the documents produced by its underlying source.
   *
   * //NOTE: DEVIATION FROM THE MONGO: We don't have shards, so this inherits from DocumentSource instead of SplittableDocumentSource
   *
   * @class SortDocumentSource
   * @namespace mungedb-aggregate.pipeline.documentSources
   * @module mungedb-aggregate
   * @constructor
   * @param [ctx] {ExpressionContext} forwarded unchanged to the DocumentSource constructor
   * @throws {Error} if more than one argument is supplied
   **/
  var SortDocumentSource = module.exports = function SortDocumentSource(ctx){
    if (arguments.length > 1) throw new Error("up to one arg expected");
    base.call(this, ctx);

    /*
     * A sort cannot emit anything until it has seen every input document,
     * so getNext() calls populate() once to drain the underlying source
     * and sort the result. `populated` records that this has happened.
     */
    this.populated = false;
    this.docIterator = null; // index of the next entry of `documents` to hand out; set to 0 by populate()
    this.documents = []; // buffered documents (sorted once populate() has run)

    // Parallel arrays: vSortKey[i] is the i-th key expression, vAscending[i] its direction.
    this.vSortKey = [];
    this.vAscending = [];
  };
  var klass = SortDocumentSource;
  var base = require("./DocumentSource");
  var proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}}); //jshint ignore:line
  31. // DEPENDENCIES
  32. var FieldPathExpression = require("../expressions/FieldPathExpression"),
  33. VariablesIdGenerator = require("../expressions/VariablesIdGenerator"),
  34. VariablesParseState = require("../expressions/VariablesParseState"),
  35. Variables = require("../expressions/Variables"),
  36. Value = require("../Value");
  // Name under which this stage is reported by getSourceName() and used in serialized output and error messages.
  klass.sortName = "$sort";

  /**
   * Get the name of this stage.
   *
   * @returns {String} the value of klass.sortName
   **/
  proto.getSourceName = function getSourceName(){
    var stageName = klass.sortName;
    return stageName;
  };
  /**
   * Get the factory for this stage.
   *
   * @returns {Function} the SortDocumentSource constructor itself; the ctor is used rather than a separate .create() method
   **/
  proto.getFactory = function getFactory(){
    var factory = klass;
    return factory;
  };
  /**
   * Release the buffered documents, rewind the read position, clear _output,
   * and then dispose the underlying source.
   *
   * `populated` is left untouched, so getNext() after dispose() does not re-read the source.
   **/
  proto.dispose = function dispose() {
    this.documents = [];
    this.docIterator = 0;
    this._output = undefined;
    this.source.dispose();
  };
  /**
   * Get the limit coalesced into this sort.
   *
   * @returns {Number} the limit reported by the coalesced limit source, or -1 when none has been coalesced
   **/
  proto.getLimit = function getLimit() {
    if (!this.limitSrc) return -1;
    return this.limitSrc.getLimit();
  };
  /**
   * Try to absorb the following pipeline stage into this sort.
   *
   * The first LimitDocumentSource offered is remembered as `limitSrc`; once one
   * is held, further candidates are delegated to that limit source's own coalesce().
   *
   * @param {DocumentSource} nextSource the stage that follows this one
   * @returns {DocumentSource|Boolean} `nextSource` when it was adopted as the limit, false when it
   *   is not a LimitDocumentSource, otherwise whatever limitSrc.coalesce() returns
   **/
  proto.coalesce = function coalesce(nextSource) {
    if (this.limitSrc) return this.limitSrc.coalesce(nextSource);
    if (!(nextSource instanceof LimitDocumentSource)) return false;
    this.limitSrc = nextSource;
    return nextSource;
  };
  /**
   * Get the next document in sorted order.
   *
   * Calls expCtx.checkForInterrupt() when the context provides it, and runs
   * populate() on the first call.
   *
   * @returns {Object|null} the next buffered document, or null once the buffer is exhausted
   *   (any falsy buffer entry is also reported as null)
   **/
  proto.getNext = function getNext() {
    var ctx = this.expCtx;
    if (ctx && ctx.checkForInterrupt) ctx.checkForInterrupt();
    if (!this.populated) this.populate();
    // the position advances on every call, even past the end of the buffer
    var position = this.docIterator++;
    return this.documents[position] || null;
  };
  /**
   * Serialize this stage by appending to `array`.
   *
   * With `explain` a single object describing the combined $sort + $limit is
   * pushed. Without it, a `{$sort: ...}` object is pushed and, when a limit
   * has been coalesced, the limit source appends its own serialization after it.
   *
   * @param {Array} array the array to append to
   * @param {bool} explain whether explain-style output is wanted
   **/
  proto.serializeToArray = function serializeToArray(array, explain) {
    if (explain) {
      // always one obj for combined $sort + $limit
      array.push({
        sortKey: this.serializeSortKey(explain),
        mergePresorted: this._mergePresorted,
        limit: this.limitSrc ? this.limitSrc.getLimit() : undefined
      });
      return;
    }
    // one Value for $sort and maybe a Value for $limit
    var sortSpec = this.serializeSortKey(explain);
    if (this._mergePresorted) sortSpec.$mergePresorted = true;
    var stage = {};
    stage[this.getSourceName()] = sortSpec;
    array.push(stage);
    if (this.limitSrc) this.limitSrc.serializeToArray(array);
  };
  /**
   * Not supported for this stage: it may serialize to more than one element
   * ($sort plus a coalesced $limit), so serializeToArray() must be used.
   *
   * @param {bool} explain ignored
   * @throws {Error} always
   **/
  proto.serialize = function serialize(explain) {
    var message = "should call serializeToArray instead";
    throw new Error(message);
  };
  /**
   * Add sort key field.
   *
   * Adds a sort key field to the key being built up. A concatenated
   * key is built up by calling this repeatedly.
   *
   * @param {String} fieldPath the field path to the key component
   * @param {bool} ascending if true, use the key for an ascending sort, otherwise, use it for descending
   * @throws {Error} if `ascending` is not a boolean; in that case no state is modified
   **/
  proto.addKey = function addKey(fieldPath, ascending) {
    // Validate before touching any state: vSortKey and vAscending are parallel
    // arrays and must never end up with different lengths.
    if (typeof ascending !== "boolean") {
      // This doesn't appear to be an error in real mongo?
      throw new Error("ascending must be true or false");
    }
    var idGenerator = new VariablesIdGenerator(),
      vps = new VariablesParseState(idGenerator);
    // The key is stored as a path expression rooted at $$ROOT.
    var pathExpr = FieldPathExpression.parse("$$ROOT." + fieldPath, vps);
    this.vSortKey.push(pathExpr);
    this.vAscending.push(ascending);
  };
  /**
   * Build the options object describing how this sort should be performed.
   *
   * @returns {Object} an options object; carries a `limit` property when a limit has been coalesced
   * @throws {Error} if no sort key has been added
   **/
  proto.makeSortOptions = function makeSortOptions(){
    /* make sure we've got a sort key */
    if (!this.vSortKey.length) throw new Error("no sort key for " + this.getSourceName());
    // Skipping memory checks
    var opts = {};
    if (this.limitSrc) opts.limit = this.limitSrc.getLimit();
    return opts;
  };
  /**
   * Drain the underlying source into `documents`, sort them with compare(),
   * apply any coalesced limit, and reset the read position.
   *
   * @throws {Error} if this source is flagged as merging presorted input (not implemented)
   **/
  proto.populate = function populate() {
    if (this._mergePresorted) {
      //NOTE: DEVIATION: skipping stuff about mergeCursors and commandShards
      throw new Error("Merge presorted not implemented.");
    }
    // The loop ends on the first falsy value from the source, so EOF (null) is never buffered.
    var doc;
    while ((doc = this.source.getNext())) {
      this.documents.push(doc);
    }
    // sort the list
    this.documents.sort(SortDocumentSource.prototype.compare.bind(this));
    // A $limit absorbed by coalesce() has to be honoured here; keep only the first `limit` documents.
    var limit = this.getLimit();
    if (limit > 0 && this.documents.length > limit) {
      this.documents.length = limit;
    }
    this.docIterator = 0;
    this.populated = true;
  };
  klass.IteratorFromCursor = (function(){
    /**
     * Unfinished helper meant to iterate the documents of a cursor for a merge sort.
     *
     * NOTE(review): `_cursor` is never assigned (the assignment is commented out),
     * so more() currently throws, and next() returns undefined.
     *
     * @param sorter passed as the single constructor argument of a new SortDocumentSource, kept as `_sorter`
     * @param cursor currently unused
     **/
    var IteratorFromCursor = function IteratorFromCursor(sorter, cursor){
      this._sorter = new SortDocumentSource(sorter);
      //this._cursor = new DBClientCursor(cursor);
    };
    var iteratorProto = IteratorFromCursor.prototype = Object.create(Object.prototype, {constructor:{value:IteratorFromCursor}});

    /** @returns whatever `_cursor.more()` reports */
    iteratorProto.more = function more() {
      return this._cursor.more();
    };

    /** Not implemented yet; always returns undefined. */
    iteratorProto.next = function next() {
      // var doc = new DocumentSourceMergeCursors(this._cursor);
      // TODO: make_pair for return
      //return {this._sorter.extractKey(doc): doc};
    };

    return IteratorFromCursor;
  })();
  /**
   * Placeholder for populating this sort from a set of cursors; not ported yet.
   *
   * @param {Array} cursors ignored
   * @throws {Error} always
   **/
  proto.populateFromCursors = function populateFromCursors(cursors){
    //TODO: umm, probably broken...
    //TODO: build one iterator per cursor, then set this._output from
    //      MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
    var message = "this implementation is broken"; //TODO: fix?
    throw new Error(message);
  };
  klass.IteratorFromBsonArray = (function(){
    /**
     * Unfinished helper meant to iterate the documents of an array for a merge sort.
     *
     * NOTE(review): nothing assigns `_cursor` (or any iterator over `array`),
     * so more() currently throws, and next() returns undefined.
     *
     * @param sorter passed as the single constructor argument of a new SortDocumentSource, kept as `_sorter`
     * @param array currently unused
     **/
    var IteratorFromBsonArray = function IteratorFromBsonArray(sorter, array){
      this._sorter = new SortDocumentSource(sorter);
      //this._iterator = new BSONObjIterator(array);
    };
    var iteratorProto = IteratorFromBsonArray.prototype = Object.create(Object.prototype, {constructor:{value:IteratorFromBsonArray}});

    /** Not implemented yet; always returns undefined. */
    iteratorProto.next = function next() {
      // var doc = new DocumentSourceMergeCursors(this._cursor);
      // TODO: make_pair for return
      //return {this._sorter.extractKey(doc): doc};
    };

    /** @returns whatever `_cursor.more()` reports */
    iteratorProto.more = function more() {
      return this._cursor.more();
    };

    return IteratorFromBsonArray;
  })();
  /**
   * Placeholder for populating this sort from a set of arrays; not ported yet.
   *
   * @param {Array} arrays ignored
   * @throws {Error} always
   **/
  proto.populateFromBsonArrays = function populateFromBsonArrays(arrays){
    //TODO: umm, probably broken...
    //TODO: build one IteratorFromBsonArray per array, then set this._output from
    //      MySorter::Iterator::merge(iterators, makeSortOptions(), Comparator(*this))
    var message = "this implementation is broken"; //TODO: fix?
    throw new Error(message);
  };
  /**
   * Extract the sort key of a document.
   *
   * @param {Object} d the document to evaluate the sort key expressions against
   * @returns the single evaluated value when exactly one sort key is configured,
   *   otherwise an array with one evaluated value per sort key, in key order
   *   (an empty array when no key is configured)
   **/
  proto.extractKey = function extractKey(d){
    var vars = new Variables(0, d);
    // a single key is returned bare rather than wrapped in an array
    if (this.vSortKey.length === 1)
      return this.vSortKey[0].evaluate(vars);
    var keys = [];
    for (var i = 0; i < this.vSortKey.length; i++) {
      keys.push(this.vSortKey[i].evaluate(vars));
    }
    return keys;
  };
  /**
   * Compare two documents according to the configured sort key.
   *
   * Keys are examined in order; the first key on which Value.compare()
   * reports a non-zero result decides the outcome, negated for descending keys.
   * With no sort key configured the loop does not run and the result is 0.
   *
   * @param {Object} lhs the left side doc
   * @param {Object} rhs the right side doc
   * @returns {Number} a number less than, equal to, or greater than zero, indicating lhs < rhs, lhs == rhs, or lhs > rhs, respectively
   **/
  proto.compare = function compare(lhs,rhs) {
    var keyCount = this.vSortKey.length;
    for (var k = 0; k < keyCount; ++k) {
      // Rebuild the key's path without its first component
      // (presumably the ROOT prefix added by addKey -- TODO confirm).
      var names = this.vSortKey[k].getFieldPath(false).fieldNames;
      var keyExpr = FieldPathExpression.create(names.slice(1).join("."));

      /* evaluate the sort key against both documents */
      var leftValue = keyExpr.evaluate(lhs);
      var rightValue = keyExpr.evaluate(rhs);

      /* identical on this key: let the next key decide */
      var order = Value.compare(leftValue, rightValue);
      if (!order) continue;

      /* adjust the result by the key ordering */
      return this.vAscending[k] ? order : -order;
    }

    /*
     * Every key matched (or none was present), so the documents are
     * considered equal for purposes of this sort.
     */
    return 0;
  };
  /**
   * Write out an object whose contents are the sort key.
   *
   * Field path keys are written as `path: 1` (ascending) or `path: -1`
   * (descending). Any other key expression is written under a made-up
   * name, "$computed" followed by its index, holding its own serialization.
   *
   * @param {bool} explain forwarded to serialize() of non-field-path expressions
   * @return {Object} key
   **/
  proto.serializeSortKey = function serializeSortKey(explain) {
    var keyObj = {};
    // add the key fields
    var n = this.vSortKey.length;
    for (var i = 0; i < n; i++) {
      if (this.vSortKey[i] instanceof FieldPathExpression) {
        // drop the first path component before joining the rest back into a dotted path
        var fieldPath = this.vSortKey[i].getFieldPath(false).fieldNames.slice(1).join(".");
        // append a named integer based on the sort order
        keyObj[fieldPath] = this.vAscending[i] ? 1 : -1;
      } else {
        // Other expressions use a made-up field name. It must be a string:
        // an object used as a property key collapses to "[object Object]".
        keyObj["$computed" + i] = this.vSortKey[i].serialize(explain);
      }
    }
    return keyObj;
  };
  /**
   * Creates a new SortDocumentSource from Json
   *
   * @param {Object} elem the sort specification, e.g. `{a: 1, b: -1}`
   * @param {Object} expCtx the expression context handed on to create()
   * @returns {SortDocumentSource} the source built by klass.create(expCtx, elem)
   * @throws {Error} code 15973 if `elem` is not a plain specification object (null and arrays are rejected too)
   **/
  klass.createFromJson = function createFromJson(elem, expCtx) {
    // typeof reports "object" for null and for arrays, so those are excluded explicitly
    if (elem === null || typeof elem !== "object" || Array.isArray(elem))
      throw new Error("code 15973; the " + klass.sortName + " key specification must be an object");
    return klass.create(expCtx, elem);
  };
  /**
   * Creates a new SortDocumentSource
   *
   * @param {Object} expCtx the expression context for the new source
   * @param {object} sortOrder maps each field path to 1 (ascending) or -1 (descending);
   *   the special key "$mergePresorted" flags the new source as merging presorted input
   * @param {int} [limit] when greater than zero, a limit of this size is coalesced into the sort
   * @returns {SortDocumentSource} the configured source
   * @throws {Error} code 17312 if a key's ordering is an object (expressions are not supported),
   *   code 15974 if it is not a number, code 15975 if it is neither 1 nor -1,
   *   code 15976 if there are no sort keys, or "Assertion failure" if the limit could not be coalesced
   **/
  klass.create = function create(expCtx, sortOrder, limit) {
    var Sort = proto.getFactory(),
      nextSort = new Sort(expCtx);

    /* check for then iterate over the sort object */
    var sortKeys = 0;
    for (var keyField in sortOrder) { //jshint ignore:line
      var keySpec = sortOrder[keyField];

      if (keyField === "$mergePresorted") {
        // the flag belongs to the instance being built, not to the constructor
        nextSort._mergePresorted = true;
        continue;
      }

      if (keySpec instanceof Object) {
        // this restriction is due to needing to figure out sort direction
        throw new Error("code 17312; the only expression supported by " + klass.sortName + " right now is {$meta: 'textScore'}");
      }

      if (typeof keySpec !== "number")
        throw new Error("code 15974; " + klass.sortName + " key ordering must be specified using a number or {$meta: 'text'}");

      if (keySpec !== 1 && keySpec !== -1)
        throw new Error("code 15975; " + klass.sortName + " key ordering must be 1 (for ascending) or -1 (for descending)");

      nextSort.addKey(keyField, (keySpec > 0));
      ++sortKeys;
    }

    if (sortKeys <= 0)
      throw new Error("code 15976; " + klass.sortName + " must have at least one sort key");

    if (limit > 0) {
      var coalesced = nextSort.coalesce(LimitDocumentSource.create(expCtx, limit));
      if (!coalesced) throw new Error("Assertion failure"); // should always coalesce
      if (nextSort.getLimit() !== limit) throw new Error("Assertion failure"); // the coalesced limit must be the requested one
    }

    return nextSort;
  };
  // SplittableDocumentSource implementation.
  klass.isSplittableDocumentSource = true;

  /**
   * Get dependencies.
   *
   * Asks every sort key expression to add its dependencies to `deps`.
   *
   * @param deps the dependency collection handed to each key expression's addDependencies()
   * @returns {number} always DocumentSource.GetDepsReturn.SEE_NEXT
   */
  proto.getDependencies = function getDependencies(deps) {
    this.vSortKey.forEach(function(keyExpr){
      keyExpr.addDependencies(deps);
    });
    return DocumentSource.GetDepsReturn.SEE_NEXT;
  };
  /**
   * Get shard source.
   *
   * @returns {this} this same source
   * @throws {Error} if this source is flagged as merging presorted input
   */
  proto.getShardSource = function getShardSource() {
    // The message is built by concatenation; a second argument to Error would not become part of it.
    if (this._mergePresorted) throw new Error("getShardSource: " + klass.sortName + " should not be merging presorted");
    return this;
  };
  /**
   * Get merge source.
   *
   * Builds a new SortDocumentSource that shares this source's key
   * expressions, directions and limit source (the arrays are shared by
   * reference, not copied) and is flagged via `_mergingPresorted`.
   *
   * NOTE(review): this method uses `_mergingPresorted` while populate(),
   * serializeToArray() and getShardSource() read `_mergePresorted`, so the
   * flag set here is not seen by them -- confirm which name is intended.
   *
   * @returns {SortDocumentSource} the new merge source
   * @throws {Error} if this source is itself already flagged via `_mergingPresorted`
   */
  proto.getMergeSource = function getMergeSource() {
    // The message is built by concatenation; a second argument to Error would not become part of it.
    if (this._mergingPresorted) throw new Error("getMergeSource: " + klass.sortName + " should not be merging presorted");
    var other = new SortDocumentSource();
    other.vAscending = this.vAscending;
    other.vSortKey = this.vSortKey;
    other.limitSrc = this.limitSrc;
    other._mergingPresorted = true;
    return other;
  };
  366. };