Pipeline.js 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. "use strict";
  2. var assert = require("assert"),
  3. Pipeline = require("../../../lib/pipeline/Pipeline"),
  4. FieldPath = require("../../../lib/pipeline/FieldPath"),
  5. DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
  6. CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
  7. ArrayRunner = require("../../../lib/query/ArrayRunner");
  /**
   * Gives `match` an input: wraps `data` in an ArrayRunner, puts that runner
   * behind a CursorDocumentSource and installs the result via `setSource`.
   *
   * @param {Object} match - object exposing `setSource(source)`
   * @param {*} data - handed straight to `new ArrayRunner(data)`
   *                   (presumably an array of documents; confirm against ArrayRunner)
   */
  var addSource = function addSource(match, data) {
    var runner = new ArrayRunner(data);
    var cursorSource = new CursorDocumentSource(null, runner, null);
    match.setSource(cursorSource);
  };
  /**
   * Parses `inputPipe`, splits it for sharded execution and checks that both
   * halves serialize to the expected pipelines.
   *
   * @param {Object} inputPipe - command object passed to `Pipeline.parseCommand`
   * @param {string} expectedMergePipeString - JSON text of an object whose
   *        `pipeline` property is the expected merge-side pipeline
   * @param {string} expectedShardPipeString - JSON text of an object whose
   *        `pipeline` property is the expected shard-side pipeline
   * @throws {SyntaxError} if either expected string is not valid JSON
   * @throws {AssertionError} if a pipeline is missing or does not match
   */
  var shardedTest = function(inputPipe, expectedMergePipeString, expectedShardPipeString) {
    var expectedMergePipe = JSON.parse(expectedMergePipeString),
      expectedShardPipe = JSON.parse(expectedShardPipeString);

    var mergePipe = Pipeline.parseCommand(inputPipe, {});
    assert.notEqual(mergePipe, null);

    // splitForSharded() returns the shard half; the original pipeline object
    // is serialized afterwards as the merge half.
    var shardPipe = mergePipe.splitForSharded();
    assert.notEqual(shardPipe, null);

    // Pipelines are freshly built arrays, so they must be compared by
    // structure; an identity comparison could never succeed.
    assert.deepEqual(shardPipe.serialize()["pipeline"],
      expectedShardPipe["pipeline"]);
    assert.deepEqual(mergePipe.serialize()["pipeline"],
      expectedMergePipe["pipeline"]);
  };
  24. module.exports = {
  25. "Pipeline": {
  26. before: function () {
  27. Pipeline.stageDesc.$test = (function () {
  28. var klass = function TestDocumentSource(options, ctx) {
  29. base.call(this, ctx);
  30. this.shouldCoalesce = options.coalesce;
  31. this.coalesceWasCalled = false;
  32. this.optimizeWasCalled = false;
  33. this.works = options.works === false ? false : true; // don't judge
  34. this.current = 5;
  35. }, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}});
  36. proto.coalesce = function () {
  37. this.coalesceWasCalled = true;
  38. var c = this.shouldCoalesce;//only coalesce with the first thing we find
  39. this.shouldCoalesce = false;
  40. return c;
  41. };
  42. proto.optimize = function () {
  43. this.optimizeWasCalled = true;
  44. };
  45. proto.getNext = function(callback){
  46. var answer = this.current > 0 ? {val:this.current--} : DocumentSource.EOF,
  47. err = null;
  48. if (!this.works)
  49. err = new Error("doesn't work"), answer = undefined;
  50. if(callback) {
  51. return callback(err, answer);
  52. } else {
  53. return answer || err;
  54. }
  55. };
  56. klass.createFromJson = function (options, ctx) {
  57. return new TestDocumentSource(options, ctx);
  58. };
  59. return klass;
  60. })().createFromJson;
  61. },
  62. "parseCommand": {
  63. "should throw Error if given non-objects in the array": function () {
  64. assert.throws(function () {
  65. Pipeline.parseCommand({pipeline: [5]});
  66. });
  67. },
  68. "should throw Error if given objects with more / less than one field": function () {
  69. assert.throws(function () {
  70. Pipeline.parseCommand({pipeline: [
  71. {}
  72. ]});
  73. Pipeline.parseCommand({pipeline: [
  74. {a: 1, b: 2}
  75. ]});
  76. });
  77. },
  78. "should throw Error on unknown document sources": function () {
  79. assert.throws(function () {
  80. Pipeline.parseCommand({pipeline: [
  81. {$foo: "$sdfdf"}
  82. ]});
  83. });
  84. },
  85. "should swap $match and $sort if the $match immediately follows the $sort": function () {
  86. var p = Pipeline.parseCommand({pipeline: [
  87. {$sort: {"xyz": 1}},
  88. {$match: {}}
  89. ]});
  90. assert.equal(p.sources[0].constructor.matchName, "$match");
  91. assert.equal(p.sources[1].constructor.sortName, "$sort");
  92. },
  93. "should attempt to coalesce all sources": function () {
  94. var p = Pipeline.parseCommand({pipeline: [
  95. {$test: {coalesce: false}},
  96. {$test: {coalesce: true}},
  97. {$test: {coalesce: false}},
  98. {$test: {coalesce: false}}
  99. ]});
  100. assert.equal(p.sources.length, 3);
  101. p.sources.slice(0, -1).forEach(function (source) {
  102. assert.equal(source.coalesceWasCalled, true);
  103. });
  104. assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
  105. },
  106. "should optimize all sources": function () {
  107. var p = Pipeline.parseCommand({pipeline: [
  108. {$test: {coalesce: false}},
  109. {$test: {coalesce: false}}
  110. ]});
  111. p.sources.forEach(function (source) {
  112. assert.equal(source.optimizeWasCalled, true);
  113. });
  114. }
  115. },
  116. // "sharded": {
  117. // "should handle empty pipeline for sharded": function () {
  118. // var inputPipe = {pipeline: []},
  119. // expectedMergePipe = "[]",
  120. // expectedShardPipe = "[]";
  121. // shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  122. // },
  123. // "should handle one unwind": function () {
  124. // var inputPipe = "[{$unwind: '$a'}]}",
  125. // expectedMergePipe = "[]",
  126. // expectedShardPipe = "[{$unwind: '$a'}]";
  127. // shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  128. // },
  129. // "should handle two unwinds": function () {
  130. // var inputPipe = "[{$unwind: '$a'}, {$unwind: '$b'}]}",
  131. // expectedMergePipe = "[]",
  132. // expectedShardPipe = "[{$unwind: '$a'}, {$unwind: '$b'}]}";
  133. // shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
  134. // }
  135. // },
  136. "#stitch": {
  137. "should set the parent source for all sources in the pipeline except the first one": function () {
  138. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
  139. p.stitch();
  140. assert.equal(p.sources[1].source, p.sources[0]);
  141. }
  142. },
  143. "#_runSync": {
  144. "should iterate through sources and return resultant array": function () {
  145. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  146. results = p.run(function(err, results) {
  147. assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
  148. });
  149. },
  150. "should catch parse errors": function () {
  151. // The $foo part is invalid and causes a throw.
  152. assert.throws(function () {
  153. Pipeline.parseCommand({pipeline: [
  154. {$foo: {bar: "baz"}}
  155. ]});
  156. });
  157. },
  158. "should call callback with errors from pipeline components": function (next) {
  159. var p = Pipeline.parseCommand({pipeline: [
  160. {$foo: {bar: "baz"}}
  161. ]});
  162. p.run(new DocumentSource({}), function (err, results) {
  163. assert(err instanceof Error);
  164. return next();
  165. });
  166. }
  167. },
  168. "#_runAsync": {
  169. "should iterate through sources and return resultant array asynchronously": function () {
  170. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  171. results = p.run(function(err, results) {
  172. assert.deepEqual(results.result, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
  173. });
  174. }
  175. },
  176. "#addInitialSource": {
  177. "should put the given source at the beginning of the pipeline": function () {
  178. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  179. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  180. p.addInitialSource(initialSource);
  181. assert.equal(initialSource, p.sources[0]);
  182. },
  183. "should be able to addInitialSource then stitch": function () {
  184. var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
  185. initialSource = Pipeline.stageDesc.$test({coalesce:false});
  186. p.addInitialSource(initialSource);
  187. p.stitch();
  188. assert.equal(p.sources[1].source, p.sources[0]);
  189. }
  190. }
  191. }
  192. };
  // Direct-run support: when no other module required this file, hand it to
  // mocha using the "exports" interface and the "spec" reporter, filtered by
  // MOCHA_GREP when that variable is set, and exit with mocha's result.
  if (!module.parent) {
    var Mocha = require("mocha");
    new Mocha()
      .ui("exports")
      .reporter("spec")
      .addFile(__filename)
      .grep(process.env.MOCHA_GREP || '')
      .run(process.exit);
  }