|
@@ -185,19 +185,20 @@ var Pipeline = module.exports = (function(){
|
|
|
if(inputSource && !(inputSource instanceof DocumentSource)) throw new Error("arg `inputSource` must be an instance of DocumentSource");
|
|
|
if(!callback) throw new Error("arg `callback` required");
|
|
|
var self = this;
|
|
|
- inputSource.setSource(undefined, function(err){ //TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
|
|
|
- if(err) return callback(err);
|
|
|
- // chain together the sources we found
|
|
|
- async.eachSeries(self.sourceVector,
|
|
|
- function eachSrc(item, next){
|
|
|
- item.setSource(inputSource, function(err){
|
|
|
- if(err) return next(err);
|
|
|
- inputSource = item;
|
|
|
- return next();
|
|
|
- });
|
|
|
- },
|
|
|
- function doneSrcs(err){ //source is left pointing at the last source in the chain
|
|
|
- if(err) return callback(err);
|
|
|
+ // chain together the sources we found
|
|
|
+ var source = inputSource;
|
|
|
+ async.eachSeries(self.sourceVector,
|
|
|
+ function eachSrc(temp, next){
|
|
|
+ temp.setSource(source, function(err){
|
|
|
+ if (err) return next(err);
|
|
|
+ source = temp;
|
|
|
+ return next();
|
|
|
+ });
|
|
|
+ },
|
|
|
+ function doneSrcs(err){ //source is left pointing at the last source in the chain
|
|
|
+ if (err) return callback(err);
|
|
|
+ inputSource.setSource(undefined, function(err){ //TODO: HACK: temp solution to the fact that we need to initialize our source since we're using setSource as a workaround for the lack of real async cursors
|
|
|
+ if (err) return callback(err);
|
|
|
/*
|
|
|
Iterate through the resulting documents, and add them to the result.
|
|
|
We do this even if we're doing an explain, in order to capture the document counts and other stats.
|
|
@@ -206,10 +207,9 @@ var Pipeline = module.exports = (function(){
|
|
|
// the array in which the aggregation results reside
|
|
|
var resultArray = [];
|
|
|
try{
|
|
|
- for(var hasDoc = !inputSource.eof(); hasDoc; hasDoc = inputSource.advance()) {
|
|
|
- var document = inputSource.getCurrent();
|
|
|
+ for(var hasDoc = !source.eof(); hasDoc; hasDoc = source.advance()) {
|
|
|
+ var document = source.getCurrent();
|
|
|
resultArray.push(document); // add the document to the result set
|
|
|
-
|
|
|
//Commenting out this assertion for munge. MUHAHAHA!!!
|
|
|
// object will be too large, assert. the extra 1KB is for headers
|
|
|
//if(resultArray.len() < BSONObjMaxUserSize - 1024) throw new Error("aggregation result exceeds maximum document size (" + BSONObjMaxUserSize / (1024 * 1024) + "MB); code 16389");
|
|
@@ -222,9 +222,9 @@ var Pipeline = module.exports = (function(){
|
|
|
// ,ok: true; //not actually in here... where does this come from?
|
|
|
};
|
|
|
return callback(null, result);
|
|
|
- }
|
|
|
- );
|
|
|
- });
|
|
|
+ });
|
|
|
+ }
|
|
|
+ );
|
|
|
};
|
|
|
|
|
|
return klass;
|