PipelineD.js 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. "use strict";
  2. /**
  3. * Pipeline helper for reading data
  4. * @class PipelineD
  5. * @namespace mungedb-aggregate.pipeline
  6. * @module mungedb-aggregate
  7. * @constructor
  8. **/
  9. var PipelineD = module.exports = function PipelineD(){
  10. if(this.constructor == PipelineD) throw new Error("Never create instances of this! Use the static helpers only.");
  11. }, klass = PipelineD, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  12. // DEPENDENCIES
  13. var DocumentSource = require('./documentSources/DocumentSource'),
  14. CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
  15. Cursor = require('../Cursor');
  16. /**
  17. * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
  18. * This source will feed the execution of the pipeline.
  19. *
  20. * //NOTE: Not doing anything here, as we don't use any of these cursor source features
  21. * //NOTE: DEVIATION FROM THE MONGO: We don't have special optimized cursors; You could support something similar by overriding `Pipeline#run` to call `DocumentSource#coalesce` on the `inputSource` if you really need it.
  22. *
  23. * This method looks for early pipeline stages that can be folded into
  24. * the underlying cursor, and when a cursor can absorb those, they
  25. * are removed from the head of the pipeline. For example, an
  26. * early match can be removed and replaced with a Cursor that will
  27. * do an index scan.
  28. *
  29. * @param pipeline {Pipeline} the logical "this" for this operation
  30. * @param ctx {Object} Context for expressions
  31. * @returns {CursorDocumentSource} the cursor that was created
  32. **/
  33. klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
  34. var sources = pipeline.sources;
  35. // NOTE: SKIPPED: look for initial match
  36. // NOTE: SKIPPED: create a query object
  37. // Look for an initial simple project; we'll avoid constructing Values for fields that won't make it through the projection
  38. var projection = {};
  39. var dependencies;
  40. var deps = {};
  41. var status = DocumentSource.GetDepsReturn.SEE_NEXT;
  42. for (var i=0; i < sources.length && status !== DocumentSource.GetDepsReturn.EXHAUSTIVE; i++) {
  43. status = sources[i].getDependencies(deps);
  44. if(Object.keys(deps).length === 0) {
  45. status = DocumentSource.GetDepsReturn.NOT_SUPPORTED;
  46. }
  47. }
  48. if (status === DocumentSource.GetDepsReturn.EXHAUSTIVE) {
  49. projection = DocumentSource.depsToProjection(deps);
  50. dependencies = DocumentSource.parseDeps(deps);
  51. }
  52. // NOTE: SKIPPED: Look for an initial sort
  53. // NOTE: SKIPPED: Create the sort object
  54. //get the full "namespace" name
  55. // var fullName = dbName + "." + pipeline.collectionName;
  56. // NOTE: SKIPPED: if(DEV) log messages
  57. // Create the necessary context to use a Cursor
  58. // NOTE: SKIPPED: pSortedCursor bit
  59. // NOTE: SKIPPED: pUnsortedCursor bit
  60. // NOTE: Deviating from mongo here. We're passing in a source or set of documents instead of collection name in the ctx.ns field
  61. var source;
  62. if(expCtx.ns instanceof DocumentSource){
  63. source = expCtx.ns;
  64. } else {
  65. var cursorWithContext = new CursorDocumentSource.CursorWithContext(/*fullName*/);
  66. // Now add the Cursor to cursorWithContext
  67. cursorWithContext._cursor = new Cursor( expCtx.ns ); //NOTE: collectionName will likely be an array of documents in munge
  68. // wrap the cursor with a DocumentSource and return that
  69. source = new CursorDocumentSource( cursorWithContext, expCtx );
  70. // NOTE: SKIPPED: Note the query and sort
  71. if (Object.keys(projection).length) source.setProjection(projection, dependencies);
  72. while(sources.length > 0 && source.coalesce(sources[0])) { //Note: Attempting to coalesce into the cursor source
  73. sources.shift();
  74. }
  75. }
  76. pipeline.addInitialSource(source);
  77. };