123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- var UnwindDocumentSource = module.exports = (function(){
- // CONSTRUCTOR
- /**
- * A document source unwindper
- *
- * @class UnwindDocumentSource
- * @namespace munge.pipeline.documentsource
- * @module munge
- * @constructor
- * @param {Object} query the match query to use
- **/
- var klass = module.exports = UnwindDocumentSource = function UnwindDocumentSource(/* pCtx*/){
- if(arguments.length !== 0) throw new Error("zero args expected");
- base.call(this);
-
- // Configuration state.
- this._unwindPath = null;
-
- // Iteration state.
- this._unwinder = null;
-
- }, base = require('./DocumentSource'), proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
- var DocumentSource = base,
- FieldPath = require('../FieldPath'),
- Document = require('../Document'),
- Expression = require('../expressions/Expression');
- klass.Unwinder = (function(){
- /**
- * Helper class to unwind arrays within a series of documents.
- *
- * @param {String} unwindPath is the field path to the array to unwind.
- **/
- var klass = function Unwinder(unwindPath){
- // Path to the array to unwind.
- this._unwindPath = unwindPath;
- // The souce document to unwind.
- this._document = null;
- // Document indexes of the field path components.
- this._unwindPathFieldIndexes = [];
- // Iterator over the array within _document to unwind.
- this._unwindArrayIterator = null;
- // The last value returned from _unwindArrayIterator.
- //this._unwindArrayIteratorCurrent = undefined; //dont define this yet
- }, base = Object, proto = klass.prototype = Object.create(base.prototype, {constructor:{value:klass}});
-
- /**
- * Reset the unwinder to unwind a new document.
- *
- * @param {Object} document
- **/
- proto.resetDocument = function resetDocument(document){
- if (!document){
- throw new Error("document is required!");
- }
-
- // Reset document specific attributes.
- this._document = document;
- this._unwindPathFieldIndexes.length = 0;
- this._unwindArrayIterator = null;
- delete this._unwindArrayIteratorCurrent;
-
- var pathValue = this.extractUnwindValue(); // sets _unwindPathFieldIndexes
- if (!pathValue || pathValue.length === 0) {
- // The path does not exist.
- return;
- }
- if (pathValue.constructor !== Array){
- throw new Error(UnwindDocumentSource.unwindName + ": value at end of field path must be an array; code 15978");
- }
-
- // Start the iterator used to unwind the array.
- this._unwindArrayIterator = pathValue.slice(0);
- this._unwindArrayIteratorCurrent = this._unwindArrayIterator.splice(0,1)[0];
- };
-
- /**
- * eof
- *
- * @returns {Boolean} true if done unwinding the last document passed to resetDocument().
- **/
- proto.eof = function eof(){
- return !this.hasOwnProperty("_unwindArrayIteratorCurrent");
- };
-
- /**
- * Try to advance to the next document unwound from the document passed to resetDocument().
- *
- * @returns {Boolean} true if advanced to a new unwound document, but false if done advancing.
- **/
- proto.advance = function advance(){
- if (!this._unwindArrayIterator) {
- // resetDocument() has not been called or the supplied document had no results to
- // unwind.
- delete this._unwindArrayIteratorCurrent;
- } else if (!this._unwindArrayIterator.length) {
- // There are no more results to unwind.
- delete this._unwindArrayIteratorCurrent;
- } else {
- this._unwindArrayIteratorCurrent = this._unwindArrayIterator.splice(0, 1)[0];
- }
- };
-
- /**
- * Get the current document unwound from the document provided to resetDocument(), using
- * the current value in the array located at the provided unwindPath. But return
- * intrusive_ptr<Document>() if resetDocument() has not been called or the results to unwind
- * have been exhausted.
- *
- * @returns {Object}
- **/
- proto.getCurrent = function getCurrent(){
- if (!this.hasOwnProperty("_unwindArrayIteratorCurrent")) {
- return null;
- }
-
- // Clone all the documents along the field path so that the end values are not shared across
- // documents that have come out of this pipeline operator. This is a partial deep clone.
- // Because the value at the end will be replaced, everything along the path leading to that
- // will be replaced in order not to share that change with any other clones (or the
- // original).
-
- var clone = Document.clone(this._document);
- var current = clone;
- var n = this._unwindPathFieldIndexes.length;
- if (!n) {
- throw new Error("unwindFieldPathIndexes are empty");
- }
- for (var i = 0; i < n; ++i) {
- var fi = this._unwindPathFieldIndexes[i];
- var fp = current[fi];
- if (i + 1 < n) {
- // For every object in the path but the last, clone it and continue on down.
- var next = Document.clone(fp);
- current[fi] = next;
- current = next;
- } else {
- // In the last nested document, subsitute the current unwound value.
- current[fi] = this._unwindArrayIteratorCurrent;
- }
- }
-
- return clone;
- };
-
- /**
- * Get the value at the unwind path, otherwise an empty pointer if no such value
- * exists. The _unwindPathFieldIndexes attribute will be set as the field path is traversed
- * to find the value to unwind.
- *
- * @returns {Object}
- **/
- proto.extractUnwindValue = function extractUnwindValue(){
- var current = this._document;
- var pathValue;
- var pathLength = this._unwindPath.getPathLength();
- for (var i = 0; i < pathLength; ++i) {
-
- var idx = this._unwindPath.getFieldName(i);
-
- if (!current.hasOwnProperty(idx)) {
- // The target field is missing.
- return null;
- }
-
- // Record the indexes of the fields down the field path in order to quickly replace them
- // as the documents along the field path are cloned.
- this._unwindPathFieldIndexes.push(idx);
-
- pathValue = current[idx];
-
- if (i < pathLength - 1) {
-
- if (typeof pathValue !== 'object') {
- // The next field in the path cannot exist (inside a non object).
- return null;
- }
-
- // Move down the object tree.
- current = pathValue;
- }
- }
-
- return pathValue;
- };
-
- return klass;
- })();
-
- /**
- * Lazily construct the _unwinder and initialize the iterator state of this DocumentSource.
- * To be called by all members that depend on the iterator state.
- **/
- proto.lazyInit = function lazyInit(){
- if (!this._unwinder) {
- if (!this._unwindPath){
- throw new Error("unwind path does not exist!");
- }
- this._unwinder = new klass.Unwinder(this._unwindPath);
- if (!this.pSource.eof()) {
- // Set up the first source document for unwinding.
- this._unwinder.resetDocument(this.pSource.getCurrent());
- }
- this.mayAdvanceSource();
- }
- };
-
- /**
- * If the _unwinder is exhausted and the source may be advanced, advance the pSource and
- * reset the _unwinder's source document.
- **/
- proto.mayAdvanceSource = function mayAdvanceSource(){
- while(this._unwinder.eof()) {
- // The _unwinder is exhausted.
- if (this.pSource.eof()) {
- // The source is exhausted.
- return;
- }
- if (!this.pSource.advance()) {
- // The source is exhausted.
- return;
- }
- // Reset the _unwinder with pSource's next document.
- this._unwinder.resetDocument(this.pSource.getCurrent());
- }
- };
-
- /**
- * Specify the field to unwind.
- **/
- proto.unwindPath = function unwindPath(fieldPath){
- // Can't set more than one unwind path.
- if (this._unwindPath){
- throw new Error(this.getSourceName() + " can't unwind more than one path; code 15979");
- }
-
- // Record the unwind path.
- this._unwindPath = new FieldPath(fieldPath);
- };
- klass.unwindName = "$unwind";
- proto.getSourceName = function getSourceName(){
- return klass.unwindName;
- };
-
- /**
- * Get the fields this operation needs to do its job.
- * Deps should be in "a.b.c" notation
- *
- * @method getDependencies
- * @param {Object} deps set (unique array) of strings
- * @returns DocumentSource.GetDepsReturn
- **/
- proto.getDependencies = function getDependencies(deps) {
- if (!this._unwindPath){
- throw new Error("unwind path does not exist!");
- }
- deps[this._unwindPath.getPath(false)] = 1;
- return DocumentSource.GetDepsReturn.SEE_NEXT;
- };
- /**
- * Is the source at EOF?
- *
- * @method eof
- **/
- proto.eof = function eof() {
- this.lazyInit();
- return this._unwinder.eof();
- };
- /**
- * some implementations do the equivalent of verify(!eof()) so check eof() first
- *
- * @method getCurrent
- * @returns {Document} the current Document without advancing
- **/
- proto.getCurrent = function getCurrent() {
- this.lazyInit();
- return this._unwinder.getCurrent();
- };
- /**
- * Advance the state of the DocumentSource so that it will return the next Document.
- * The default implementation returns false, after checking for interrupts.
- * Derived classes can call the default implementation in their own implementations in order to check for interrupts.
- *
- * @method advance
- * @returns {Boolean} whether there is another document to fetch, i.e., whether or not getCurrent() will succeed. This default implementation always returns false.
- **/
- proto.advance = function advance() {
- base.prototype.advance.call(this); // check for interrupts
- this.lazyInit();
- this._unwinder.advance();
- this.mayAdvanceSource();
- return !this._unwinder.eof();
- };
- /**
- * Create an object that represents the document source. The object
- * will have a single field whose name is the source's name. This
- * will be used by the default implementation of addToJsonArray()
- * to add this object to a pipeline being represented in JSON.
- *
- * @method sourceToJson
- * @param {Object} builder JSONObjBuilder: a blank object builder to write to
- * @param {Boolean} explain create explain output
- **/
- proto.sourceToJson = function sourceToJson(builder, explain) {
- if (!this._unwindPath){
- throw new Error("unwind path does not exist!");
- }
- builder[this.getSourceName()] = this._unwindPath.getPath(true);
- };
- /**
- * Creates a new UnwindDocumentSource with the input path as the path to unwind
- *
- * @param {String} JsonElement this thing is *called* Json, but it expects a string
- **/
- klass.createFromJson = function createFromJson(JsonElement) {
- /*
- The value of $unwind should just be a field path.
- */
- if (JsonElement.constructor !== String){
- throw new Error("the " + klass.unwindName + " field path must be specified as a string; code 15981");
- }
- var pathString = Expression.removeFieldPrefix(JsonElement);
- var unwind = new UnwindDocumentSource();
- unwind.unwindPath(pathString);
- return unwind;
- };
-
- /**
- * Reset the document source so that it is ready for a new stream of data.
- * Note that this is a deviation from the mongo implementation.
- *
- * @method reset
- **/
- proto.reset = function reset(){
- this._unwinder = null;
- };
- return klass;
- })();
|