Browse Source

EAGLESIX-812: finished up most of the functionality for PipelineD. Test cases still pending

Phil Murray 11 năm trước cách đây
mục cha
commit
749efb7259

+ 1 - 0
lib/index.js

@@ -81,6 +81,7 @@ exports.aggregate = exports;
 //Expose these so that mungedb-aggregate can be extended.
 exports.Cursor = require("./Cursor");
 exports.pipeline = require("./pipeline/");
+exports.query = require("./query/");
 
 // version info
 exports.version = "r2.5.4";

+ 12 - 93
lib/pipeline/PipelineD.js

@@ -16,7 +16,7 @@ var DocumentSource = require('./documentSources/DocumentSource'),
 	CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
 	SortDocumentSource = require('./documentSources/SortDocumentSource'),
 	MatchDocumentSource = require('./documentSources/MatchDocumentSource'),
-	Cursor = require('../query/ArrayRunner');
+	getRunner = require('../query').getRunner;
 
 /**
  * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
@@ -37,9 +37,6 @@ var DocumentSource = require('./documentSources/DocumentSource'),
 **/
 klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
 
-	// get the full "namespace" name
-	var data = expCtx.ns; //NOTE: ns will likely be either an array of documents or a document source in munge
-
 	// We will be modifying the source vector as we go
 	var sources = pipeline.sources;
 
@@ -68,20 +65,20 @@ klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
 	var projectionForQuery = deps.needTextScore ? deps.toProjection() : {};
 
 	/*
-	  Look for an initial sort; we'll try to add this to the
-	  Cursor we create.  If we're successful in doing that (further down),
-	  we'll remove the $sort from the pipeline, because the documents
-	  will already come sorted in the specified order as a result of the
-	  index scan.
+	Look for an initial sort; we'll try to add this to the
+	Cursor we create.  If we're successful in doing that (further down),
+	we'll remove the $sort from the pipeline, because the documents
+	will already come sorted in the specified order as a result of the
+	index scan.
 	*/
-	var sortStorage,
+	var sortStage,
 		sortObj,
 		sortInRunner = false;
 	if (sources.length) {
 		sortStage = sources[0] instanceof SortDocumentSource ? sources[0] : undefined;
 		
 		//need to check the next source since we are not deleting the initial match in munge
-		if (!sortStorage && sources[0] instanceof MatchDocumentSource){
+		if (!sortStage && sources[0] instanceof MatchDocumentSource){
 			sortStage = sources[1] instanceof SortDocumentSource ? sources[1] : undefined;
 		}
 		if (sortStage) {
@@ -92,23 +89,15 @@ klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
 	}
 
 	// Create the Runner.
-	// NOTE: the logic here is munge specific
-	var runner;
-	if (data.constructor === Array) {
-		runner = new ArrayRunner(data);
-	} else if (data instanceof DocumentSource) {
-		//do something else here.  TODO: make a new Runner Type?
-	} else {
-		throw new Error('unrecognized data source');
-	}
-
+	// NOTE: the logic here is simplified for munge
+	var runner = getRunner(expCtx.ns, queryObj, sortObj, projectionForQuery, sources);
 
 	// DocumentSourceCursor expects a yielding Runner that has had its state saved.
-	//runner->setYieldPolicy(Runner::YIELD_AUTO);
+	//runner.setYieldPolicy(Runner.RunnerState.YIELD_AUTO); //Skipped as we don't really support yielding yet
 	runner.saveState();
 
 	// Put the Runner into a DocumentSourceCursor and add it to the front of the pipeline.
-	var source = new DocumentSourceCursor("", runner, pExpCtx);
+	var source = new CursorDocumentSource("", runner, expCtx);
 
 	// Note the query, sort, and projection for explain.
 	source.setQuery(queryObj);
@@ -124,74 +113,4 @@ klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
 	pipeline.addInitialSource(source);
 
 	return runner;
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-	var sources = pipeline.sources;
-
-	// NOTE: SKIPPED: look for initial match
-	// NOTE: SKIPPED: create a query object
-
-	// Look for an initial simple project; we'll avoid constructing Values for fields that won't make it through the projection
-	var projection = {};
-	var dependencies;
-	var deps = {};
-	var status = DocumentSource.GetDepsReturn.SEE_NEXT;
-	for (var i=0; i < sources.length && status !== DocumentSource.GetDepsReturn.EXHAUSTIVE; i++) {
-		status = sources[i].getDependencies(deps);
-		if(Object.keys(deps).length === 0) {
-			status = DocumentSource.GetDepsReturn.NOT_SUPPORTED;
-		}
-	}
-	if (status === DocumentSource.GetDepsReturn.EXHAUSTIVE) {
-		projection = DocumentSource.depsToProjection(deps);
-		dependencies = DocumentSource.parseDeps(deps);
-	}
-
-	// NOTE: SKIPPED: Look for an initial sort
-	// NOTE: SKIPPED: Create the sort object
-
-	//get the full "namespace" name
-	// var fullName = dbName + "." + pipeline.collectionName;
-
-	// NOTE: SKIPPED: if(DEV) log messages
-
-	// Create the necessary context to use a Cursor
-	// NOTE: SKIPPED: pSortedCursor bit
-	// NOTE: SKIPPED: pUnsortedCursor bit
-
-	// NOTE: Deviating from mongo here. We're passing in a source or set of documents instead of collection name in the ctx.ns field
-	var source;
-	if(expCtx.ns instanceof DocumentSource){
-		source = expCtx.ns;
-	} else {
-		var cursorWithContext = new CursorDocumentSource.CursorWithContext(/*fullName*/);
-
-		// Now add the Cursor to cursorWithContext
-		cursorWithContext._cursor = new Cursor( expCtx.ns );	//NOTE: collectionName will likely be an array of documents in munge
-
-		// wrap the cursor with a DocumentSource and return that
-		source = new CursorDocumentSource( cursorWithContext, expCtx );
-
-		// NOTE: SKIPPED: Note the query and sort
-
-		if (Object.keys(projection).length) source.setProjection(projection, dependencies);
-
-		while(sources.length > 0 && source.coalesce(sources[0])) { //Note: Attempting to coalesce into the cursor source
-			sources.shift();
-		}
-	}
-
-	pipeline.addInitialSource(source);
 };

+ 7 - 6
lib/query/DocumentSourceRunner.js

@@ -30,18 +30,19 @@ var klass = module.exports = function DocumentSourceRunner(docSrc, pipeline){
  * @param [callback] {Function}
  */
 proto.getNext = function getNext(callback) {
-	if (this._state === Runner.RunnerState.RUNNER_ADVANCED) {
-		return this._docSrc.getNext(function (err, obj){
+	var self = this;
+	if (self._state === Runner.RunnerState.RUNNER_ADVANCED) {
+		return self._docSrc.getNext(function (err, obj){
 			if (err){
-				this._state = Runner.RunnerState.RUNNER_ERROR;
+				self._state = Runner.RunnerState.RUNNER_ERROR;
 			}
 			if (obj === null){
-				this._state = Runner.RunnerState.RUNNER_EOF;
+				self._state = Runner.RunnerState.RUNNER_EOF;
 			}
-			return callback(err, obj, this._state);
+			return callback(err, obj, self._state);
 		});
 	}
-	return callback(null, null, this._state);
+	return callback(null, null, self._state);
 };
 
 /**

+ 18 - 2
lib/query/index.js

@@ -1,5 +1,21 @@
 "use strict";
+
+var DocumentSource = require('../pipeline/documentSources/DocumentSource'),
+	Runner = require("./Runner.js"),
+	ArrayRunner = require("./ArrayRunner.js"),
+	DocumentSourceRunner = require("./DocumentSourceRunner.js");
+
 module.exports = {
-	Runner: require("./Runner.js"),
-	ArrayRunner: require("./ArrayRunner.js")
+	Runner: Runner,
+	ArrayRunner: ArrayRunner,
+	DocumentSourceRunner: DocumentSourceRunner,
+	getRunner: function(data, queryObj, sortObj, projectionForQuery, sources){
+		if (data && data.constructor === Array){
+			return new ArrayRunner(data);
+		} else if (data && data instanceof DocumentSource){
+			return new DocumentSourceRunner(data, sources);
+		} else {
+			throw new Error('could not construct Runner from given data');
+		}
+	}
 };

+ 11 - 64
test/lib/pipeline/PipelineD.js

@@ -10,78 +10,19 @@ module.exports = {
 
 	"PipelineD": {
 
-		before: function(){
-
-			Pipeline.stageDesc.$test = (function(){
-
-				var klass = function TestDocumentSource(options, ctx){
-					base.call(this, ctx);
-
-					this.shouldCoalesce = options.coalesce;
-					this.coalesceWasCalled = false;
-					this.optimizeWasCalled = false;
-					this.resetWasCalled = false;
-
-					this.current = 5;
-				}, TestDocumentSource = klass, base = DocumentSource, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
-				proto.coalesce = function(){
-					this.coalesceWasCalled = true;
-					var c = this.shouldCoalesce;//only coalesce with the first thing we find
-					this.shouldCoalesce = false;
-					return c;
-				};
-
-				proto.optimize = function(){
-					this.optimizeWasCalled = true;
-				};
-
-				proto.eof = function(){
-					return this.current < 0;
-				};
-
-				proto.advance = function(){
-					this.current = this.current - 1;
-					return !this.eof();
-				};
-
-				proto.getCurrent = function(){
-					return this.current;
-				};
-
-				proto.reset = function(){
-					this.resetWasCalled = true;
-				};
-
-				proto.getDependencies = function(deps){
-					if (!deps.testDep){
-						deps.testDep = 1;
-						return DocumentSource.GetDepsReturn.EXHAUSTIVE;
-					}
-					return DocumentSource.GetDepsReturn.SEE_NEXT;
-				};
-
-				klass.createFromJson = function(options, ctx){
-					return new TestDocumentSource(options, ctx);
-				};
-
-				return klass;
-			})().createFromJson;
-
-		},
-
 		"prepareCursorSource": {
 
 			"should place a CursorDocumentSource in pipeline": function () {
-				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}], aggregate:[]}),
+				var p = Pipeline.parseCommand({pipeline:[{$match:{a:true}}], aggregate:[]}),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
 				assert.equal(p.sources[0].constructor, CursorDocumentSource);
 			},
 
 			"should get projection from all sources": function () {
-				var p = Pipeline.parseCommand({pipeline:[{$test:{coalesce:false}}, {$test:{coalesce:false}}], aggregate:[]}),
+				var p = Pipeline.parseCommand({pipeline:[{$project:{a:"$x"}}], aggregate:[]}),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
-				assert.deepEqual(p.sources[0]._projection, {"_id":0,"testDep":1});
+				assert.deepEqual(p.sources[0]._projection, {"x":1});
+				assert.deepEqual(p.sources[0]._dependencies, {}); //TODO: what goes here???
 			},
 
 			"should get projection's deps": function () {
@@ -106,6 +47,7 @@ module.exports = {
 				var p = Pipeline.parseCommand(cmdObj),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
 				assert.equal(JSON.stringify(p.sources[0]._projection), JSON.stringify({'a.b.c': 1, d: 1, 'e.f.g': 1, _id: 1}));
+				assert.deepEqual(p.sources[0]._dependencies, {}); //TODO: what goes here???
 			},
 
 			"should get group's deps": function(){
@@ -132,7 +74,12 @@ module.exports = {
 				var p = Pipeline.parseCommand(cmdObj),
 					cs = PipelineD.prepareCursorSource(p, {ns:[1,2,3,4,5]});
 				assert.equal(JSON.stringify(p.sources[0]._projection), JSON.stringify({ _id: 0, a: 1, b: 1, 'x.y.z': 1 }));
-			}
+				assert.deepEqual(p.sources[0]._dependencies, {}); //TODO: what goes here???
+			},
+			"should set the queryObj on the Cursor": function(){},
+			"should set the sort on the Cursor": function(){},
+			"should set the sort on the Cursor if there is a match first": function(){},
+			"should coalesce the Cursor with the rest of the pipeline": function(){},
 		}
 	}
 

+ 148 - 0
test/lib/query/DocumentSourceRunner.js

@@ -0,0 +1,148 @@
+"use strict";
+var assert = require("assert"),
+	Runner = require("../../../lib/query/Runner"),
+	CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	LimitDocumentSource = require("../../../lib/pipeline/documentSources/LimitDocumentSource"),
+	MatchDocumentSource = require("../../../lib/pipeline/documentSources/MatchDocumentSource"),
+	ArrayRunner = require("../../../lib/query/ArrayRunner"),
+	DocumentSourceRunner = require("../../../lib/query/DocumentSourceRunner");
+
+
+module.exports = {
+
+	"ArrayRunner": {
+		"#constructor": {
+			"should accept an array of data": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([]), null),
+					pipeline = [];
+				assert.doesNotThrow(function(){
+					var ar = new DocumentSourceRunner(cds, pipeline);
+				});
+			},
+			"should fail if not given a document source or pipeline": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([]), null);
+				
+				assert.throws(function(){
+					var ar = new DocumentSourceRunner();
+				});
+				assert.throws(function(){
+					var ar = new DocumentSourceRunner(123);
+				});
+				assert.throws(function(){
+					var ar = new DocumentSourceRunner(cds, 123);
+				});
+			},
+			"should coalesce the pipeline into the given documentsource": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([]), null),
+					pipeline = [new LimitDocumentSource(3), new MatchDocumentSource({"a":true})],
+					expected = [{$match:{a:true}}];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				var actual = pipeline.map(function(d){return d.serialize();});
+				
+				assert.deepEqual(expected, actual);
+			}
+		},
+		"#getNext": {
+			"should return the next item in the given documentsource": function(done){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource(3)];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				ds.getNext(function(err, out, state){
+					assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+					assert.strictEqual(out, 1);
+					ds.getNext(function(err, out, state){
+						assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+						assert.strictEqual(out, 2);
+						ds.getNext(function(err, out, state){
+							assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+							assert.strictEqual(out, 3);
+							done();
+						});
+					});
+				});
+			},
+			"should return EOF if there is nothing left in the given documentsource": function(done){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				ds.getNext(function(err, out, state){
+					assert.strictEqual(state, Runner.RunnerState.RUNNER_ADVANCED);
+					assert.strictEqual(out, 1);
+					ds.getNext(function(err, out, state){
+						assert.strictEqual(state, Runner.RunnerState.RUNNER_EOF);
+						assert.strictEqual(out, null);
+						done();
+					});
+				});
+			}
+		},
+		"#getInfo": {
+			"should return nothing if explain flag is not set": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				assert.strictEqual(ds.getInfo(), undefined);
+			},
+			"should return information about the runner if explain flag is set": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				assert.deepEqual(ds.getInfo(true), {
+					"type": "DocumentSourceRunner",
+					"docSrc": {
+						"$cursor": {
+							"query": undefined,
+							"sort": null,
+							"limit": 1,
+							"fields": null,
+							"plan": {
+								"type": "ArrayRunner",
+								"nDocs": 3,
+								"position": 0,
+								"state": "RUNNER_ADVANCED"
+							}
+						}
+					},
+					"state": "RUNNER_ADVANCED"
+				});
+			}
+		},
+		"#reset": {
+			"should dispose of the documentSource": function(){
+				var cds = new CursorDocumentSource(null, new ArrayRunner([1,2,3]), null),
+					pipeline = [new LimitDocumentSource({}, 1)];
+				var ds = new DocumentSourceRunner(cds, pipeline);
+				
+				ds.reset();
+				assert.deepEqual(ds.getInfo(true), {
+					"type": "DocumentSourceRunner",
+					"docSrc": {
+						"$cursor": {
+							"query": undefined,
+							"sort": null,
+							"limit": 1,
+							"fields": null,
+							"plan": {
+								"type": "ArrayRunner",
+								"nDocs": 0,
+								"position": 0,
+								"state": "RUNNER_DEAD"
+							}
+						}
+					},
+					"state": "RUNNER_DEAD"
+				});
+			}
+		}
+	}
+
+};
+
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run();