PipelineD.js 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. "use strict";
  2. /**
  3. * Pipeline helper for reading data
  4. * @class PipelineD
  5. * @namespace mungedb-aggregate.pipeline
  6. * @module mungedb-aggregate
  7. * @constructor
  8. **/
  9. var PipelineD = module.exports = function PipelineD(){
  10. if(this.constructor == PipelineD) throw new Error("Never create instances of this! Use the static helpers only.");
  11. }, klass = PipelineD, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
  12. // DEPENDENCIES
  13. var DocumentSource = require('./documentSources/DocumentSource'),
  14. CursorDocumentSource = require('./documentSources/CursorDocumentSource'),
  15. Cursor = require('../Cursor');
  16. /**
  17. * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable to be the first source for a pipeline to begin with.
  18. * This source will feed the execution of the pipeline.
  19. *
  20. * //NOTE: Not doing anything here, as we don't use any of these cursor source features
  21. * //NOTE: DEVIATION FROM THE MONGO: We don't have special optimized cursors; You could support something similar by overriding `Pipeline#run` to call `DocumentSource#coalesce` on the `inputSource` if you really need it.
  22. *
  23. * This method looks for early pipeline stages that can be folded into
  24. * the underlying cursor, and when a cursor can absorb those, they
  25. * are removed from the head of the pipeline. For example, an
  26. * early match can be removed and replaced with a Cursor that will
  27. * do an index scan.
  28. *
  29. * @param pipeline {Pipeline} the logical "this" for this operation
  30. * @param ctx {Object} Context for expressions
  31. * @returns {CursorDocumentSource} the cursor that was created
  32. **/
  33. klass.prepareCursorSource = function prepareCursorSource(pipeline, expCtx){
  34. var sources = pipeline.sources;
  35. // NOTE: SKIPPED: look for initial match
  36. // NOTE: SKIPPED: create a query object
  37. // Look for an initial simple project; we'll avoid constructing Values for fields that won't make it through the projection
  38. var projection = {};
  39. var deps = {};
  40. var status = DocumentSource.GetDepsReturn.SEE_NEXT;
  41. for (var i=0; i < sources.length && status !== DocumentSource.GetDepsReturn.EXHAUSTIVE; i++) {
  42. status = sources[i].getDependencies(deps);
  43. if(Object.keys(deps).length === 0) {
  44. status = DocumentSource.GetDepsReturn.NOT_SUPPORTED;
  45. }
  46. }
  47. if (status === DocumentSource.GetDepsReturn.EXHAUSTIVE) {
  48. projection = DocumentSource.depsToProjection(deps);
  49. }
  50. // NOTE: SKIPPED: Look for an initial sort
  51. // NOTE: SKIPPED: Create the sort object
  52. //get the full "namespace" name
  53. // var fullName = dbName + "." + pipeline.collectionName;
  54. // NOTE: SKIPPED: if(DEV) log messages
  55. // Create the necessary context to use a Cursor
  56. // NOTE: SKIPPED: pSortedCursor bit
  57. // NOTE: SKIPPED: pUnsortedCursor bit
  58. // NOTE: Deviating from mongo here. We're passing in a source or set of documents instead of collection name in the ctx.ns field
  59. var source;
  60. if(expCtx.ns instanceof DocumentSource){
  61. source = expCtx.ns;
  62. } else {
  63. var cursorWithContext = new CursorDocumentSource.CursorWithContext(/*fullName*/);
  64. // Now add the Cursor to cursorWithContext
  65. cursorWithContext._cursor = new Cursor( expCtx.ns ); //NOTE: collectionName will likely be an array of documents in munge
  66. // wrap the cursor with a DocumentSource and return that
  67. source = new CursorDocumentSource( cursorWithContext, expCtx );
  68. // NOTE: SKIPPED: Note the query and sort
  69. if (Object.keys(projection).length) source.setProjection(projection);
  70. while(sources.length > 0 && source.coalesce(sources[0])) { //Note: Attempting to coalesce into the cursor source
  71. sources.shift();
  72. }
  73. }
  74. pipeline.addInitialSource(source);
  75. };