"use strict"; if (!module.parent) return require.cache[__filename] = 0, (new(require("mocha"))()).addFile(__filename).ui("exports").run(process.exit); var assert = require("assert"), Pipeline = require("../../../lib/pipeline/Pipeline"), DocumentSource = require("../../../lib/pipeline/documentSources/DocumentSource"); function shardedTest(inputPipeString, expectedMergePipeString, expectedShardPipeString) { inputPipeString = "{\"pipeline\": " + inputPipeString + "}"; expectedMergePipeString = "{\"pipeline\": " + expectedMergePipeString + "}"; expectedShardPipeString = "{\"pipeline\": " + expectedShardPipeString + "}"; var inputPipe = JSON.parse(inputPipeString), expectedMergePipe = JSON.parse(expectedMergePipeString), expectedShardPipe = JSON.parse(expectedShardPipeString); var mergePipe = Pipeline.parseCommand(inputPipe, {}); assert.notEqual(mergePipe, null); var shardPipe = mergePipe.splitForSharded(); assert.notEqual(shardPipe, null); assert.deepEqual(shardPipe.serialize().pipeline, expectedShardPipe.pipeline); assert.deepEqual(mergePipe.serialize().pipeline, expectedMergePipe.pipeline); } module.exports = { "Pipeline": { before: function () { Pipeline.stageDesc.$test = (function () { var klass = function TestDocumentSource(options, ctx) { base.call(this, ctx); this.shouldCoalesce = options.coalesce; this.coalesceWasCalled = false; this.optimizeWasCalled = false; this.works = options.works === false ? false : true; // don't judge this.current = 5; }, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor: {value: klass}}); //jshint ignore:line proto.coalesce = function () { this.coalesceWasCalled = true; var c = this.shouldCoalesce;//only coalesce with the first thing we find this.shouldCoalesce = false; return c; }; proto.optimize = function () { this.optimizeWasCalled = true; }; proto.getNext = function(callback){ var answer = this.current > 0 ? {val:this.current--} : null, err = null; if (!this.works) err = new Error("doesn't work"), answer = undefined; if(callback) { return callback(err, answer); } else { return answer || err; } }; klass.createFromJson = function (options, ctx) { return new TestDocumentSource(options, ctx); }; return klass; })().createFromJson; }, "parseCommand": { "should throw Error if given non-objects in the array": function () { assert.throws(function () { Pipeline.parseCommand({pipeline: [5]}); }); }, "should throw Error if given objects with more / less than one field": function () { assert.throws(function () { Pipeline.parseCommand({pipeline: [ {} ]}); Pipeline.parseCommand({pipeline: [ {a: 1, b: 2} ]}); }); }, "should throw Error on unknown document sources": function () { assert.throws(function () { Pipeline.parseCommand({pipeline: [ {$foo: "$sdfdf"} ]}); }); }, "should swap $match and $sort if the $match immediately follows the $sort": function () { var p = Pipeline.parseCommand({pipeline: [ {$sort: {"xyz": 1}}, {$match: {}} ]}); assert.equal(p.sources[0].constructor.matchName, "$match"); assert.equal(p.sources[1].constructor.sortName, "$sort"); }, "should attempt to coalesce all sources": function () { var p = Pipeline.parseCommand({pipeline: [ {$test: {coalesce: false}}, {$test: {coalesce: true}}, {$test: {coalesce: false}}, {$test: {coalesce: false}} ]}); assert.equal(p.sources.length, 3); p.sources.slice(0, -1).forEach(function (source) { assert.equal(source.coalesceWasCalled, true); }); assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false); }, "should optimize all sources": function () { var p = Pipeline.parseCommand({pipeline: [ {$test: {coalesce: false}}, {$test: {coalesce: false}} ]}); p.sources.forEach(function (source) { assert.equal(source.optimizeWasCalled, true); }); } }, "sharded": { "should handle empty pipeline for sharded": function () { var inputPipe = "[]", expectedMergePipe = "[]", expectedShardPipe = "[]"; shardedTest(inputPipe, expectedShardPipe, expectedMergePipe); }, "should handle one unwind": function () { var inputPipe = JSON.stringify([{$unwind:"$a"}]), expectedShardPipe = "[]", expectedMergePipe = JSON.stringify([{$unwind:"$a"}]); shardedTest(inputPipe, expectedMergePipe, expectedShardPipe); }, "should handle two unwinds": function () { var inputPipe = JSON.stringify([{$unwind:"$a"},{$unwind:"$b"}]), expectedShardPipe = "[]", expectedMergePipe = JSON.stringify([{$unwind:"$a"},{$unwind:"$b"}]); shardedTest(inputPipe, expectedMergePipe, expectedShardPipe); }, "should handle unwind not final": function () { var inputPipe = JSON.stringify([{$unwind:"$a"},{$match:{"a":1}}]), expectedShardPipe = "[]", expectedMergePipe = JSON.stringify([{$unwind:"$a"},{$match:{"a":1}}]); shardedTest(inputPipe, expectedShardPipe, expectedMergePipe); }, "should handle unwind with other": function () { var inputPipe = JSON.stringify([{$match:{"a":1}},{$unwind:"$a"}]), expectedShardPipe = JSON.stringify([{$match:{"a":1}}]), expectedMergePipe = JSON.stringify([{$unwind:"$a"}]); shardedTest(inputPipe,expectedMergePipe, expectedShardPipe); } }, "#stitch": { "should set the parent source for all sources in the pipeline except the first one": function () { var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}); p.stitch(); assert.equal(p.sources[1].source, p.sources[0]); } }, "#run": { "should iterate through sources and return resultant array": function (done) { var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}), results = []; p.run(function(err, doc) { if (err) throw err; if (!doc){ assert.deepEqual(results, [ { val: 5 }, { val: 4 }, { val: 3 }, { val: 2 }, { val: 1 } ]); done(); } else { results.push(doc); } }); }, "should handle sources that return errors": function (done) { var p = Pipeline.parseCommand({pipeline:[{$test:{works:false}}]}); p.run(function(err, doc) { assert(err); done(); }); } }, "#addInitialSource": { "should put the given source at the beginning of the pipeline": function () { var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}), initialSource = Pipeline.stageDesc.$test({coalesce:false}); p.addInitialSource(initialSource); assert.equal(initialSource, p.sources[0]); }, "should be able to addInitialSource then stitch": function () { var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}, {$test:{coalesce:false}}]}), initialSource = Pipeline.stageDesc.$test({coalesce:false}); p.addInitialSource(initialSource); p.stitch(); assert.equal(p.sources[1].source, p.sources[0]); } }, "#getDependencies()": { "should properly detect dependencies": function testGetDependencies() { var p = Pipeline.parseCommand({pipeline: [ {$sort: {"xyz": 1}}, {$project: {"a":"$xyz"}} ]}); var depsTracker = p.getDependencies(); assert.equal(Object.keys(depsTracker.fields).length, 2); } } } };