Browse Source

EAGLESIX-812: added better error handling and test cases to document sources

Phil Murray 11 years ago
parent
commit
699c520c1f

+ 43 - 35
lib/pipeline/documentSources/GroupDocumentSource.js

@@ -108,14 +108,20 @@ proto.getNext = function getNext(callback) {
 				if(self.currentGroupsKeysIndex === self.groupsKeys.length) {
 					return next(null, null);
 				}
-
-				var id = self.originalGroupsKeys[self.currentGroupsKeysIndex],
-					stringifiedId = self.groupsKeys[self.currentGroupsKeysIndex],
-					accumulators = self.groups[stringifiedId],
+				
+				var out;
+				try {
+					var id = self.originalGroupsKeys[self.currentGroupsKeysIndex],
+						stringifiedId = self.groupsKeys[self.currentGroupsKeysIndex],
+						accumulators = self.groups[stringifiedId];
+						
 					out = self.makeDocument(id, accumulators, self.expCtx.inShard);
 
-				if(++self.currentGroupsKeysIndex === self.groupsKeys.length) {
-					self.dispose();
+					if(++self.currentGroupsKeysIndex === self.groupsKeys.length) {
+						self.dispose();
+					}
+				} catch (ex) {
+					return next(ex);
 				}
 
 				return next(null, out);
@@ -300,44 +306,46 @@ proto.populate = function populate(callback) {
 					input = doc;
 					return cb(); //Need to stop now, no new input
 				}
+				try {
+					input = doc;
+					self.variables.setRoot(input);
 
-				input = doc;
-				self.variables.setRoot(input);
-
-				/* get the _id value */
-				var id = self.computeId(self.variables);
+					/* get the _id value */
+					var id = self.computeId(self.variables);
 
-				if(undefined === id) id = null;
+					if(undefined === id) id = null;
 
-				var groupKey = JSON.stringify(id),
-					group = self.groups[groupKey];
+					var groupKey = JSON.stringify(id),
+						group = self.groups[groupKey];
 
-				if(!group) {
-					self.originalGroupsKeys.push(id);
-					self.groupsKeys.push(groupKey);
-					group = [];
-					self.groups[groupKey] = group;
-					// Add the accumulators
-					for(var afi = 0; afi<self.accumulatorFactories.length; afi++) {
-						group.push(new self.accumulatorFactories[afi]());
+					if(!group) {
+						self.originalGroupsKeys.push(id);
+						self.groupsKeys.push(groupKey);
+						group = [];
+						self.groups[groupKey] = group;
+						// Add the accumulators
+						for(var afi = 0; afi<self.accumulatorFactories.length; afi++) {
+							group.push(new self.accumulatorFactories[afi]());
+						}
 					}
-				}
-				//NOTE: Skipped memory usage stuff for case when group already existed
+					//NOTE: Skipped memory usage stuff for case when group already existed
 
-				if(numAccumulators !== group.length) {
-					throw new Error('Group must have one of each accumulator');
-				}
-
-				//NOTE: passing the input to each accumulator
-				for(var gi=0; gi<group.length; gi++) {
-					group[gi].process(self.expressions[gi].evaluate(self.variables, self.doingMerge));
-				}
+					if(numAccumulators !== group.length) {
+						throw new Error('Group must have one of each accumulator');
+					}
 
-				// We are done with the ROOT document so release it.
-				self.variables.clearRoot();
+					//NOTE: passing the input to each accumulator
+					for(var gi=0; gi<group.length; gi++) {
+						group[gi].process(self.expressions[gi].evaluate(self.variables, self.doingMerge));
+					}
 
-				//NOTE: Skipped the part about sorted files
+					// We are done with the ROOT document so release it.
+					self.variables.clearRoot();
 
+					//NOTE: Skipped the part about sorted files
+				} catch (ex) {
+					return cb(ex);
+				}
 				return cb();
 			});
 		},

+ 7 - 3
lib/pipeline/documentSources/MatchDocumentSource.js

@@ -60,9 +60,13 @@ proto.getNext = function getNext(callback) {
 	async.doUntil(
 		function(cb) {
 			self.source.getNext(function(err, doc) {
-				if(err) return callback(err);
-				if (makeReturn(doc) !== undefined) {
-					next = doc;
+				if(err) return cb(err);
+				try {
+					if (makeReturn(doc) !== undefined) {
+						next = doc;
+					}
+				} catch (ex) {
+					return cb(ex);
 				}
 				return cb();
 			});

+ 7 - 3
lib/pipeline/documentSources/ProjectDocumentSource.js

@@ -60,9 +60,13 @@ proto.getNext = function getNext(callback) {
 		 * If we're excluding fields at the top level, leave out the _id if
 		 * it is found, because we took care of it above.
 		 **/
-		self._variables.setRoot(input);
-		self.OE.addToDocument(out, input, self._variables);
-		self._variables.clearRoot();
+		try {
+			self._variables.setRoot(input);
+			self.OE.addToDocument(out, input, self._variables);
+			self._variables.clearRoot();
+		} catch (ex){
+			return callback(ex);
+		}
 
 		return callback(null, out);
 	});

+ 12 - 5
lib/pipeline/documentSources/RedactDocumentSource.js

@@ -44,17 +44,24 @@ proto.getNext = function getNext(callback) {
 				doc = input;
 				if (input === null)
 					return cb();
-				self._variables.setRoot(input);
-				self._variables.setValue(self._currentId, input);
-				var result = self.redactObject();
+				var result;
+				try {
+					self._variables.setRoot(input);
+					self._variables.setValue(self._currentId, input);
+					result = self.redactObject();
+				} catch (ex) {
+					return cb(ex);
+				}
 				if (result !== null)
 					return cb(result); //Using the err argument to pass the result document; this lets us break out without having EOF
 				return cb();
 			});
 		},
 		function(doc) {
-			if (doc)
-				return callback(null, doc);
+			if (doc){
+				if (doc instanceof Error) return callback(doc);
+				else return callback(null, doc);
+			}
 			return callback(null, null);
 		}
 	);

+ 6 - 3
lib/pipeline/documentSources/SortDocumentSource.js

@@ -210,9 +210,12 @@ proto.populate = function populate(callback) {
 				return next !== null;
 			},
 			function(err) {
-				/* sort the list */
-				self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
-
+				try {
+					/* sort the list */
+					self.documents.sort(SortDocumentSource.prototype.compare.bind(self));
+				} catch (ex) {
+					return callback(ex);
+				}
 				/* start the sort iterator */
 				self.docIterator = 0;
 

+ 16 - 6
lib/pipeline/documentSources/UnwindDocumentSource.js

@@ -110,8 +110,14 @@ proto.getNext = function getNext(callback) {
 	}
 
 	var self = this,
-		out = this._unwinder.getNext(),
+		out,
 		exhausted = false;
+		
+	try {
+		out = this._unwinder.getNext();
+	} catch (ex) {
+		return callback(ex);
+	}
 
 	async.until(
 		function () {
@@ -127,11 +133,15 @@ proto.getNext = function getNext(callback) {
 					return cb(err);
 				}
 
-				if (doc === null) {
-					exhausted = true;
-				} else {
-					self._unwinder.resetDocument(doc);
-					out = self._unwinder.getNext();
+				try {
+					if (doc === null) {
+						exhausted = true;
+					} else {
+						self._unwinder.resetDocument(doc);
+						out = self._unwinder.getNext();
+					}
+				} catch (ex) {
+					return cb(ex);
 				}
 
 				return cb();

+ 12 - 0
test/lib/pipeline/documentSources/GroupDocumentSource.js

@@ -301,6 +301,18 @@ module.exports = {
 					spec: {_id:0, first:{$first:"$missing"}},
 					expected: [{_id:0, first:null}]
 				});
+			},
+			
+			"should return errors in the callback": function(done){
+				var gds = GroupDocumentSource.createFromJson({_id:null, sum: {$sum:"$a"}}),
+					next,
+					results = [],
+					cds = new CursorDocumentSource(null, new ArrayRunner([{"a":"foo"}]), null);
+				gds.setSource(cds);
+				gds.getNext(function(err, doc) {
+					assert(err, "Expected Error");
+					done();
+				});
 			}
 		}
 

+ 12 - 1
test/lib/pipeline/documentSources/ProjectDocumentSource.js

@@ -7,7 +7,8 @@ var assert = require("assert"),
 	CursorDocumentSource = require("../../../../lib/pipeline/documentSources/CursorDocumentSource"),
 	ArrayRunner = require("../../../../lib/query/ArrayRunner"),
 	TestBase = require("./TestBase"),
-	And = require("../../../../lib/pipeline/expressions/AndExpression");
+	And = require("../../../../lib/pipeline/expressions/AndExpression"),
+	Add = require("../../../../lib/pipeline/expressions/AddExpression");
 
 
 /**
@@ -65,6 +66,16 @@ module.exports = {
 
 	"#getNext()": {
 
+		"should return errors in the callback": function Errors() {
+			var input = [{_id: 0, a: "foo"}];
+			var cds = new CursorDocumentSource(null, new ArrayRunner(input), null);
+			var pds = ProjectDocumentSource.createFromJson({x:{"$add":["$a", "$a"]}})
+			pds.setSource(cds);
+			pds.getNext(function(err, actual) {
+				assert(err, "Expected error");
+			});
+		},
+		
 		"should return EOF": function testEOF(next) {
 			var pds = createProject({});
 			pds.setSource({

+ 12 - 0
test/lib/pipeline/documentSources/RedactDocumentSource.js

@@ -61,6 +61,18 @@ module.exports = {
 					next();
 				});
 			},
+			"should return Error in callback": function testError(next) {
+				var rds = RedactDocumentSource.createFromJson({$cond:{
+					if:{$gt:[0,{$add:["$a", 3]}]},
+					then:"$$DESCEND",
+					else:"$$PRUNE"
+				}});
+				rds.setSource(createCursorDocumentSource([{a:"foo"}]));
+				rds.getNext(function(err, doc) {
+					assert(err, "Expected Error");
+					next();
+				});
+			},
 
 			"iterator state accessors consistently report the source is exhausted": function assertExhausted() {
 				var input = [{}];