| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- "use strict";
- if (!module.parent) return require.cache[__filename] = 0, (new(require("mocha"))()).addFile(__filename).ui("exports").run(process.exit);
- var assert = require("assert"),
- Pipeline = require("../../../lib/pipeline/Pipeline"),
- DocumentSource = require("../../../lib/pipeline/documentSources/DocumentSource");
/**
 * Parses a pipeline, splits it for sharded execution, and asserts what both
 * resulting pipelines serialize to.
 *
 * Every argument is a JSON string holding an array of pipeline stages.
 *
 * @param {string} inputPipeString - stages of the pipeline to parse and split.
 * @param {string} expectedMergePipeString - stages expected in the parsed
 *     pipeline object after `splitForSharded()` has been called on it.
 * @param {string} expectedShardPipeString - stages expected in the pipeline
 *     object returned by `splitForSharded()`.
 * @throws {Error} if any argument is not valid JSON or an assertion fails.
 */
function shardedTest(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
    // Wraps a JSON array of stages in a {"pipeline": ...} command document.
    function toCommand(stagesJson) {
        return JSON.parse("{\"pipeline\": " + stagesJson + "}");
    }

    var inputCommand = toCommand(inputPipeString);
    var wantedMerge = toCommand(expectedMergePipeString);
    var wantedShard = toCommand(expectedShardPipeString);

    var mergePipe = Pipeline.parseCommand(inputCommand, {});
    assert.notEqual(mergePipe, null);

    var shardPipe = mergePipe.splitForSharded();
    assert.notEqual(shardPipe, null);

    // The shard half is checked first, then the (original) merge half.
    assert.deepEqual(shardPipe.serialize().pipeline, wantedShard.pipeline);
    assert.deepEqual(mergePipe.serialize().pipeline, wantedMerge.pipeline);
}
- module.exports = {
- "Pipeline": {
- before: function () {
- Pipeline.stageDesc.$test = (function () {
- var klass = function TestDocumentSource(options, ctx) {
- base.call(this, ctx);
- this.shouldCoalesce = options.coalesce;
- this.coalesceWasCalled = false;
- this.optimizeWasCalled = false;
- this.works = options.works === false ? false : true; // don't judge
- this.current = 5;
- }, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}}); //jshint ignore:line
- proto.coalesce = function () {
- this.coalesceWasCalled = true;
- var c = this.shouldCoalesce;//only coalesce with the first thing we find
- this.shouldCoalesce = false;
- return c;
- };
- proto.optimize = function () {
- this.optimizeWasCalled = true;
- };
- proto.getNext = function(callback){
- var answer = this.current > 0 ? {val:this.current--} : null,
- err = null;
- if (!this.works)
- err = new Error("doesn't work"), answer = undefined;
- if(callback) {
- return callback(err, answer);
- } else {
- return answer || err;
- }
- };
- klass.createFromJson = function (options, ctx) {
- return new TestDocumentSource(options, ctx);
- };
- return klass;
- })().createFromJson;
- },
- "parseCommand": {
- "should throw Error if given non-objects in the array": function () {
- assert.throws(function () {
- Pipeline.parseCommand({pipeline: [5]});
- });
- },
- "should throw Error if given objects with more / less than one field": function () {
- assert.throws(function () {
- Pipeline.parseCommand({pipeline: [
- {}
- ]});
- Pipeline.parseCommand({pipeline: [
- {a: 1, b: 2}
- ]});
- });
- },
- "should throw Error on unknown document sources": function () {
- assert.throws(function () {
- Pipeline.parseCommand({pipeline: [
- {$foo: "$sdfdf"}
- ]});
- });
- },
- "should swap $match and $sort if the $match immediately follows the $sort": function () {
- var p = Pipeline.parseCommand({pipeline: [
- {$sort: {"xyz": 1}},
- {$match: {}}
- ]});
- assert.equal(p.sources[0].constructor.matchName, "$match");
- assert.equal(p.sources[1].constructor.sortName, "$sort");
- },
- "should attempt to coalesce all sources": function () {
- var p = Pipeline.parseCommand({pipeline: [
- {$test: {coalesce: false}},
- {$test: {coalesce: true}},
- {$test: {coalesce: false}},
- {$test: {coalesce: false}}
- ]});
- assert.equal(p.sources.length, 3);
- p.sources.slice(0, -1).forEach(function (source) {
- assert.equal(source.coalesceWasCalled, true);
- });
- assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
- },
- "should optimize all sources": function () {
- var p = Pipeline.parseCommand({pipeline: [
- {$test: {coalesce: false}},
- {$test: {coalesce: false}}
- ]});
- p.sources.forEach(function (source) {
- assert.equal(source.optimizeWasCalled, true);
- });
- }
- },
- "sharded": {
- "should handle empty pipeline for sharded": function () {
- var inputPipe = "[]",
- expectedMergePipe = "[]",
- expectedShardPipe = "[]";
- shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
- },
- "should handle one unwind": function () {
- var inputPipe = JSON.stringify([{$unwind:"$a"}]),
- expectedShardPipe = "[]",
- expectedMergePipe = JSON.stringify([{$unwind:"$a"}]);
- shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
- },
- "should handle two unwinds": function () {
- var inputPipe = JSON.stringify([{$unwind:"$a"},{$unwind:"$b"}]),
- expectedShardPipe = "[]",
- expectedMergePipe = JSON.stringify([{$unwind:"$a"},{$unwind:"$b"}]);
- shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
- },
- "should handle unwind not final": function () {
- var inputPipe = JSON.stringify([{$unwind:"$a"},{$match:{"a":1}}]),
- expectedShardPipe = "[]",
- expectedMergePipe = JSON.stringify([{$unwind:"$a"},{$match:{"a":1}}]);
- shardedTest(inputPipe, expectedShardPipe, expectedMergePipe);
- },
- "should handle unwind with other": function () {
- var inputPipe = JSON.stringify([{$match:{"a":1}},{$unwind:"$a"}]),
- expectedShardPipe = JSON.stringify([{$match:{"a":1}}]),
- expectedMergePipe = JSON.stringify([{$unwind:"$a"}]);
- shardedTest(inputPipe,expectedMergePipe, expectedShardPipe);
- }
- },
- "#stitch": {
- "should set the parent source for all sources in the pipeline except the first one": function () {
- var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]});
- p.stitch();
- assert.equal(p.sources[1].source, p.sources[0]);
- }
- },
- "#run": {
- "should iterate through sources and return resultant array": function (done) {
- var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
- results = [];
- p.run(function(err, doc) {
- if (err) throw err;
- if (!doc){
- assert.deepEqual(results, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]);
- done();
- } else {
- results.push(doc);
- }
- });
- },
- "should handle sources that return errors": function (done) {
- var p = Pipeline.parseCommand({pipeline:[{$test:{works:false}}]});
- p.run(function(err, doc) {
- assert(err);
- done();
- });
- }
- },
- "#addInitialSource": {
- "should put the given source at the beginning of the pipeline": function () {
- var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
- initialSource = Pipeline.stageDesc.$test({coalesce:false});
- p.addInitialSource(initialSource);
- assert.equal(initialSource, p.sources[0]);
- },
- "should be able to addInitialSource then stitch": function () {
- var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}),
- initialSource = Pipeline.stageDesc.$test({coalesce:false});
- p.addInitialSource(initialSource);
- p.stitch();
- assert.equal(p.sources[1].source, p.sources[0]);
- }
- },
- "#getDependencies()": {
- "should properly detect dependencies": function testGetDependencies() {
- var p = Pipeline.parseCommand({pipeline: [
- {$sort: {"xyz": 1}},
- {$project: {"a":"$xyz"}}
- ]});
- var depsTracker = p.getDependencies();
- assert.equal(Object.keys(depsTracker.fields).length, 2);
- }
- }
- }
- };
|