Create a Reactive Data Pipeline with Vue.js

Joshua Bemenderfer

Vue can do a lot of things. One of them, of course, is acting as the view layer for your apps. Actually, I think that’s the only intended purpose for Vue, but I know of a few others that are actually pretty neat. Today, we’ll eschew using Vue for the, well, view layer, and instead use it to… create a reactive data pipeline akin to RxJS Observables through the magic of computed properties. Yeah.

Now, there’s quite a bit of code here, and I’ll try to explain it as best I can, but it may be a bit difficult to understand.

The intended usage we’ll be aiming for will be something like this:

import { ReactivePipeline } from './reactive-pipeline';

const sourceArray = ['e', 'x', 'a', 'm', 'p', 'l', 'e'];

// Create a new pipeline.
new ReactivePipeline(sourceArray)
// Make all letters uppercase.
.pipe(array => array.map(letter => letter.toUpperCase()))
// Join the array into a string.
.pipe(array => array.join(''))
// Log any errors.
.error(e => {
  console.error(e)
})
// Start the pipeline and listen for changes.
.subscribe(result => {
  // Whenever the output changes, log it.
  console.log(result) // EXAMPLE
});

Now, whenever the original array changes, the callback for .subscribe will output the result of running that array through the pipeline. Slick, isn’t it? Oh, and did I mention that it works perfectly in Node too, without a browser environment?

Creating the Class

All you need for this is Vue installed as a dependency. (And, if you’re using ES6 import / export, some sort of transpiler. [Or a browser from the future.]) If you use normal require(), it runs under Node just fine.

The first step code-wise is to create a simple class with a few functions.

reactive-pipeline.js

import Vue from 'vue';

export class ReactivePipeline {
  constructor (sourceData) {
    this._source = sourceData;
    this._tracker = null;
    this._transformers = [];
    this._subscribeHandler = function() {};
    this._errorHandler = function(e) { throw e };
  }

  pipe (transformer) {
    this._transformers.push(transformer);
    return this;
  }

  subscribe (callback) {
    this._subscribeHandler = callback;
    this.setupComponent();
    return this;
  }

  error (callback) {
    this._errorHandler = callback;
    return this;
  }

  setupComponent () {
    // ... We'll flesh this out next.
  }
}

It’s pretty self-explanatory. Really all we’re doing is creating a bunch of functions that collect data and store it in the class for use by setupComponent().

Now, the tricky part is what happens in setupComponent. Really, for what we’re trying to accomplish here it’s a bit overcomplicated (we could just use a single watcher and no computed properties,) but this method would allow you to add support for Vue’s dependency tracking system to cache computed properties and not rerun the whole thing when a dependency changes.

reactive-pipeline.js

...
setupComponent () {
  // Get everything in this closure so we can access it from inside the computed handlers.
  const source = this._source;
  const transformers = this._transformers;
  const subscribeHandler = this._subscribeHandler;
  const errorHandler = this._errorHandler;

  const computed = {};

  // Populate computed properties object with transformer function wrappers.
  transformers.forEach((transformer, index) => {
    // Create a named computed property for each transformer.
    // These can't be arrow functions, as they need to be bound to the generated component.
    computed[`transformer_${index}`] = function() {
      try {
        // Run each transformer against the previous value in the chain.
        return transformer(index === 0 ? this.source : this[`transformer_${index - 1}`]);
      } catch (e) {
        // Handle any errors.
        errorHandler(e);
      }
    }
  })

  // Create an "output" computed property that simply serves as the last one in the chain.
  computed['output'] = function() {
    return this[`transformer_${transformers.length - 1}`];
  }

  // Here's where the magic happens.
  // Create a new Vue component with the source data in it's data property.
  // (This makes it observable.)
  const PipelineComponent = Vue.extend({
    data() {
      return {
        source: this._source
      }
    },

    // We need one watcher to "use" the final computed property and cause the chain to update.
    watch: {
      // I do realize we could've just put the entire transformer chain in here, but that would be boring.
      source () {
        subscribeHandler(this.output);
      }
    },

    computed,
  });

  // Now, initialize the component and start the transformation chain going.
  this._tracker = new PipelineComponent();

  return this;
}
...

Once that’s done, you should be able to use it in the way demonstrated at the beginning of the article.

All together now:

The ReactivePipeline class…

reactive-pipeline.js

import Vue from 'vue';

export class ReactivePipeline {
  constructor (sourceData) {
    this._source = sourceData;
    this._tracker = null;
    this._transformers = [];
    this._subscribeHandler = function() {};
    this._errorHandler = function(e) { throw e };
  }

  pipe (transformer) {
    this._transformers.push(transformer);
    return this;
  }

  subscribe (callback) {
    this._subscribeHandler = callback;
    this.setupComponent();
    return this;
  }

  error (callback) {
    this._errorHandler = callback;
    return this;
  }

  setupComponent () {
    // Get everything in this closure so we can access it from inside the computed handlers.
    const source = this._source;
    const transformers = this._transformers;
    const subscribeHandler = this._subscribeHandler;
    const errorHandler = this._errorHandler;

    const computed = {};

    // Populate computed properties object with transformer function wrappers.
    transformers.forEach((transformer, index) => {
      // Create a named computed property for each transformer.
      // These can't be arrow functions, as they need to be bound to the generated component.
      computed[`transformer_${index}`] = function() {
        try {
          // Run each transformer against the previous value in the chain.
          return transformer(index === 0 ? this.source : this[`transformer_${index - 1}`]);
        } catch (e) {
          // Handle any errors.
          errorHandler(e);
        }
      }
    })

    // Create an "output" computed property that simply serves as the last one in the chain.
    computed['output'] = function() {
      return this[`transformer_${transformers.length - 1}`];
    }

    // Here's where the magic happens.
    // Create a new Vue component with the source data in it's data property.
    // (This makes it observable.)
    const PipelineComponent = Vue.extend({
      data() {
        return {
          source: this._source
        }
      },

      // We need one watcher to "use" the final computed property and cause the chain to update.
      watch: {
        // I do realize we could've just put the entire transformer chain in here, but that would be boring.
        source () {
          subscribeHandler(this.output);
        }
      },

      computed,
    });

    // Now, initialize the component and start the transformation chain going.
    this._tracker = new PipelineComponent();

    return this;
  }
}

… and usage:

main.js

import { ReactivePipeline } from './reactive-pipeline';

const sourceArray = ['e', 'x', 'a', 'm', 'p', 'l', 'e'];

// Create a new pipeline.
new ReactivePipeline(sourceArray)
// Make all letters uppercase.
.pipe(array => array.map(letter => letter.toUpperCase()))
// Join the array into a string.
.pipe(array => array.join(''))
// Log any errors.
.error(e => {
  console.error(e)
})
// Start the pipeline and listen for changes.
.subscribe(result => {
  // Whenever the output changes, log it.
  console.log(result) // EXAMPLE
});

BOOM! An RxJS-style Reactive data pipeline with Vue.js in less than 100 SLoC!

Just to rub it in, RxJS is ~140 kB minified, while Vue is around 60 kB. So you can have a view framework and your own custom observable system in less than half the size of RxJS. :D

Acknowledgments

I’m incredibly thankful for the work of Anirudh Sanjeev who first opened my eyes to the potential of Vue’s computed properties and set my imagination running like crazy with ideas.

✖ Clear

🕵 Search Results

🔎 Searching...