Prechádzať zdrojové kódy

refs #1783: Added to munge.

http://source.rd.rcg.local/trac/eagle6/changeset/1444/Eagle6_SVN
Jared Hall 12 rokov pred
rodič
commit
3f1ba48789

+ 3 - 1
lib/pipeline/Pipeline.js

@@ -12,7 +12,8 @@ var Pipeline = module.exports = (function(){
 		SkipDocumentSource = require('./documentSources/SkipDocumentSource'),
 		UnwindDocumentSource = require('./documentSources/UnwindDocumentSource'),
 		GroupDocumentSource = require('./documentSources/GroupDocumentSource'),
-		SortDocumentSource = require('./documentSources/SortDocumentSource');
+		SortDocumentSource = require('./documentSources/SortDocumentSource'),
+    SplitDocumentSource = require('./documentSources/SplitDocumentSource');
 	
 	klass.StageDesc = {};//attaching this to the class for test cases
 	klass.StageDesc[LimitDocumentSource.limitName] = LimitDocumentSource.createFromJson;
@@ -22,6 +23,7 @@ var Pipeline = module.exports = (function(){
 	klass.StageDesc[UnwindDocumentSource.unwindName] = UnwindDocumentSource.createFromJson;
 	klass.StageDesc[GroupDocumentSource.groupName] = GroupDocumentSource.createFromJson;
 	klass.StageDesc[SortDocumentSource.sortName] = SortDocumentSource.createFromJson;
+  klass.StageDesc[SplitDocumentSource.splitName] = SplitDocumentSource.createFromJson;
 	
     /**
      * Create a pipeline from the command.

+ 168 - 0
lib/pipeline/documentSources/SplitDocumentSource.js

@@ -0,0 +1,168 @@
+var SplitDocumentSource = module.exports = (function(){
+	// CONSTRUCTOR
+	/**
+	 * A document source that buffers everything from its underlying source,
+	 * runs each configured sub-pipeline over the buffered documents, and emits
+	 * one document whose fields hold the sub-pipeline results (see populate()).
+	 *
+	 * Since we don't have shards, this inherits from DocumentSource, instead of SplittableDocumentSource
+	 *
+	 * @class SplitDocumentSource
+	 * @namespace munge.pipeline.documentsource
+	 * @module munge
+	 * @constructor
+	 * @throws {Error} if called with any arguments
+	**/
+	var klass = module.exports = SplitDocumentSource = function SplitDocumentSource(/* pCtx*/){
+		if(arguments.length !== 0) throw new Error("zero args expected");
+		base.call(this);
+		/*
+		* Before returning anything, this source must fetch everything from
+		* the underlying source and run the sub-pipelines over it.  populate()
+		* is used to do that on the first call to eof(), getCurrent() or
+		* advance().  The populated boolean indicates that this has been done
+		**/
+		this.populated = false;
+		this.current = null;
+		this.docIterator = null; // a number tracking our position in the documents array
+		this.documents = []; // an array of documents
+		this.pipelines = {}; // sub-pipelines keyed by output field name; filled in by createFromJson()
+
+	}, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
+
+
+	// The stage name; Pipeline.js uses it as the key under which createFromJson is registered.
+	klass.splitName = "$split";
+	/**
+	 * @method	getSourceName
+	 * @returns	{String}	the name of this stage, i.e. klass.splitName
+	**/
+	proto.getSourceName = function getSourceName(){
+		return klass.splitName;
+	};
+	
+	/**
+	 * @method	getFactory
+	 * @returns	{Function}	the SplitDocumentSource constructor itself
+	**/
+	proto.getFactory = function getFactory(){
+		return klass;	// using the ctor rather than a separate .create() method
+	};
+
+	/**
+	 * Is the source at EOF?
+	 * 
+	 * @method	eof
+	 * @return {bool} return if we have hit the end of input
+	**/
+	proto.eof = function eof() {
+		if (!this.populated)
+			this.populate();
+		return (this.docIterator == this.documents.length);
+	};
+
+	/**
+	 * some implementations do the equivalent of verify(!eof()) so check eof() first
+	 * 
+	 * @method	getCurrent
+	 * @returns	{Document}	the current Document without advancing
+	**/
+	proto.getCurrent = function getCurrent() {
+		if (!this.populated)
+			this.populate();
+		return this.current;
+	};
+
+	/**
+	 * Advance the state of the DocumentSource so that it will return the next Document.  
+	 * The default implementation returns false, after checking for interrupts. 
+	 * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
+	 * 
+	 * @method	advance
+	 * @returns	{Boolean}	whether there is another document to fetch, i.e., whether or not getCurrent() will succeed.  This default implementation always returns false.
+	**/
+	proto.advance = function advance() {
+		base.prototype.advance.call(this); // check for interrupts
+
+		if (!this.populated)
+			this.populate();
+
+		if (this.docIterator == this.documents.length) throw new Error("This should never happen");
+		++this.docIterator;
+
+		if (this.docIterator == this.documents.length) {
+			this.current = null;
+			return false;
+		}
+		this.current = this.documents[this.docIterator];
+		return true;
+	};
+
+	/**
+	 * Write this source's JSON representation onto the given builder object
+	 * as a single field named "$split".
+	 *
+	 * The value written is always an empty object; the configured
+	 * sub-pipelines are not serialised.
+	 *
+	 * @method	sourceToJson
+	 * @param	{Object} builder	JSONObjBuilder: a blank object builder to write to
+	 * @param	{Boolean}	explain	create explain output (currently unused)
+	**/
+	proto.sourceToJson = function sourceToJson(builder, explain) {
+		builder.$split = {}; // TODO: this is the default for split but it may need to have a key?
+	};
+
+
+
+	proto.populate = function populate() {
+		/* pull everything from the underlying source */
+		for(var hasNext = !this.pSource.eof(); hasNext; hasNext = this.pSource.advance()) {
+			var doc = this.pSource.getCurrent();
+			this.documents.push(doc);
+		}
+		
+		var splitDocument = {};
+		for(var pipelineKey in this.pipelines){
+			var pipeline = this.pipelines[pipelineKey],
+				result = {};
+			result.ok = pipeline.run(this.documents, result);
+			splitDocument[pipelineKey] = result.result;
+		}
+
+		//"Join" all documents by placing the various pipeline results as the only doc in this.documents
+		this.documents = [splitDocument];
+
+		this.docIterator = 0;
+		if (this.docIterator < this.documents.length)
+			this.current = this.documents[this.docIterator];
+		this.populated = true;
+	};
+
+
+
+
+
+	/**
+	 * Creates a new SortDocumentSource 
+	 *
+	 * @param {Object} JsonElement
+	**/
+	klass.createFromJson = function createFromJson(jsonElement) {
+		if (typeof jsonElement !== "object") throw new Error("code 15973; the " + klass.sortName + " key specification must be an object");
+		
+
+		var split = new SplitDocumentSource(),
+			splitKeys = 0,
+			PipelineCommand = require('../../commands/PipelineCommand');
+		for(var key in jsonElement) {
+			split.pipelines[key] = new PipelineCommand(jsonElement[key]);
+			++splitKeys;
+		}
+
+		if ( splitKeys <= 0) throw new Error("code 15977; " + klass.splitName + " must have at least one split key");
+		return split;
+	};
+	
+	/**
+	 * Reset the document source so that it is ready for a new stream of data.
+	 * Note that this is a deviation from the mongo implementation.
+	 *
+	 * Only the iteration state is cleared; this.pipelines is left untouched,
+	 * so the configured sub-pipelines are kept for the next run.
+	 *
+	 * @method	reset
+	**/
+	proto.reset = function reset(){
+		this.populated = false; // forces populate() to run again on next access
+		this.current = null;
+		this.docIterator = null; // a number tracking our position in the documents array
+		this.documents = []; // an array of documents
+	};
+
+	return klass;
+})();

+ 14 - 1
test/lib/munge.js

@@ -196,7 +196,20 @@ module.exports = {
 			//assert.deepEqual(a, e); //does not work with NaN
 			assert.equal(JSON.stringify(munger(i)), JSON.stringify(e), "Reuse of munger should yield the same results!");
 			assert.equal(JSON.stringify(munge(p, i)), JSON.stringify(e), "Alternate use of munge should yield the same results!");
-		}
+		},
+
+		// End-to-end check of the $split stage: $match narrows the input, $split runs
+		// two $project sub-pipelines, and each result field is then passed to $unwind.
+		"should be able to use a $split operator": function(){
+			var i = [{_id:0, a:1}, {_id:1, a:1}, {_id:2, a:1}, {_id:3, a:1}, {_id:4, a:1}, {_id:5, a:1}],
+				p = [{$match:{_id:0}}, {$split:{aX2:[{$project:{a:{$multiply:["$a", 2]}}}], aX3:[{$project:{a:{$multiply:["$a", 3]}}}]}}, {$unwind:"$aX2"}, {$unwind:"$aX3"}],
+				// alternative expectation with array-valued fields, kept for reference (presumably the shape before the $unwind stages -- TODO confirm):
+				//e = [{aX2:[{_id:0, a:2}], aX3:[{_id:0, a:3}]}],
+				e = [{aX2:{_id:0, a:2}, aX3:{_id:0, a:3}}],
+				munger = munge(p),
+				a = munger(i);
+			assert.equal(JSON.stringify(a), JSON.stringify(e), "Unexpected value!");
+			assert.deepEqual(a, e, "Unexpected value (not deepEqual)!");
+			// run the same munger again, then build a fresh one, to make sure no state leaks between runs
+			assert.equal(JSON.stringify(munger(i)), JSON.stringify(e), "Reuse of munger should yield the same results!");
+			assert.equal(JSON.stringify(munge(p, i)), JSON.stringify(e), "Alternate use of munge should yield the same results!");
+		},
 
 	}
 

+ 153 - 0
test/lib/pipeline/documentSources/SplitDocumentSource.js

@@ -0,0 +1,153 @@
+var assert = require("assert"),
+	SplitDocumentSource = require("../../../../lib/pipeline/documentSources/SplitDocumentSource"),
+	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
+	Cursor = require("../../../../lib/Cursor");
+
+module.exports = {
+
+	"SplitDocumentSource": {
+
+		// The constructor takes no arguments and throws when given any.
+		"constructor()": {
+
+			"should not throw Error when constructing without args": function testConstructor(){
+				assert.doesNotThrow(function(){
+					new SplitDocumentSource();
+				});
+			},
+
+			"should throw an error if called with arguments.length > 0": function throwsWithArgs(){
+				assert.throws(function(){
+					new SplitDocumentSource(1);
+				});
+			}
+
+		},
+
+		"#getSourceName()": {
+
+            "should return the correct source name; $split": function testSourceName(){
+                var pds = new SplitDocumentSource();
+                assert.strictEqual(pds.getSourceName(), SplitDocumentSource.splitName);
+            }
+
+        },
+
+        "#eof()": {
+
+			"shouldn't be eof after init": function testEOF(){
+				var cwc = new CursorDocumentSource.CursorWithContext();
+				cwc._cursor = new Cursor( [{a: 1}] );
+				var cds = new CursorDocumentSource(cwc);
+				var split = new SplitDocumentSource();
+				split.setSource(cds);
+				assert.ok(!split.eof());
+			},
+
+            "should be eof after one call to get current": function testAdvanceFirst() {
+                var cwc = new CursorDocumentSource.CursorWithContext();
+                var input = [{_id: 0, a: 1}, {_id: 1, a: 2}];
+                cwc._cursor = new Cursor( input );
+                var cds = new CursorDocumentSource(cwc);
+                var split = new SplitDocumentSource();
+                split.setSource(cds);
+                assert.ok(split.getCurrent()); 
+                assert.ok(split.eof);
+            }
+
+        },
+
+		// advance() returns false when stepping past the single joined document
+		// and throws if called again after that.
+		"#advance()": {
+
+            "can't advance after one call to getCurrent": function testAdvanceFirst() {
+                var cwc = new CursorDocumentSource.CursorWithContext();
+                var input = [{_id: 0, a: 1}, {_id: 1, a: 2}];
+                cwc._cursor = new Cursor( input );
+                var cds = new CursorDocumentSource(cwc);
+                var split = new SplitDocumentSource();
+                split.setSource(cds);
+                assert.ok(split.getCurrent()); // first access populates the source
+                assert.ok(!split.advance());
+            },
+
+			"throws exception if advanced beyond eof": function throwsBeyondEof() {
+				// NOTE(review): the whole setup runs inside assert.throws, so an error thrown
+				// by any of these statements satisfies the assertion, not only the final advance()
+				assert.throws(function() {
+					var cwc = new CursorDocumentSource.CursorWithContext();
+					var input = [{_id: 0, a: 1}, {_id: 1, a: 2}];
+					cwc._cursor = new Cursor( input );
+					var cds = new CursorDocumentSource(cwc);
+					var split = new SplitDocumentSource();
+					split.setSource(cds);
+					split.getCurrent(); 
+					split.advance();
+					split.advance();
+				});
+			}
+		},
+
+		// Two sub-pipelines over the same two input documents: the single emitted
+		// document must carry each sub-pipeline's output under its own key.
+		"#populate()": function testPopulate() {
+			var spec = {
+				aX2:[{$project:{a:{$multiply:["$a", 2]}}}],
+				aX3:[{$project:{a:{$multiply:["$a", 3]}}}]
+			};
+				
+			var cwc = new CursorDocumentSource.CursorWithContext();
+			var input = [{a:1}, {a:2}];
+			cwc._cursor = new Cursor( input );
+			var cds = new CursorDocumentSource(cwc);
+			var split = SplitDocumentSource.createFromJson(spec);
+			split.setSource(cds);
+			
+			assert.ok(!split.eof());
+			assert.deepEqual({aX2:[{a:2}, {a:4}], aX3:[{a:3}, {a:6}]}, split.getCurrent());
+			/*
+			TODO: disabled assertions; assertExhausted is not defined in this file.
+			assert.ok(!split.getCurrent().b);
+			assert.ok(split.advance());
+			assert.ok(!split.eof());
+			assert.equal(3, split.getCurrent().a); 
+			assert.ok(!split.getCurrent().b);
+			assert.ok(!split.advance());
+			assertExhausted(split);
+			*/
+		},
+
+		// Invalid $split specifications must be rejected.  None of these assert.throws
+		// calls passes an error matcher, so any thrown error satisfies them.
+		"#createFromJson()": {
+
+            "should error if called with non-object": function testNonObjectPassed() {
+                //String as arg
+                assert.throws(function() {
+                    var split = SplitDocumentSource.createFromJson("not an object");
+                });
+                //Date as arg
+                assert.throws(function() {
+					var split = SplitDocumentSource.createFromJson(new Date());
+                });
+                //Array as arg
+                assert.throws(function() {
+					var split = SplitDocumentSource.createFromJson([]);
+                });
+                //Empty args
+                assert.throws(function() {
+					var split = SplitDocumentSource.createFromJson();
+                });
+            },
+
+            "should error if spec has no keys": function testNoKeys() {
+				assert.throws(function() {
+					var split = SplitDocumentSource.createFromJson({});
+                });
+            },
+
+            // NOTE(review): createFromJson does not inspect the values itself; this presumably
+            // relies on the PipelineCommand constructor rejecting a non-array -- confirm
+            "should error if value of a key in top level is not an array": function testNoKeys() {
+                assert.throws(function() {
+                    var split = SplitDocumentSource.createFromJson({a: "not an array"});
+                });
+            }
+
+        },
+
+	}
+
+};
+
+// When this file is executed directly rather than required, run it through mocha with the "exports" UI and "spec" reporter.
+if (!module.parent)(new(require("mocha"))()).ui("exports").reporter("spec").addFile(__filename).run(process.exit);
+