Pipeline.js 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. "use strict";
  2. var assert = require("assert"),
  3. Pipeline = require("../../../lib/pipeline/Pipeline"),
  4. FieldPath = require("../../../lib/pipeline/FieldPath"),
  5. DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
  6. CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
  7. ProjectDocumentSource = require("../../../lib/pipeline/documentSources/ProjectDocumentSource"),
  8. ArrayRunner = require("../../../lib/query/ArrayRunner");
  9. var addSource = function addSource(match, data) {
  10. var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
  11. match.setSource(cds);
  12. };
  13. var shardedTest = function(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
  14. inputPipeString = '{"pipeline": ' + inputPipeString + '}';
  15. expectedMergePipeString = '{"pipeline": ' + expectedMergePipeString + '}';
  16. expectedShardPipeString = '{"pipeline": ' + expectedShardPipeString + '}';
  17. var inputPipe = JSON.parse(inputPipeString),
  18. expectedMergePipe = JSON.parse(expectedMergePipeString),
  19. expectedShardPipe = JSON.parse(expectedShardPipeString);
  20. var mergePipe = Pipeline.parseCommand(inputPipe, {});
  21. assert.notEqual(mergePipe, null);
  22. var shardPipe = mergePipe.splitForSharded();
  23. assert.notEqual(shardPipe, null);
  24. assert.deepEqual(shardPipe.serialize()["pipeline"],
  25. expectedShardPipe["pipeline"]);
  26. assert.deepEqual(mergePipe.serialize()["pipeline"],
  27. expectedMergePipe["pipeline"]);
  28. };
  29. module.exports = {
  30. "Pipeline": {
  31. before: function () {
  32. Pipeline.stageDesc.$test = (function () {
  33. var klass = function TestDocumentSource(options, ctx) {
  34. base.call(this, ctx);
  35. this.shouldCoalesce = options.coalesce;
  36. this.coalesceWasCalled = false;
  37. this.optimizeWasCalled = false;
  38. this.works = options.works === false ? false : true; // don't judge
  39. this.current = 5;
  40. }, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  41. proto.coalesce = function () {
  42. this.coalesceWasCalled = true;
  43. var c = this.shouldCoalesce;//only coalesce with the first thing we find
  44. this.shouldCoalesce = false;
  45. return c;
  46. };
  47. proto.optimize = function () {
  48. this.optimizeWasCalled = true;
  49. };
  50. proto.getNext = function(callback){
  51. var answer = this.current > 0 ? {val:this.current--} : null,
  52. err = null;
  53. if (!this.works)
  54. err = new Error("doesn't work"), answer = undefined;
  55. if(callback) {
  56. return callback(err, answer);
  57. } else {
  58. return answer || err;
  59. }
  60. };
  61. klass.createFromJson = function (options, ctx) {
  62. return new TestDocumentSource(options, ctx);
  63. };
  64. return klass;
  65. })().createFromJson;
  66. },
  67. "parseCommand": {
  68. "should throw Error if given non-objects in the array": function () {
  69. assert.throws(function () {
  70. Pipeline.parseCommand({pipeline: [5]});
  71. });
  72. },
  73. "should throw Error if given objects with more / less than one field": function () {
  74. assert.throws(function () {
  75. Pipeline.parseCommand({pipeline: [
  76. {}
  77. ]});
  78. Pipeline.parseCommand({pipeline: [
  79. {a: 1, b: 2}
  80. ]});
  81. });
  82. },
  83. "should throw Error on unknown document sources": function () {
  84. assert.throws(function () {
  85. Pipeline.parseCommand({pipeline: [
  86. {$foo: "$sdfdf"}
  87. ]});
  88. });
  89. },
  90. "should swap $match and $sort if the $match immediately follows the $sort": function () {
  91. var p = Pipeline.parseCommand({pipeline: [
  92. {$sort: {"xyz": 1}},
  93. {$match: {}}
  94. ]});
  95. assert.equal(p.sources[0].constructor.matchName, "$match");
  96. assert.equal(p.sources[1].constructor.sortName, "$sort");
  97. },
  98. "should attempt to coalesce all sources": function () {
  99. var p = Pipeline.parseCommand({pipeline: [
  100. {$test: {coalesce: false}},
  101. {$test: {coalesce: true}},
  102. {$test: {coalesce: false}},
  103. {$test: {coalesce: false}}
  104. ]});
  105. assert.equal(p.sources.length, 3);
  106. p.sources.slice(0, -1).forEach(function (source) {
  107. assert.equal(source.coalesceWasCalled, true);
  108. });
  109. assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
  110. },
  111. "should optimize all sources": function () {
  112. var p = Pipeline.parseCommand({pipeline: [
  113. {$test: {coalesce: false}},
  114. {$test: {coalesce: false}}
  115. ]});
  116. p.sources.forEach(function (source) {
  117. assert.equal(source.optimizeWasCalled, true);
  118. });
  119. }
  120. },
  121. "sharded": {
  122. "should handle empty pipeline for sharded": function () {
  123. var inputPipe = "[]",
  124. expectedMergePipe = "[]",
  125. expectedShardPipe = "[]";
  126. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  127. },
  128. "should handle one unwind": function () {
  129. var inputPipe = '[{"$unwind":"$a"}]',
  130. expectedMergePipe = "[]",
  131. expectedShardPipe = '[{"$unwind":"$a"}]';
  132. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  133. },
  134. "should handle two unwinds": function () {
  135. var inputPipe = '[{"$unwind":"$a"}, {"$unwind":"$b"}]',
  136. expectedMergePipe = "[]",
  137. expectedShardPipe = '[{"$unwind": "$a"}, {"$unwind": "$b"}]';
  138. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  139. }
  140. },
  141. "#stitch": {
  142. "should set the parent source for all sources in the pipeline except the first one": function () {
  143. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
  144. p.stitch();
  145. assert.equal(p.sources[1].source, p.sources[0]);
  146. }
  147. },
  148. "#run": {
  149. "should iterate through sources and return resultant array": function (done) {
  150. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  151. results = [];
  152. p.run(function(err, doc) {
  153. if (err) throw err;
  154. if (!doc){
  155. assert.deepEqual(results, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
  156. done();
  157. } else {
  158. results.push(doc);
  159. }
  160. });
  161. },
  162. "should handle sources that return errors": function (done) {
  163. var p = Pipeline.parseCommand({pipeline:[{$test:{works:false}}]}),
  164. results = [];
  165. p.run(function(err, doc) {
  166. assert(err);
  167. done();
  168. });
  169. }
  170. },
  171. "#addInitialSource": {
  172. "should put the given source at the beginning of the pipeline": function () {
  173. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  174. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  175. p.addInitialSource(initialSource);
  176. assert.equal(initialSource, p.sources[0]);
  177. },
  178. "should be able to addInitialSource then stitch": function () {
  179. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  180. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  181. p.addInitialSource(initialSource);
  182. p.stitch();
  183. assert.equal(p.sources[1].source, p.sources[0]);
  184. }
  185. },
  186. "#getDependencies()": {
  187. "should properly detect dependencies": function testGetDependencies() {
  188. var p = Pipeline.parseCommand({pipeline: [
  189. {$sort: {"xyz": 1}},
  190. {$project: {"a":"$xyz"}}
  191. ]});
  192. var depsTracker = p.getDependencies();
  193. assert.equal(Object.keys(depsTracker.fields).length, 2);
  194. }
  195. }
  196. }
  197. };
  198. if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);