An Introduction to RxJS Buffering Operators

By default, observable streams in RxJS don’t buffer or keep a cache of the values in a stream. It can be useful however to buffer some of the values when processing in batches would be beneficial. RxJS provides 5 operators to make that easy: buffer, bufferCount, bufferTime, bufferToggle and bufferWhen. Let’s explore the buffering operators with simple examples.

Note that buffered value are emitted as arrays of values instead of standalone values.

For our examples, we’ll use button clicks as our source observable and map clicks to a random number .

buffer

The buffer operator takes another observable as argument and will buffer the emitted values from the original observable until the provided observable emits. The buffer then resets and starts buffering again until the provided observable emits once more.

In the following example, the values mapped from button clicks are buffered until our release$ observable emits (when the release button is clicked):

const btn = document.querySelector('.click-me');
const releaseBtn = document.querySelector('.release');

const release$ = Rx.Observable.fromEvent(releaseBtn, 'click');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .buffer(release$)
  .subscribe(random => console.log(random));

bufferCount

With bufferCount you provide a number of values to keep in a buffer before the values are emitted.

Here, 4 mapped values (4 button clicks) are buffered until they are emitted and logged to the console:

const btn = document.querySelector('.click-me');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferCount(4)
  .subscribe(random => console.log(random));

With that, every 4 clicks an array like [86, 93, 57, 64] will be logged to the console.

bufferTime

bufferTime takes an amount of milliseconds to buffer values for. After the time as elapsed, the buffered values are emitted and the buffer starts again.

Here, values mapped from the click events are buffered for 1 second and then emitted:

const btn = document.querySelector('.click-me');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferTime(1000)
  .take(15)
  .subscribe(random => console.log(random));

If the user clicks 3 times during one of the 1-second buffering periods, an array like [44, 71, 90] will be logged to the console. This could go on forever, so here we’re also using the take operator to close the stream after 15 array values have been emitted.

bufferWhen

bufferWhen is similar to buffer, but instead of taking an observable, it takes a selector function known as the closing selector. Here we’ll use the same example as buffer with the slight distinction that we’re using a function that returns our release$ observable:

const btn = document.querySelector('.click-me');
const releaseBuffer = document.querySelector('.release');

const release$ = Rx.Observable.fromEvent(releaseBuffer, 'click');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferWhen(() => release$)
  .subscribe(random => console.log(random));

bufferToggle

bufferToggle is similar to buffer, but instead it takes two arguments: an observable to start the buffer and a closing selector function to stop buffering and emit the values.

Here are button clicks are ignored until the start$ observable emits. Then, the values are buffered until the stop$ observable emits. start$ then needs to emit once more to start a new buffer:

const btn = document.querySelector('.click-me');

const startBtn = document.querySelector('.start');
const stopBtn = document.querySelector('.stop');

const start$ = Rx.Observable
  .fromEvent(startBtn, 'click')
  .do(_ => console.log('Start buffering!'));

const stop$ = Rx.Observable
  .fromEvent(stopBtn, 'click')
  .do(_ => console.log('Stop buffering!'));

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferToggle(start$, () => stop$)
  .subscribe(random => console.log(random));

Here we're also using the do operator on our start$ and stop$ observables to log a message to the console when they emit.

  Tweet It
✖ Clear

🕵 Search Results

🔎 Searching...