// Pipeline_test.js (7.9 KB)
  1. "use strict";
  2. var assert = require("assert"),
  3. Pipeline = require("../../../lib/pipeline/Pipeline"),
  4. FieldPath = require("../../../lib/pipeline/FieldPath"),
  5. DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
  6. CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
  7. ProjectDocumentSource = require("../../../lib/pipeline/documentSources/ProjectDocumentSource"),
  8. ArrayRunner = require("../../../lib/query/ArrayRunner");
  /**
   * Gives `match` a cursor-backed source built over `data`.
   *
   * @param {Object} match - object exposing `setSource(source)`
   * @param {*} data - handed unchanged to the `ArrayRunner` constructor
   */
  var addSource = function addSource(match, data) {
    var runner = new ArrayRunner(data);
    match.setSource(new CursorDocumentSource(null, runner, null));
  };
  /**
   * Parses a pipeline, splits it for sharding, and checks both halves.
   *
   * Each argument is the JSON text of a pipeline array; it is wrapped as
   * `{"pipeline": <text>}` before being parsed.
   *
   * @param {string} inputPipeString - JSON text of the pipeline to parse
   * @param {string} expectedMergePipeString - JSON text expected from the original pipeline after the split
   * @param {string} expectedShardPipeString - JSON text expected from the pipeline returned by splitForSharded()
   */
  var shardedTest = function(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
    var toCommand = function(pipelineJson) {
      return JSON.parse('{"pipeline": ' + pipelineJson + '}');
    };

    // Parse all three documents up front so malformed JSON fails before any pipeline work.
    var inputCommand = toCommand(inputPipeString);
    var expectedMerge = toCommand(expectedMergePipeString);
    var expectedShard = toCommand(expectedShardPipeString);

    var merger = Pipeline.parseCommand(inputCommand, {});
    assert.notEqual(merger, null);

    var shards = merger.splitForSharded();
    assert.notEqual(shards, null);

    assert.deepEqual(shards.serialize().pipeline, expectedShard.pipeline);
    assert.deepEqual(merger.serialize().pipeline, expectedMerge.pipeline);
  };
  27. module.exports = {
  28. "Pipeline": {
  29. before: function () {
  30. Pipeline.stageDesc.$test = (function () {
  31. var klass = function TestDocumentSource(options, ctx) {
  32. base.call(this, ctx);
  33. this.shouldCoalesce = options.coalesce;
  34. this.coalesceWasCalled = false;
  35. this.optimizeWasCalled = false;
  36. this.works = options.works === false ? false : true; // don't judge
  37. this.current = 5;
  38. }, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  39. proto.coalesce = function () {
  40. this.coalesceWasCalled = true;
  41. var c = this.shouldCoalesce;//only coalesce with the first thing we find
  42. this.shouldCoalesce = false;
  43. return c;
  44. };
  45. proto.optimize = function () {
  46. this.optimizeWasCalled = true;
  47. };
  48. proto.getNext = function(callback){
  49. var answer = this.current > 0 ? {val:this.current--} : null,
  50. err = null;
  51. if (!this.works)
  52. err = new Error("doesn't work"), answer = undefined;
  53. if(callback) {
  54. return callback(err, answer);
  55. } else {
  56. return answer || err;
  57. }
  58. };
  59. klass.createFromJson = function (options, ctx) {
  60. return new TestDocumentSource(options, ctx);
  61. };
  62. return klass;
  63. })().createFromJson;
  64. },
  65. "parseCommand": {
  66. "should throw Error if given non-objects in the array": function () {
  67. assert.throws(function () {
  68. Pipeline.parseCommand({pipeline: [5]});
  69. });
  70. },
  71. "should throw Error if given objects with more / less than one field": function () {
  72. assert.throws(function () {
  73. Pipeline.parseCommand({pipeline: [
  74. {}
  75. ]});
  76. Pipeline.parseCommand({pipeline: [
  77. {a: 1, b: 2}
  78. ]});
  79. });
  80. },
  81. "should throw Error on unknown document sources": function () {
  82. assert.throws(function () {
  83. Pipeline.parseCommand({pipeline: [
  84. {$foo: "$sdfdf"}
  85. ]});
  86. });
  87. },
  88. "should swap $match and $sort if the $match immediately follows the $sort": function () {
  89. var p = Pipeline.parseCommand({pipeline: [
  90. {$sort: {"xyz": 1}},
  91. {$match: {}}
  92. ]});
  93. assert.equal(p.sources[0].constructor.matchName, "$match");
  94. assert.equal(p.sources[1].constructor.sortName, "$sort");
  95. },
  96. "should attempt to coalesce all sources": function () {
  97. var p = Pipeline.parseCommand({pipeline: [
  98. {$test: {coalesce: false}},
  99. {$test: {coalesce: true}},
  100. {$test: {coalesce: false}},
  101. {$test: {coalesce: false}}
  102. ]});
  103. assert.equal(p.sources.length, 3);
  104. p.sources.slice(0, -1).forEach(function (source) {
  105. assert.equal(source.coalesceWasCalled, true);
  106. });
  107. assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
  108. },
  109. "should optimize all sources": function () {
  110. var p = Pipeline.parseCommand({pipeline: [
  111. {$test: {coalesce: false}},
  112. {$test: {coalesce: false}}
  113. ]});
  114. p.sources.forEach(function (source) {
  115. assert.equal(source.optimizeWasCalled, true);
  116. });
  117. }
  118. },
  119. "sharded": {
  120. "should handle empty pipeline for sharded": function () {
  121. var inputPipe = "[]",
  122. expectedMergePipe = "[]",
  123. expectedShardPipe = "[]";
  124. shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
  125. },
  126. "should handle one unwind": function () {
  127. var inputPipe = '[{"$unwind":"$a"}]',
  128. expectedShardPipe = "[]",
  129. expectedMergePipe = '[{"$unwind":"$a"}]';
  130. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  131. },
  132. "should handle two unwinds": function () {
  133. var inputPipe = '[{"$unwind":"$a"}, {"$unwind":"$b"}]',
  134. expectedShardPipe = "[]",
  135. expectedMergePipe = '[{"$unwind": "$a"}, {"$unwind": "$b"}]';
  136. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  137. },
  138. "should handle unwind not final": function () {
  139. var inputPipe = '[{"$unwind": "$a"}, {"$match": {"a":1}}]',
  140. expectedShardPipe = '[]',
  141. expectedMergePipe = '[{"$unwind": "$a"}, {"$match": {"a":1}}]';
  142. shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
  143. },
  144. "should handle unwind with other": function () {
  145. var inputPipe = '[{"$match": {"a":1}}, {"$unwind": "$a"}]',
  146. expectedShardPipe = '[{"$match":{"a":1}}]',
  147. expectedMergePipe = '[{"$unwind":"$a"}]';
  148. shardedTest(inputPipe,expectedMergePipe, expectedShardPipe);
  149. }
  150. },
  151. "#stitch": {
  152. "should set the parent source for all sources in the pipeline except the first one": function () {
  153. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
  154. p.stitch();
  155. assert.equal(p.sources[1].source, p.sources[0]);
  156. }
  157. },
  158. "#run": {
  159. "should iterate through sources and return resultant array": function (done) {
  160. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  161. results = [];
  162. p.run(function(err, doc) {
  163. if (err) throw err;
  164. if (!doc){
  165. assert.deepEqual(results, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
  166. done();
  167. } else {
  168. results.push(doc);
  169. }
  170. });
  171. },
  172. "should handle sources that return errors": function (done) {
  173. var p = Pipeline.parseCommand({pipeline:[{$test:{works:false}}]}),
  174. results = [];
  175. p.run(function(err, doc) {
  176. assert(err);
  177. done();
  178. });
  179. }
  180. },
  181. "#addInitialSource": {
  182. "should put the given source at the beginning of the pipeline": function () {
  183. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  184. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  185. p.addInitialSource(initialSource);
  186. assert.equal(initialSource, p.sources[0]);
  187. },
  188. "should be able to addInitialSource then stitch": function () {
  189. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  190. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  191. p.addInitialSource(initialSource);
  192. p.stitch();
  193. assert.equal(p.sources[1].source, p.sources[0]);
  194. }
  195. },
  196. "#getDependencies()": {
  197. "should properly detect dependencies": function testGetDependencies() {
  198. var p = Pipeline.parseCommand({pipeline: [
  199. {$sort: {"xyz": 1}},
  200. {$project: {"a":"$xyz"}}
  201. ]});
  202. var depsTracker = p.getDependencies();
  203. assert.equal(Object.keys(depsTracker.fields).length, 2);
  204. }
  205. }
  206. }
  207. };
  // When this file is executed directly (no parent module), run it through
  // mocha with the "exports" UI and "spec" reporter; MOCHA_GREP filters tests.
  if (!module.parent) {
    var Mocha = require("mocha");
    new Mocha()
      .ui("exports")
      .reporter("spec")
      .addFile(__filename)
      .grep(process.env.MOCHA_GREP || '')
      .run(process.exit);
  }