
EAGLESIX-3087: Pipeline changes

Jason Walton 11 years ago
parent
commit
e3d8ee89dc
2 changed files with 82 additions and 21 deletions
  1. +30 -20  lib/pipeline/Pipeline.js
  2. +52 -1   test/lib/pipeline/Pipeline.js

+ 30 - 20
lib/pipeline/Pipeline.js

@@ -57,7 +57,7 @@ klass.optimizations.sharded = {};
  * @static
  * @method moveMatchBeforeSort
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pipelineInst) {
 	var sources = pipelineInst.sources;
 	debugger
@@ -79,7 +79,7 @@ klass.optimizations.local.moveMatchBeforeSort = function moveMatchBeforeSort(pip
  * @static
  * @method moveLimitBeforeSkip
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pipelineInst) {
 	var sources = pipelineInst.sources;
 	if(sources.length === 0) return;
@@ -112,7 +112,7 @@ klass.optimizations.local.moveLimitBeforeSkip = function moveLimitBeforeSkip(pip
  * @static
  * @method coalesceAdjacent
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineInst) {
 	var sources = pipelineInst.sources;
 	if(sources.length === 0) return;
@@ -149,7 +149,7 @@ klass.optimizations.local.coalesceAdjacent = function coalesceAdjacent(pipelineI
  * @static
  * @method optimizeEachDocumentSource
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocumentSource(pipelineInst) {
 	var sources = pipelineInst.sources;
 	for(var srci = 0, srcn = sources.length; srci < srcn; ++srci) {
@@ -162,7 +162,7 @@ klass.optimizations.local.optimizeEachDocumentSource = function optimizeEachDocu
  * @static
  * @method duplicateMatchBeforeInitalRedact
  * @param pipelineInst An instance of a Pipeline
- **/
+ */
 klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateMatchBeforeInitalRedact(pipelineInst) {
 	var sources = pipelineInst.sources;
 	if(sources.length >= 2 && sources[0].constructor === RedactDocumentSource) {
@@ -181,7 +181,7 @@ klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateM
 /**
  * Perform optimizations for a pipeline through sharding
  * @method splitForSharded
- **/
+ */
 proto.splitForSharded = function splitForSharded() {
 	var shardPipeline = new Pipeline(this.ctx);
 	shardPipeline.explain = this.explain;
@@ -222,7 +222,7 @@ klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe,
  * @method moveFinalUnwindFromShardsToMerger
  * @param shardPipe shard sources
  * @param mergePipe merge sources
- **/
+ */
 klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
 	while(shardPipe.sources.length > 0
 			&& shardPipe.sources[shardPipe.sources.length-1].constructor === UnwindDocumentSource) {
@@ -231,6 +231,13 @@ klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFin
 	}
 };
 
+/**
+ * Optimize pipeline by adding $project stage if shard fields are not exhaustive
+ * @static
+ * @method limitFieldsSentFromShardsToMerger
+ * @param shardPipe shard sources
+ * @param mergePipe merge sources
+ */
 klass.limitFieldsSentFromShardsToMerger = function limitFieldsSentFromShardsToMerger(shardPipe, mergePipe) {
 	var mergeDeps = mergePipe.getDependencies(shardPipe.getInitialQuery());
 	if (mergeDeps.needWholeDocument) {
@@ -266,7 +273,7 @@ klass.limitFieldsSentFromShardsToMerger = function limitFieldsSentFromShardsToMe
  * @method parseDocumentSources
  * @param pipeline  {Array}  The JSON pipeline
  * @returns {Array}  The parsed `DocumentSource`s
- **/
+ */
 klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
 	var sources = [];
 	debugger
@@ -309,12 +316,14 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
  * @param   cmdObj.splitMongodPipeline	{Boolean}  should split?
  * @param ctx     {Object}  Not used yet in mungedb-aggregate
  * @returns	{Array}	the pipeline, if created, otherwise a NULL reference
- **/
+ */
 klass.parseCommand = function parseCommand(cmdObj, ctx){
 	var pipelineNamespace = require("./"),
 		Pipeline = pipelineNamespace.Pipeline,	// using require in case Pipeline gets replaced with an extension
 		pipelineInst = new Pipeline(ctx);
 
+	debugger;
+	
 	//gather the specification for the aggregation
 	var pipeline;
 	for(var fieldName in cmdObj){
@@ -334,10 +343,11 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	/**
 	 * If we get here, we've harvested the fields we expect for a pipeline
 	 * Set up the specified document source pipeline.
-	 **/
+	 */
 	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
+	debugger;
 	pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
-
+	debugger;
 	klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
 	klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
 	klass.optimizations.local.coalesceAdjacent(pipelineInst);
@@ -363,7 +373,7 @@ function ifError(err) {
  * @param	inputSource		{DocumentSource}	The input document source for the pipeline
  * @param	[callback]		{Function}			Optional callback function if using async extensions
  * @return {Object}	An empty object or the match spec
-**/
+ */
 proto.getInitialQuery = function getInitialQuery() {
 	var sources = this.sources;
 	if(sources.length === 0) {
@@ -383,7 +393,7 @@ proto.getInitialQuery = function getInitialQuery() {
  * @param	inputSource		{DocumentSource}	The input document source for the pipeline
  * @param	[callback]		{Function}			Optional callback function if using async extensions
  * @return {Object}	An empty object or the match spec
-**/
+ */
 proto.serialize = function serialize() {
 	var serialized = {},
 		array = [];
@@ -404,7 +414,7 @@ proto.serialize = function serialize() {
 /**
  * Points each source at its previous source
  * @method stitch
-**/
+ */
 proto.stitch = function stitch() {
 	if(this.sources.length <= 0) throw new Error("should not have an empty pipeline; massert code 16600");
 
@@ -422,7 +432,7 @@ proto.stitch = function stitch() {
  * @method run
  * @param callback {Function} Optional. Run the pipeline in async mode; callback(err, result)
  * @return result {Object} The result of executing the pipeline
-**/
+ */
 proto.run = function run(callback) {
 	// should not get here in the explain case
 	if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
@@ -441,7 +451,7 @@ proto.run = function run(callback) {
  * @method _getFinalSource
  * @return {Object}		The DocumentSource at the end of the pipeline
  * @private
-**/
+ */
 proto._getFinalSource = function _getFinalSource() {
 	return this.sources[this.sources.length - 1];
 };
@@ -451,7 +461,7 @@ proto._getFinalSource = function _getFinalSource() {
  * @method _runSync
  * @return {Object}		The results object {result:resultArray}
  * @private
-**/
+ */
 proto._runSync = function _runSync(callback) {
 	var resultArray = [],
 		finalSource = this._getFinalSource(),
@@ -470,7 +480,7 @@ proto._runSync = function _runSync(callback) {
  * @method _runAsync
  * @param callback {Function} callback(err, resultObject)
  * @private
-**/
+ */
 proto._runAsync = function _runAsync(callback) {
 	var resultArray = [],
 		finalSource = this._getFinalSource(),
@@ -492,7 +502,7 @@ proto._runAsync = function _runAsync(callback) {
  * Get the pipeline explanation
  * @method writeExplainOps
  * @return {Array}	An array of source explanations
-**/
+ */
 proto.writeExplainOps = function writeExplainOps() {
 	var array = [];
 	this.sources.forEach(function(source) {
@@ -505,7 +515,7 @@ proto.writeExplainOps = function writeExplainOps() {
  * Set the source of documents for the pipeline
  * @method addInitialSource
  * @param source {DocumentSource}
-**/
+ */
 proto.addInitialSource = function addInitialSource(source) {
 	this.sources.unshift(source);
 };
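
For context on the sharded helpers documented above, here is a minimal usage sketch. The require path and the internal call order are assumptions inferred from the method docs in this file, not quoted from this commit:

// Illustrative only; the path and the call chain below are assumptions, not the literal method body.
var Pipeline = require("./lib/pipeline/Pipeline");

// The caller's pipeline becomes the merge half; splitForSharded returns the shard half.
var mergePipe = Pipeline.parseCommand({pipeline: [{$unwind: "$a"}]}, {});
var shardPipe = mergePipe.splitForSharded();

// Internally the split is expected to apply the sharded optimizations roughly as:
//   Pipeline.optimizations.sharded.findSplitPoint(shardPipe, mergePipe);
//   Pipeline.optimizations.sharded.moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe);
//   Pipeline.limitFieldsSentFromShardsToMerger(shardPipe, mergePipe);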

+ 52 - 1
test/lib/pipeline/Pipeline.js

@@ -2,7 +2,31 @@
 var assert = require("assert"),
 	Pipeline = require("../../../lib/pipeline/Pipeline"),
 	FieldPath = require("../../../lib/pipeline/FieldPath"),
-	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource');
+	DocumentSource = require('../../../lib/pipeline/documentSources/DocumentSource'),
+	CursorDocumentSource = require("../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	ArrayRunner = require("../../../lib/query/ArrayRunner");
+
+var addSource = function addSource(match, data) {
+	var cds = new CursorDocumentSource(null, new ArrayRunner(data), null);
+	match.setSource(cds);
+};
+
+var shardedTest = function(inputPipeString, expectedMergePipeString, expectedShardPipeString) {
+	var inputPipe = JSON.parse(inputPipeString),
+		expectedMergePipe = JSON.parse(expectedMergePipeString),
+		expectedShardPipe = JSON.parse(expectedShardPipeString);
+
+	var mergePipe = Pipeline.parseCommand(inputPipe, {});
+	assert.notEqual(mergePipe, null);
+
+	var shardPipe = mergePipe.splitForSharded();
+	assert.notEqual(shardPipe, null);
+
+	assert.deepEqual(shardPipe.serialize()["pipeline"],
+		expectedShardPipe["pipeline"]);
+	assert.deepEqual(mergePipe.serialize()["pipeline"],
+		expectedMergePipe["pipeline"]);
+};
 
 module.exports = {
 
@@ -86,6 +110,7 @@ module.exports = {
 			},
 
 			"should swap $match and $sort if the $match immediately follows the $sort": function () {
+				debugger;
 				var p = Pipeline.parseCommand({pipeline: [
 					{$sort: {"xyz": 1}},
 					{$match: {}}
@@ -116,6 +141,32 @@ module.exports = {
 				p.sources.forEach(function (source) {
 					assert.equal(source.optimizeWasCalled, true);
 				});
+			},
+
+			"should duplicate match before redact": function () {
+
+			},
+
+			"should handle empty pipeline for sharded": function () {
+				var inputPipe = '{"pipeline": []}',
+					expectedMergePipe = '{"pipeline": []}',
+					expectedShardPipe = '{"pipeline": []}';
+				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+			},
+
+			"should handle one unwind": function () {
+				var inputPipe = '{"pipeline": [{"$unwind": "$a"}]}',
+					expectedMergePipe = '{"pipeline": []}',
+					expectedShardPipe = '{"pipeline": [{"$unwind": "$a"}]}';
+				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+			},
+
+			"should handle two unwinds": function () {
+				var inputPipe = '{"pipeline": [{"$unwind": "$a"}, {"$unwind": "$b"}]}',
+					expectedMergePipe = '{"pipeline": []}',
+					expectedShardPipe = '{"pipeline": [{"$unwind": "$a"}, {"$unwind": "$b"}]}';
+				shardedTest(inputPipe, expectedMergePipe, expectedShardPipe);
+
 			}
 
 		},
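
A note on the new shardedTest helper above: it round-trips each argument through JSON.parse and then reads its "pipeline" field, so the fixture strings must be strict JSON command objects. A minimal sketch of the expected shape (the $unwind value is only an example):

// Strict JSON (double-quoted keys), wrapped as a {"pipeline": [...]} command object,
// since shardedTest parses the string and then indexes its "pipeline" field.
var inputPipe = '{"pipeline": [{"$unwind": "$a"}]}';
var parsed = JSON.parse(inputPipe);   // -> { pipeline: [ { $unwind: "$a" } ] }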