123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- var su = require("stream-utils");
var Munger = (function(){
  // CONSTRUCTOR
  /**
   * Compiles a list of pipeline ops into streams and chains them into one pipeline.
   *
   * Accepts either a single array (or array-like) of ops, or the ops as separate
   * arguments. Each op is either:
   *   - a function, which is wrapped with `su.through`, or
   *   - an object whose first enumerable key names an entry in `Munger.ops`; that
   *     entry is invoked as `new IOp(op[opName], index)`.
   *
   * @param {Array|...*} ops - pipeline ops (array, array-like, or varargs)
   * @throws {Error} if an element is neither a function nor a non-null object
   * @throws {Error} if an object op names a key that is not registered in `Munger.ops`
   */
  var base = Object, proto, klass = function Munger(ops){
    if (Array.isArray(ops)) {
      this.ops = ops;
    } else if (ops !== null && typeof ops === "object" && typeof ops.length === "number") {
      // Array-like (e.g. an `arguments` object): copy into a real array so .map exists.
      this.ops = Array.prototype.slice.call(ops, 0);
    } else {
      // Varargs form: new Munger(opA, opB, ...)
      this.ops = Array.prototype.slice.call(arguments, 0);
    }

    this.opStreams = this.ops.map(function opCompiler(op, i){ //TODO: demote to local only?
      // Functions must be handled before the object check, otherwise they are
      // rejected as "not an object" and this branch can never run.
      if (typeof op === "function") {
        return su.through(op);
      }
      // typeof null is "object", so null has to be excluded explicitly.
      if (op === null || typeof op !== "object") {
        throw new Error("pipeline element " + i + " is not an object");
      }
      var opName;
      for (opName in op) break; // get first key
      // Own-property lookup so inherited names (e.g. "toString") are not treated as ops.
      if (!Object.prototype.hasOwnProperty.call(klass.ops, opName)) {
        throw new Error("Unrecognized pipeline op: " + JSON.stringify({opName:opName}));
      }
      var IOp = klass.ops[opName];
      return new IOp(op[opName], i);
    });

    this.pipeline = new su.PipelineStream(this.opStreams);
  };
  proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});

  // STATIC MEMBERS
  // Registry of op name -> constructor, called as `new IOp(spec, index)`.
  // It starts empty so that unknown ops raise the "Unrecognized pipeline op"
  // error instead of a TypeError from looking a key up on `undefined`.
  // TODO: register the built-in ops once their classes are available here:
  //   $skip: SkipOp, $limit: LimitOp, $match: MatchOp, $project: ProjectOp,
  //   $unwind: UnwindOp, $group: GroupOp, $sort: SortOp
  klass.ops = {};

  // PROTOTYPE MEMBERS
  /**
   * Writes every input through the pipeline and collects what it emits.
   *
   * The returned array holds every value delivered through "data" events up to
   * the point where `end()` and the final `reset()` have returned.
   *
   * @param {Array} inputs - values to write into the pipeline, in order
   * @returns {Array} values received from the pipeline's "data" events
   */
  proto.execute = function execute(inputs){
    var outputs = [];
    var pipeline = this.pipeline;

    pipeline.reset();
    // NOTE(review): a new "data" listener is attached on every call; this presumably
    // relies on reset() dropping earlier listeners -- confirm against stream-utils.
    pipeline.on("data", function(data){
      outputs.push(data);
    });
    // Call write as a method with exactly one argument. Passing `pipeline.write`
    // directly to forEach would lose `this` and also forward the index and array.
    inputs.forEach(function(input){
      pipeline.write(input);
    });
    pipeline.end();
    pipeline.reset();
    return outputs;
  };

  return klass;
})();
- module.exports = function mung(ops, inputs) {
- var munger = new Munger(ops);
- if(inputs)
- return munger.execute(inputs);
- return munger.execute.bind(munger);
- };
|