Explorar o código

EAGLESIX-3087: Add Optimizations.Sharded functions to Pipeline.js, added comment to OutDocumentSource.js

Jason Walton hai 11 anos
pai
achega
bcf8b1ebee

+ 90 - 5
lib/pipeline/Pipeline.js

@@ -50,6 +50,7 @@ klass.nStageDesc = Object.keys(klass.stageDesc).length;
 
 klass.optimizations = {};
 klass.optimizations.local = {};
+klass.optimizations.sharded = {};
 
 /**
  * Moves $match before $sort when they are placed next to one another
@@ -175,6 +176,89 @@ klass.optimizations.local.duplicateMatchBeforeInitalRedact = function duplicateM
 	}
 };
 
+//SKIPPED: addRequiredPrivileges
+
+/**
+ * Perform optimizations for a pipeline through sharding
+ * @method splitForSharded
+ **/
/**
 * Split this pipeline for sharded execution: `this` becomes the merge half and
 * the returned pipeline is the half each shard runs.
 * @method splitForSharded
 * @returns {Pipeline} the pipeline to execute on each shard
 **/
proto.splitForSharded = function splitForSharded() {
	// BUGFIX: `ctx` was a bare (undefined) global; use this pipeline's own context.
	var shardPipeline = new Pipeline(this.ctx);
	shardPipeline.explain = this.explain;

	klass.optimizations.sharded.findSplitPoint(shardPipeline, this);
	klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger(shardPipeline, this);
	klass.optimizations.sharded.limitFieldsSentFromShardsToMerger(shardPipeline, this);

	// BUGFIX: callers need the shard half; the original returned undefined.
	return shardPipeline;
};
+
+/**
+ * Split the source into Merge sources and Shard sources
+ * @static
+ * @method findSplitPoint
+ * @param shardPipe Shard sources
+ * @param mergePipe Merge sources
+ */
/**
 * Split the sources between the shard pipeline and the merge pipeline.
 * Sources before the first splittable source run on the shards; the first
 * splittable source is split in two, and everything after it merges.
 * @static
 * @method findSplitPoint
 * @param shardPipe Shard sources
 * @param mergePipe Merge sources
 */
klass.optimizations.sharded.findSplitPoint = function findSplitPoint(shardPipe, mergePipe) {
	while (mergePipe.sources.length > 0) {
		var current = mergePipe.sources.shift();

		// BUGFIX: splittability is declared as `isSplittableDocumentSource` on the
		// constructor (see OutDocumentSource), not `isSplittable` on the instance,
		// and the branches were inverted: NON-splittable sources belong wholly to
		// the shard half; the first splittable source marks the split point.
		if (!current.constructor.isSplittableDocumentSource) {
			shardPipe.sources.push(current);
		} else {
			var shardSource = current.getShardSource(),
				mergeSource = current.getMergeSource();
			// `!= null` also rejects null returns, which `typeof != "undefined"` let through
			if (shardSource != null) { shardPipe.sources.push(shardSource); }	//push_back
			if (mergeSource != null) { mergePipe.sources.unshift(mergeSource); }	//push_front
			break;
		}
	}
};
+
+/**
+ * Optimize pipeline through moving unwind to the end
+ * @static
+ * @method moveFinalUnwindFromShardsToMerger
+ * @param shardPipe shard sources
+ * @param mergePipe merge sources
+ **/
/**
 * Optimize the split by relocating any trailing $unwind stages from the shard
 * half to the front of the merge half (unwinding multiplies documents, so doing
 * it on the merger reduces the data each shard transmits).
 * @static
 * @method moveFinalUnwindFromShardsToMerger
 * @param shardPipe shard sources
 * @param mergePipe merge sources
 **/
klass.optimizations.sharded.moveFinalUnwindFromShardsToMerger = function moveFinalUnwindFromShardsToMerger(shardPipe, mergePipe) {
	// BUGFIX: the original condition `!shardPipe.sources.length > 0` parses as
	// `(!length) > 0` (always false), and `sources[length - 1]` read the bare
	// global `length` — the loop could never run. Use the array's own length.
	while (shardPipe.sources.length > 0
			&& shardPipe.sources[shardPipe.sources.length - 1].constructor === UnwindDocumentSource) {
		mergePipe.sources.unshift(shardPipe.sources.pop());
	}
};
+
/**
 * Limit the fields the shards send to the merger to only those the merge half
 * actually depends on, by appending a $project to the shard pipeline.
 * @static
 * @method limitFieldsSentFromShardsToMerger
 * @param shardPipe shard sources
 * @param mergePipe merge sources
 **/
// BUGFIX: was assigned to `klass.limitFieldsSentFromShardsToMerger`, but
// splitForSharded invokes it as `klass.optimizations.sharded.…`, which threw.
klass.optimizations.sharded.limitFieldsSentFromShardsToMerger = function limitFieldsSentFromShardsToMerger(shardPipe, mergePipe) {
	var mergeDeps = mergePipe.getDependencies(shardPipe.getInitialQuery());
	if (mergeDeps.needWholeDocument) {
		return;	// merger needs entire documents; nothing to trim
	}

	// BUGFIX: `fields` is a plain object keyed by field name (see getDependencies),
	// so `.length` was always undefined; count its keys instead.
	if (Object.keys(mergeDeps.fields).length === 0) {
		// NOTE(review): Mongo inserts "_id" as a needed field here so the shards send
		// at least something; confirm that value 0 produces that in toProjection().
		mergeDeps.fields["_id"] = 0;
	}
	//NOTE: Deviation from Mongo: not setting mergeDeps.needTextScore because we aren't handling that (Document meta stuff)

	// HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
	// field dependencies. While this may not be 100% ideal in all cases, it is simple and
	// avoids the worst cases by ensuring that:
	// 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
	//    dependencies. This situation can happen when a $sort is before the first $project or
	//    $group. Without the optimization, the shards would have to reify and transmit full
	//    objects even though only a subset of fields are needed.
	// 2) Optimization IS NOT applied immediately following a $project or $group since it would
	//    add an unnecessary project (and therefore a deep-copy).
	for (var i = 0; i < shardPipe.sources.length; i++) {
		// BUGFIX: the original called getDependencies() on the array itself, missing `[i]`
		if (shardPipe.sources[i].getDependencies() & DocumentSource.GetDepsReturn.EXHAUSTIVE_FIELDS)
			return;
	}

	// if we get here, add the project.
	shardPipe.sources.push(ProjectDocumentSource.createFromJson({"$project": mergeDeps.toProjection()[0]}, shardPipe.ctx));
};
+
 /**
  * Create an `Array` of `DocumentSource`s from the given JSON pipeline
  * // NOTE: DEVIATION FROM MONGO: split out into a separate function to better allow extensions (was in parseCommand)
@@ -199,7 +283,7 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
 
 		// Create a DocumentSource pipeline stage from 'stageSpec'.
 		var desc = klass.stageDesc[stageName];
-		if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16435");
+		if (!desc) throw new Error("Unrecognized pipeline stage name: '" + stageName + "'; code 16436");
 
 		// Parse the stage
 		var stage = desc(stageSpec, ctx);
@@ -207,7 +291,7 @@ klass.parseDocumentSources = function parseDocumentSources(pipeline, ctx){
 		sources.push(stage);
 
 		if(stage.constructor === OutDocumentSource && iStep !== nSteps - 1) {
-			throw new Error("$out can only be the final stage in the pipeline; code 16435");
+			throw new Error("$out can only be the final stage in the pipeline; code 16991");
 		}
 	}
 	return sources;
@@ -252,7 +336,7 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
 	 * Set up the specified document source pipeline.
 	 **/
 	// NOTE: DEVIATION FROM MONGO: split this into a separate function to simplify and better allow for extensions (now in parseDocumentSources)
-	var sources = pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
+	pipelineInst.sources = Pipeline.parseDocumentSources(pipeline, ctx);
 
 	klass.optimizations.local.moveMatchBeforeSort(pipelineInst);
 	klass.optimizations.local.moveLimitBeforeSkip(pipelineInst);
@@ -445,8 +529,9 @@ proto.getDependencies = function getDependencies (initialQuery) {
 
         if (!knowAllFields) {
             //C++ insert this range: deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end());
-        	for (var j = 0; j < localDeps.fields.length; j++) {
-        		deps.fields[localDeps.fields[i].fieldName] = localDeps.fields[i];
+        	for (var j = 0; j < localDeps.fields.keys.length; j++) {
+        		var key = localDeps.fields.keys[i];
+        		deps.fields[key] = localDeps.fields[key];
         	}
             if (localDeps.needWholeDocument)
                 deps.needWholeDocument = true;

+ 2 - 0
lib/pipeline/documentSources/OutDocumentSource.js

@@ -48,6 +48,8 @@ klass.createFromJson = function(jsonElement, ctx) {
 };
 
 // SplittableDocumentSource implementation.
+// Mongo doesn't fully implement SplittableDocumentSource on DocumentSourceOut.
+//	It doesn't implement getShardSource or getMergeSource
 klass.isSplittableDocumentSource = true;
 
 //NeedsMongodDocumentSource implementation

+ 5 - 5
test/lib/pipeline/Pipeline.js

@@ -90,8 +90,8 @@ module.exports = {
 					{$sort: {"xyz": 1}},
 					{$match: {}}
 				]});
-				assert.equal(p.sourceVector[0].constructor.matchName, "$match");
-				assert.equal(p.sourceVector[1].constructor.sortName, "$sort");
+				assert.equal(p.sources[0].constructor.matchName, "$match");
+				assert.equal(p.sources[1].constructor.sortName, "$sort");
 			},
 
 			"should attempt to coalesce all sources": function () {
@@ -101,8 +101,8 @@ module.exports = {
 					{$test: {coalesce: false}},
 					{$test: {coalesce: false}}
 				]});
-				assert.equal(p.sourceVector.length, 3);
-				p.sourceVector.slice(0, -1).forEach(function (source) {
+				assert.equal(p.sources.length, 3);
+				p.sources.slice(0, -1).forEach(function (source) {
 					assert.equal(source.coalesceWasCalled, true);
 				});
 				assert.equal(p.sources[p.sources.length -1].coalesceWasCalled, false);
@@ -113,7 +113,7 @@ module.exports = {
 					{$test: {coalesce: false}},
 					{$test: {coalesce: false}}
 				]});
-				p.sourceVector.forEach(function (source) {
+				p.sources.forEach(function (source) {
 					assert.equal(source.optimizeWasCalled, true);
 				});
 			}