munge.js 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. var su = require("stream-utils");
  /**
   * Munger compiles a list of pipeline ops into streams and runs arrays of
   * inputs through the resulting pipeline.
   *
   * Each op is either:
   *   - a function, which is wrapped with `su.through(op)`; or
   *   - an object whose first key names an operator registered in
   *     `Munger.ops`; that operator is constructed as
   *     `new Op(op[opName], index)`.
   *
   * @constructor
   * @param {Array|...*} ops  An array(-like) of ops, or the ops passed as
   *                          individual arguments.
   * @throws {Error} if an element is neither a function nor a non-null object,
   *                 or if it names an operator that is not registered.
   */
  var Munger = (function(){
    // CONSTRUCTOR
    var base = Object, proto, klass = function Munger(ops){
      // Guard against null: typeof null is "object", and reading `.length` off
      // it would throw a TypeError before any useful validation can run.
      var isListLike = ops !== null && typeof ops === "object" && typeof ops.length === "number";
      this.ops = isListLike ? ops : Array.prototype.slice.call(arguments, 0);
      this.opStreams = this.ops.map(function opCompiler(op, i){ //TODO: demote to local only?
        // Functions must be handled before the object check, otherwise the
        // "not an object" guard rejects them and this branch is unreachable.
        if (typeof op === "function")
          return su.through(op);
        if (op === null || typeof op !== "object")
          throw new Error("pipeline element " + i + " is not an object");
        var opName = Object.keys(op)[0]; // get first key
        // Own-property lookup only, so inherited names such as "toString"
        // are never mistaken for registered operators.
        if (!Object.prototype.hasOwnProperty.call(klass.ops, opName))
          throw new Error("Unrecognized pipeline op: " + JSON.stringify({opName:opName}));
        var IOp = klass.ops[opName];
        return new IOp(op[opName], i);
      });
      console.log("OPS:", this.ops);
      this.pipeline = new su.PipelineStream(this.opStreams);
    };
    proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});

    // STATIC MEMBERS
    // Operator registry, keyed by op name. It starts empty so that an unknown
    // op name produces the "Unrecognized pipeline op" error rather than a
    // TypeError from looking a key up on `undefined`.
    // TODO: register the operators once their implementations exist:
    //   $skip: SkipOp, $limit: LimitOp, $match: MatchOp, $project: ProjectOp,
    //   $unwind: UnwindOp, $group: GroupOp, $sort: SortOp
    klass.ops = {};

    // PROTOTYPE MEMBERS
    /**
     * Writes every input to the pipeline, ends it, and returns whatever the
     * pipeline emitted as "data" while that happened.
     *
     * @param {Array} inputs  Values to write to the pipeline, in order.
     * @returns {Array} The values emitted by the pipeline during this call.
     */
    proto.execute = function execute(inputs){
      console.debug("\n#execute called with:", inputs);
      var outputs = [];
      var pipeline = this.pipeline;
      var collect = function(data){
        console.debug("PIPELINE WRITE TO OUTPUTS:", data);
        outputs.push(data);
      };
      // NOTE(review): the original carried "TODO: why does this break things??"
      // on this reset; unconfirmed whether the unbound write() / stacked
      // listeners fixed below were the cause.
      pipeline.reset();
      pipeline.on("data", collect);
      try {
        // Call write() as a method so it keeps its `this`, and pass only the
        // input itself (forEach would also forward the index and the array).
        inputs.forEach(function(input){
          pipeline.write(input);
        });
        console.debug("PIPELINE ENDING...");
        pipeline.end();
      } finally {
        // Detach the listener so repeated execute() calls neither stack up
        // listeners nor keep pushing into a previous call's outputs.
        pipeline.removeListener("data", collect);
      }
      pipeline.reset();
      return outputs;
    };
    return klass;
  })();
  49. module.exports = function mung(ops, inputs) {
  50. var munger = new Munger(ops);
  51. if(inputs)
  52. return munger.execute(inputs);
  53. return munger.execute.bind(munger);
  54. };