// Pipeline_test.js (7.6 KB)
  1. "use strict";
  2. if (!module.parent) return require.cache[__filename] = 0, (new(require("mocha"))()).addFile(__filename).ui("exports").run(process.exit);
  3. var assert = require("assert"),
  4. Pipeline = require("../../../lib/pipeline/Pipeline"),
  5. DocumentSource = require("../../../lib/pipeline/documentSources/DocumentSource");
  /**
   * Parse a pipeline, split it for sharded execution and check both resulting halves.
   *
   * Every argument is the JSON text of a pipeline array (e.g. "[]"). The input is parsed
   * with Pipeline.parseCommand(); splitForSharded() is then called on the result. The
   * pipeline returned by splitForSharded() is compared against the expected shard stages,
   * and the pipeline it was called on is compared against the expected merge stages.
   *
   * @param {string} inputPipeString JSON array of stages to parse.
   * @param {string} expectedMergePipeString JSON array expected from the parsed pipeline after the split.
   * @param {string} expectedShardPipeString JSON array expected from the pipeline returned by the split.
   * @throws {Error} on malformed JSON or when any assertion fails.
   */
  function shardedTest(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
    // Decode all three inputs up front so malformed JSON fails before any pipeline work.
    var command = {pipeline: JSON.parse(inputPipeString)},
      wantedMergeStages = JSON.parse(expectedMergePipeString),
      wantedShardStages = JSON.parse(expectedShardPipeString);

    var merger = Pipeline.parseCommand(command, {});
    assert.notEqual(merger, null);

    var shards = merger.splitForSharded();
    assert.notEqual(shards, null);

    assert.deepEqual(shards.serialize().pipeline, wantedShardStages);
    assert.deepEqual(merger.serialize().pipeline, wantedMergeStages);
  }
  20. module.exports = {
  21. "Pipeline": {
  22. before: function () {
  23. Pipeline.stageDesc.$test = (function () {
  24. var klass = function TestDocumentSource(options, ctx) {
  25. base.call(this, ctx);
  26. this.shouldCoalesce = options.coalesce;
  27. this.coalesceWasCalled = false;
  28. this.optimizeWasCalled = false;
  29. this.works = options.works === false ? false : true; // don't judge
  30. this.current = 5;
  31. }, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}}); //jshint ignore:line
  32. proto.coalesce = function () {
  33. this.coalesceWasCalled = true;
  34. var c = this.shouldCoalesce;//only coalesce with the first thing we find
  35. this.shouldCoalesce = false;
  36. return c;
  37. };
  38. proto.optimize = function () {
  39. this.optimizeWasCalled = true;
  40. };
  41. proto.getNext = function(callback){
  42. var answer = this.current > 0 ? {val:this.current--} : null,
  43. err = null;
  44. if (!this.works)
  45. err = new Error("doesn't work"), answer = undefined;
  46. if(callback) {
  47. return callback(err, answer);
  48. } else {
  49. return answer || err;
  50. }
  51. };
  52. klass.createFromJson = function (options, ctx) {
  53. return new TestDocumentSource(options, ctx);
  54. };
  55. return klass;
  56. })().createFromJson;
  57. },
  58. "parseCommand": {
  59. "should throw Error if given non-objects in the array": function () {
  60. assert.throws(function () {
  61. Pipeline.parseCommand({pipeline: [5]});
  62. });
  63. },
  64. "should throw Error if given objects with more / less than one field": function () {
  65. assert.throws(function () {
  66. Pipeline.parseCommand({pipeline: [
  67. {}
  68. ]});
  69. Pipeline.parseCommand({pipeline: [
  70. {a: 1, b: 2}
  71. ]});
  72. });
  73. },
  74. "should throw Error on unknown document sources": function () {
  75. assert.throws(function () {
  76. Pipeline.parseCommand({pipeline: [
  77. {$foo: "$sdfdf"}
  78. ]});
  79. });
  80. },
  81. "should swap $match and $sort if the $match immediately follows the $sort": function () {
  82. var p = Pipeline.parseCommand({pipeline: [
  83. {$sort: {"xyz": 1}},
  84. {$match: {}}
  85. ]});
  86. assert.equal(p.sources[0].constructor.matchName, "$match");
  87. assert.equal(p.sources[1].constructor.sortName, "$sort");
  88. },
  89. "should attempt to coalesce all sources": function () {
  90. var p = Pipeline.parseCommand({pipeline: [
  91. {$test: {coalesce: false}},
  92. {$test: {coalesce: true}},
  93. {$test: {coalesce: false}},
  94. {$test: {coalesce: false}}
  95. ]});
  96. assert.equal(p.sources.length, 3);
  97. p.sources.slice(0, -1).forEach(function (source) {
  98. assert.equal(source.coalesceWasCalled, true);
  99. });
  100. assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
  101. },
  102. "should optimize all sources": function () {
  103. var p = Pipeline.parseCommand({pipeline: [
  104. {$test: {coalesce: false}},
  105. {$test: {coalesce: false}}
  106. ]});
  107. p.sources.forEach(function (source) {
  108. assert.equal(source.optimizeWasCalled, true);
  109. });
  110. }
  111. },
  112. "sharded": {
  113. "should handle empty pipeline for sharded": function () {
  114. var inputPipe = "[]",
  115. expectedMergePipe = "[]",
  116. expectedShardPipe = "[]";
  117. shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
  118. },
  119. "should handle one unwind": function () {
  120. var inputPipe = JSON.stringify([{$unwind:"$a"}]),
  121. expectedShardPipe = "[]",
  122. expectedMergePipe = JSON.stringify([{$unwind:"$a"}]);
  123. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  124. },
  125. "should handle two unwinds": function () {
  126. var inputPipe = JSON.stringify([{$unwind:"$a"},{$unwind:"$b"}]),
  127. expectedShardPipe = "[]",
  128. expectedMergePipe = JSON.stringify([{$unwind:"$a"},{$unwind:"$b"}]);
  129. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  130. },
  131. "should handle unwind not final": function () {
  132. var inputPipe = JSON.stringify([{$unwind:"$a"},{$match:{"a":1}}]),
  133. expectedShardPipe = "[]",
  134. expectedMergePipe = JSON.stringify([{$unwind:"$a"},{$match:{"a":1}}]);
  135. shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
  136. },
  137. "should handle unwind with other": function () {
  138. var inputPipe = JSON.stringify([{$match:{"a":1}},{$unwind:"$a"}]),
  139. expectedShardPipe = JSON.stringify([{$match:{"a":1}}]),
  140. expectedMergePipe = JSON.stringify([{$unwind:"$a"}]);
  141. shardedTest(inputPipe,expectedMergePipe, expectedShardPipe);
  142. }
  143. },
  144. "#stitch": {
  145. "should set the parent source for all sources in the pipeline except the first one": function () {
  146. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
  147. p.stitch();
  148. assert.equal(p.sources[1].source, p.sources[0]);
  149. }
  150. },
  151. "#run": {
  152. "should iterate through sources and return resultant array": function (done) {
  153. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  154. results = [];
  155. p.run(function(err, doc) {
  156. if (err) throw err;
  157. if (!doc){
  158. assert.deepEqual(results, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
  159. done();
  160. } else {
  161. results.push(doc);
  162. }
  163. });
  164. },
  165. "should handle sources that return errors": function (done) {
  166. var p = Pipeline.parseCommand({pipeline:[{$test:{works:false}}]});
  167. p.run(function(err, doc) {
  168. assert(err);
  169. done();
  170. });
  171. }
  172. },
  173. "#addInitialSource": {
  174. "should put the given source at the beginning of the pipeline": function () {
  175. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  176. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  177. p.addInitialSource(initialSource);
  178. assert.equal(initialSource, p.sources[0]);
  179. },
  180. "should be able to addInitialSource then stitch": function () {
  181. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  182. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  183. p.addInitialSource(initialSource);
  184. p.stitch();
  185. assert.equal(p.sources[1].source, p.sources[0]);
  186. }
  187. },
  188. "#getDependencies()": {
  189. "should properly detect dependencies": function testGetDependencies() {
  190. var p = Pipeline.parseCommand({pipeline: [
  191. {$sort: {"xyz": 1}},
  192. {$project: {"a":"$xyz"}}
  193. ]});
  194. var depsTracker = p.getDependencies();
  195. assert.equal(Object.keys(depsTracker.fields).length, 2);
  196. }
  197. }
  198. }
  199. };