Pipeline.js 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. "use strict";
  2. var assert = require("assert"),
  3. Pipeline = require("../../../lib/pipeline/Pipeline"),
  4. FieldPath = require("../../../lib/pipeline/FieldPath"),
  5. DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
  6. CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
  7. ArrayRunner = require("../../../lib/query/ArrayRunner");
  8. var addSource = function addSource(match, data) {
  9. var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
  10. match.setSource(cds);
  11. };
  12. var shardedTest = function(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
  13. var inputPipe = JSON.parse(inputPipeString),
  14. expectedMergePipe = JSON.parse(expectedMergePipeString),
  15. expectedShardPipe = JSON.parse(expectedShardPipeString);
  16. var mergePipe = Pipeline.parseCommand(inputPipe, {});
  17. assert.notEqual(mergePipe, null);
  18. var shardPipe = mergePipe.splitForSharded();
  19. assert.notEqual(shardPipe, null);
  20. assert.equal(shardPipe.serialize()["pipeline"],
  21. expectedShardPipe["pipeline"]);
  22. assert.equal(mergePipe.serialize()["pipeline"],
  23. expectedMergePipe["pipeline"]);
  24. };
  25. module.exports = {
  26. "Pipeline": {
  27. before: function () {
  28. Pipeline.stageDesc.$test = (function () {
  29. var klass = function TestDocumentSource(options, ctx) {
  30. base.call(this, ctx);
  31. this.shouldCoalesce = options.coalesce;
  32. this.coalesceWasCalled = false;
  33. this.optimizeWasCalled = false;
  34. this.works = options.works === false ? false : true; // don't judge
  35. this.current = 5;
  36. }, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  37. proto.coalesce = function () {
  38. this.coalesceWasCalled = true;
  39. var c = this.shouldCoalesce;//only coalesce with the first thing we find
  40. this.shouldCoalesce = false;
  41. return c;
  42. };
  43. proto.optimize = function () {
  44. this.optimizeWasCalled = true;
  45. };
  46. proto.getNext = function(callback){
  47. var answer = this.current > 0 ? {val:this.current--} : DocumentSource.EOF,
  48. err = null;
  49. if (!this.works)
  50. err = new Error("doesn't work"), answer = undefined;
  51. if(callback) {
  52. return callback(err, answer);
  53. } else {
  54. return answer || err;
  55. }
  56. };
  57. klass.createFromJson = function (options, ctx) {
  58. return new TestDocumentSource(options, ctx);
  59. };
  60. return klass;
  61. })().createFromJson;
  62. },
  63. "parseCommand": {
  64. "should throw Error if given non-objects in the array": function () {
  65. assert.throws(function () {
  66. Pipeline.parseCommand({pipeline: [5]});
  67. });
  68. },
  69. "should throw Error if given objects with more / less than one field": function () {
  70. assert.throws(function () {
  71. Pipeline.parseCommand({pipeline: [
  72. {}
  73. ]});
  74. Pipeline.parseCommand({pipeline: [
  75. {a: 1, b: 2}
  76. ]});
  77. });
  78. },
  79. "should throw Error on unknown document sources": function () {
  80. assert.throws(function () {
  81. Pipeline.parseCommand({pipeline: [
  82. {$foo: "$sdfdf"}
  83. ]});
  84. });
  85. },
  86. "should swap $match and $sort if the $match immediately follows the $sort": function () {
  87. debugger;
  88. var p = Pipeline.parseCommand({pipeline: [
  89. {$sort: {"xyz": 1}},
  90. {$match: {}}
  91. ]});
  92. assert.equal(p.sources[0].constructor.matchName, "$match");
  93. assert.equal(p.sources[1].constructor.sortName, "$sort");
  94. },
  95. "should attempt to coalesce all sources": function () {
  96. var p = Pipeline.parseCommand({pipeline: [
  97. {$test: {coalesce: false}},
  98. {$test: {coalesce: true}},
  99. {$test: {coalesce: false}},
  100. {$test: {coalesce: false}}
  101. ]});
  102. assert.equal(p.sources.length, 3);
  103. p.sources.slice(0, -1).forEach(function (source) {
  104. assert.equal(source.coalesceWasCalled, true);
  105. });
  106. assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
  107. },
  108. "should optimize all sources": function () {
  109. var p = Pipeline.parseCommand({pipeline: [
  110. {$test: {coalesce: false}},
  111. {$test: {coalesce: false}}
  112. ]});
  113. p.sources.forEach(function (source) {
  114. assert.equal(source.optimizeWasCalled, true);
  115. });
  116. },
  117. "should duplicate match before redact": function () {
  118. },
  119. "should handle empty pipeline for sharded": function () {
  120. var inputPipe = "{[]}",
  121. expectedMergePipe = "{[]}",
  122. expectedShardPipe = "{[]}";
  123. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  124. },
  125. "should handle one unwind": function () {
  126. var inputPipe = "[{$unwind: '$a'}]}",
  127. expectedMergePipe = "[]}",
  128. expectedShardPipe = "[{$unwind: '$a'}]}";
  129. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  130. },
  131. "should handle two unwinds": function () {
  132. var inputPipe = "[{$unwind: '$a'}, {$unwind: '$b'}]}",
  133. expectedMergePipe = "[]}",
  134. expectedShardPipe = "[{$unwind: '$a'}, {$unwind: '$b'}]}";
  135. shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  136. }
  137. },
  138. "#stitch": {
  139. "should set the parent source for all sources in the pipeline except the first one": function () {
  140. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
  141. p.stitch();
  142. assert.equal(p.sources[1].source, p.sources[0]);
  143. }
  144. },
  145. "#_runSync": {
  146. "should iterate through sources and return resultant array": function () {
  147. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  148. results = p.run(function(err, results) {
  149. assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
  150. });
  151. },
  152. "should catch parse errors": function () {
  153. // The $foo part is invalid and causes a throw.
  154. assert.throws(function () {
  155. Pipeline.parseCommand({pipeline: [
  156. {$match: {$foo: {bar: "baz"}}}
  157. ]});
  158. });
  159. },
  160. "should call callback with errors from pipeline components": function (next) {
  161. var p = Pipeline.parseCommand({pipeline: [
  162. {$match: {foo: {bar: "baz"}}}
  163. ]});
  164. p.run(new DocumentSource({}), function (err, results) {
  165. assert(err instanceof Error);
  166. return next();
  167. });
  168. }
  169. },
  170. "#_runAsync": {
  171. "should iterate through sources and return resultant array asynchronously": function () {
  172. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  173. results = p.run(function(err, results) {
  174. assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
  175. });
  176. }
  177. },
  178. "#addInitialSource": {
  179. "should put the given source at the beginning of the pipeline": function () {
  180. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  181. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  182. p.addInitialSource(initialSource);
  183. assert.equal(initialSource, p.sources[0]);
  184. },
  185. "should be able to addInitialSource then stitch": function () {
  186. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  187. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  188. p.addInitialSource(initialSource);
  189. p.stitch();
  190. assert.equal(p.sources[1].source, p.sources[0]);
  191. }
  192. }
  193. }
  194. };
  195. if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).grep(process.env.MOCHA_GREP || '').run(process.exit);