Tutorial

Create a Reactive Data Pipeline with Vue.js

Published on May 9, 2017
Default avatar

By Joshua Bemenderfer

Create a Reactive Data Pipeline with Vue.js

While we believe that this content benefits our community, we have not yet thoroughly reviewed it. If you have any suggestions for improvements, please let us know by clicking the “report an issue“ button at the bottom of the tutorial.

Vue can do a lot of things. One of them, of course, is acting as the view layer for your apps. Actually, I think that’s the only intended purpose for Vue, but I know of a few others that are actually pretty neat. Today, we’ll eschew using Vue for the, well, view layer, and instead use it to… create a reactive data pipeline akin to RxJS Observables through the magic of computed properties. Yeah.

Now, there’s quite a bit of code here, and I’ll try to explain it as best I can, but it may be a bit difficult to understand.

The intended usage we’ll be aiming for will be something like this:

import { ReactivePipeline } from './reactive-pipeline';

const sourceArray = ['e', 'x', 'a', 'm', 'p', 'l', 'e'];

// Create a new pipeline.
new ReactivePipeline(sourceArray)
// Make all letters uppercase.
.pipe(array => array.map(letter => letter.toUpperCase()))
// Join the array into a string.
.pipe(array => array.join(''))
// Log any errors.
.error(e => {
  console.error(e)
})
// Start the pipeline and listen for changes.
.subscribe(result => {
  // Whenever the output changes, log it.
  console.log(result) // EXAMPLE
});

Now, whenever the original array changes, the callback for .subscribe will output the result of running that array through the pipeline. Slick, isn’t it? Oh, and did I mention that it works perfectly in Node too, without a browser environment?

Creating the Class

All you need for this is Vue installed as a dependency. If you use normal require(), it runs under Node just fine.

The first step code-wise is to create a simple class with a few functions.

reactive-pipeline.js
import Vue from 'vue';

export class ReactivePipeline {
  constructor (sourceData) {
    this._source = sourceData;
    this._tracker = null;
    this._transformers = [];
    this._subscribeHandler = function() {};
    this._errorHandler = function(e) { throw e };
  }

  pipe (transformer) {
    this._transformers.push(transformer);
    return this;
  }

  subscribe (callback) {
    this._subscribeHandler = callback;
    this.setupComponent();
    return this;
  }

  error (callback) {
    this._errorHandler = callback;
    return this;
  }

  setupComponent () {
    // ... We'll flesh this out next.
  }
}

It’s pretty self-explanatory. Really all we’re doing is creating a bunch of functions that collect data and store it in the class for use by setupComponent().

Now, the tricky part is what happens in setupComponent. Really, for what we’re trying to accomplish here it’s a bit overcomplicated (we could just use a single watcher and no computed properties,) but this method would allow you to add support for Vue’s dependency tracking system to cache computed properties and not rerun the whole thing when a dependency changes.

reactive-pipeline.js
...
setupComponent () {
  // Get everything in this closure so we can access it from inside the computed handlers.
  const source = this._source;
  const transformers = this._transformers;
  const subscribeHandler = this._subscribeHandler;
  const errorHandler = this._errorHandler;

  const computed = {};

  // Populate computed properties object with transformer function wrappers.
  transformers.forEach((transformer, index) => {
    // Create a named computed property for each transformer.
    // These can't be arrow functions, as they need to be bound to the generated component.
    computed[`transformer_${index}`] = function() {
      try {
        // Run each transformer against the previous value in the chain.
        return transformer(index === 0 ? this.source : this[`transformer_${index - 1}`]);
      } catch (e) {
        // Handle any errors.
        errorHandler(e);
      }
    }
  })

  // Create an "output" computed property that simply serves as the last one in the chain.
  computed['output'] = function() {
    return this[`transformer_${transformers.length - 1}`];
  }

  // Here's where the magic happens.
  // Create a new Vue component with the source data in it's data property.
  // (This makes it observable.)
  const PipelineComponent = Vue.extend({
    data() {
      return {
        source: this._source
      }
    },

    // We need one watcher to "use" the final computed property and cause the chain to update.
    watch: {
      // I do realize we could've just put the entire transformer chain in here, but that would be boring.
      source () {
        subscribeHandler(this.output);
      }
    },

    computed,
  });

  // Now, initialize the component and start the transformation chain going.
  this._tracker = new PipelineComponent();

  return this;
}
...

Once that’s done, you should be able to use it in the way demonstrated at the beginning of the article.

All together now:

The ReactivePipeline class…

reactive-pipeline.js
import Vue from 'vue';

export class ReactivePipeline {
  constructor (sourceData) {
    this._source = sourceData;
    this._tracker = null;
    this._transformers = [];
    this._subscribeHandler = function() {};
    this._errorHandler = function(e) { throw e };
  }

  pipe (transformer) {
    this._transformers.push(transformer);
    return this;
  }

  subscribe (callback) {
    this._subscribeHandler = callback;
    this.setupComponent();
    return this;
  }

  error (callback) {
    this._errorHandler = callback;
    return this;
  }

  setupComponent () {
    // Get everything in this closure so we can access it from inside the computed handlers.
    const source = this._source;
    const transformers = this._transformers;
    const subscribeHandler = this._subscribeHandler;
    const errorHandler = this._errorHandler;

    const computed = {};

    // Populate computed properties object with transformer function wrappers.
    transformers.forEach((transformer, index) => {
      // Create a named computed property for each transformer.
      // These can't be arrow functions, as they need to be bound to the generated component.
      computed[`transformer_${index}`] = function() {
        try {
          // Run each transformer against the previous value in the chain.
          return transformer(index === 0 ? this.source : this[`transformer_${index - 1}`]);
        } catch (e) {
          // Handle any errors.
          errorHandler(e);
        }
      }
    })

    // Create an "output" computed property that simply serves as the last one in the chain.
    computed['output'] = function() {
      return this[`transformer_${transformers.length - 1}`];
    }

    // Here's where the magic happens.
    // Create a new Vue component with the source data in it's data property.
    // (This makes it observable.)
    const PipelineComponent = Vue.extend({
      data() {
        return {
          source: this._source
        }
      },

      // We need one watcher to "use" the final computed property and cause the chain to update.
      watch: {
        // I do realize we could've just put the entire transformer chain in here, but that would be boring.
        source () {
          subscribeHandler(this.output);
        }
      },

      computed,
    });

    // Now, initialize the component and start the transformation chain going.
    this._tracker = new PipelineComponent();

    return this;
  }
}

… and usage:

main.js
import { ReactivePipeline } from './reactive-pipeline';

const sourceArray = ['e', 'x', 'a', 'm', 'p', 'l', 'e'];

// Create a new pipeline.
new ReactivePipeline(sourceArray)
// Make all letters uppercase.
.pipe(array => array.map(letter => letter.toUpperCase()))
// Join the array into a string.
.pipe(array => array.join(''))
// Log any errors.
.error(e => {
  console.error(e)
})
// Start the pipeline and listen for changes.
.subscribe(result => {
  // Whenever the output changes, log it.
  console.log(result) // EXAMPLE
});

BOOM! An RxJS-style Reactive data pipeline with Vue.js in less than 100 SLoC!

Just to rub it in, RxJS is ~140 kB minified, while Vue is around 60 kB. So you can have a view framework and your own custom observable system in less than half the size of RxJS. :D

Acknowledgments

I’m incredibly thankful for the work of Anirudh Sanjeev who first opened my eyes to the potential of Vue’s computed properties and set my imagination running like crazy with ideas.

Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.

Learn more about us


About the authors
Default avatar
Joshua Bemenderfer

author

Still looking for an answer?

Ask a questionSearch for more help

Was this helpful?
 
Leave a comment


This textbox defaults to using Markdown to format your answer.

You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!

Try DigitalOcean for free

Click below to sign up and get $200 of credit to try our products over 60 days!

Sign up

Join the Tech Talk
Success! Thank you! Please check your email for further details.

Please complete your information!

Get our biweekly newsletter

Sign up for Infrastructure as a Newsletter.

Hollie's Hub for Good

Working on improving health and education, reducing inequality, and spurring economic growth? We'd like to help.

Become a contributor

Get paid to write technical tutorials and select a tech-focused charity to receive a matching donation.

Welcome to the developer cloud

DigitalOcean makes it simple to launch in the cloud and scale up as you grow — whether you're running one virtual machine or ten thousand.

Learn more
DigitalOcean Cloud Control Panel