|
|
@@ -1,5 +1,5 @@
|
|
|
"use strict";
|
|
|
-
|
|
|
+var async = require('async');
|
|
|
/**
|
|
|
* mongodb "commands" (sent via db.$cmd.findOne(...)) subclass to make a command. define a singleton object for it.
|
|
|
* @class Pipeline
|
|
|
@@ -13,7 +13,6 @@ var Pipeline = module.exports = function Pipeline(theCtx){
|
|
|
this.explain = false;
|
|
|
this.splitMongodPipeline = false;
|
|
|
this.ctx = theCtx;
|
|
|
- this.SYNC_MODE = false;
|
|
|
}, klass = Pipeline, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
|
|
|
|
|
|
var DocumentSource = require("./documentSources/DocumentSource"),
|
|
|
@@ -360,12 +359,6 @@ klass.parseCommand = function parseCommand(cmdObj, ctx){
|
|
|
return pipelineInst;
|
|
|
};
|
|
|
|
|
|
-// sync callback for Pipeline#run if omitted
|
|
|
-klass.SYNC_CALLBACK = function(err, results){
|
|
|
- if (err) throw err;
|
|
|
- return results.result;
|
|
|
-};
|
|
|
-
|
|
|
function ifError(err) {
|
|
|
if (err) throw err;
|
|
|
}
|
|
|
@@ -434,20 +427,31 @@ proto.stitch = function stitch() {
|
|
|
/**
|
|
|
* Run the pipeline
|
|
|
* @method run
|
|
|
- * @param callback {Function} Optional. Run the pipeline in async mode; callback(err, result)
|
|
|
- * @return result {Object} The result of executing the pipeline
|
|
|
+ * @param callback {Function} gets called once for each document result from the pipeline
|
|
|
*/
|
|
|
proto.run = function run(callback) {
|
|
|
// should not get here in the explain case
|
|
|
if(this.explain) throw new Error("Should not be running a pipeline in explain mode!");
|
|
|
-
|
|
|
- /* NOTE: DEVIATION FROM MONGO SOURCE. WE'RE SUPPORTING SYNC AND ASYNC */
|
|
|
- if(this.SYNC_MODE) {
|
|
|
- callback();
|
|
|
- return this._runSync();
|
|
|
- } else {
|
|
|
- return this._runAsync(callback);
|
|
|
- }
|
|
|
+
|
|
|
+ var doc = null,
|
|
|
+ error = null,
|
|
|
+ finalSource = this._getFinalSource();
|
|
|
+
|
|
|
+ async.doWhilst(
|
|
|
+ function iterator(next){
|
|
|
+ return finalSource.getNext(function (err, obj){
|
|
|
+ callback(err, obj);
|
|
|
+ doc = obj;
|
|
|
+ error = err;
|
|
|
+ next();
|
|
|
+ });
|
|
|
+ },
|
|
|
+ function test(){
|
|
|
+ return doc !== null && !error;
|
|
|
+ },
|
|
|
+ function done(err){
|
|
|
+ //nothing to do here
|
|
|
+ });
|
|
|
};
|
|
|
|
|
|
/**
|
|
|
@@ -460,48 +464,6 @@ proto._getFinalSource = function _getFinalSource() {
|
|
|
return this.sources[this.sources.length - 1];
|
|
|
};
|
|
|
|
|
|
-/**
|
|
|
- * Run the pipeline synchronously
|
|
|
- * @method _runSync
|
|
|
- * @return {Object} The results object {result:resultArray}
|
|
|
- * @private
|
|
|
- */
|
|
|
-proto._runSync = function _runSync(callback) {
|
|
|
- var resultArray = [],
|
|
|
- finalSource = this._getFinalSource(),
|
|
|
- handleErr = function(err) {
|
|
|
- if(err) throw err;
|
|
|
- },
|
|
|
- next;
|
|
|
- while((next = finalSource.getNext(handleErr)) !== DocumentSource.EOF) {
|
|
|
- resultArray.push(next);
|
|
|
- }
|
|
|
- return {result:resultArray};
|
|
|
-};
|
|
|
-
|
|
|
-/**
|
|
|
- * Run the pipeline asynchronously
|
|
|
- * @method _runAsync
|
|
|
- * @param callback {Function} callback(err, resultObject)
|
|
|
- * @private
|
|
|
- */
|
|
|
-proto._runAsync = function _runAsync(callback) {
|
|
|
- var resultArray = [],
|
|
|
- finalSource = this._getFinalSource(),
|
|
|
- gotNext = function(err, doc) {
|
|
|
- if(err) return callback(err);
|
|
|
- if(doc !== DocumentSource.EOF) {
|
|
|
- resultArray.push(doc);
|
|
|
- return setImmediate(function() { //setImmediate to avoid callstack size issues
|
|
|
- finalSource.getNext(gotNext);
|
|
|
- });
|
|
|
- } else {
|
|
|
- return callback(null, {result:resultArray});
|
|
|
- }
|
|
|
- };
|
|
|
- finalSource.getNext(gotNext);
|
|
|
-};
|
|
|
-
|
|
|
/**
|
|
|
* Get the pipeline explanation
|
|
|
* @method writeExplainOps
|