
Flow Control

Eugene Ghanizadeh edited this page Oct 19, 2021 · 11 revisions

In many cases you need to control the rate and flow of data. For example, you may not want to spam your APIs, or you might want to wait for the user to finish typing before querying their input.

This control can go both ways: in some cases, you want to control the rate at which some source (like an API endpoint) is pulled. In other cases, you might want to control the rate at which some source pushes data (like user input).


Pull Rate & Connect Rate

The pullrate() transform controls the rate at which the source can be pulled:

import { pipe, iterable, pullrate, tap, iterate } from 'streamlets'

pipe(
  iterable([1, 2, 3, 4, 5]),
  pullrate(1000),
  tap(console.log),
  iterate
)

// the values will be logged with a one-second delay between each:
// > 1
// > 2
// > 3
// ...
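For intuition only, here is a minimal plain-JavaScript sketch of pull-rate limiting. It is not how streamlets implements pullrate(): `pullAtRate()` and `sleep()` are hypothetical helpers, and the source is assumed to be an ordinary synchronous iterable. Before each pull, the generator waits until at least `intervalMs` milliseconds have passed since the previous pull.

```javascript
// Hypothetical helpers for illustration; these are NOT streamlets exports.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Pull from a synchronous iterable at most once every `intervalMs` ms.
async function* pullAtRate(source, intervalMs) {
  const iterator = source[Symbol.iterator]()
  let lastPull = -Infinity

  while (true) {
    // wait only for whatever remains of the interval since the last pull
    const wait = lastPull + intervalMs - Date.now()
    if (wait > 0) await sleep(wait)

    lastPull = Date.now()
    const { value, done } = iterator.next()
    if (done) return
    yield value
  }
}
```

Iterating `pullAtRate([1, 2, 3], 1000)` with `for await` should log the values roughly a second apart. Measuring from the previous pull (rather than always sleeping a full interval) means a slow consumer is not delayed any further than the rate requires.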

It can be combined with fetch() (which creates sources from network requests) and iterate() to poll an endpoint at a given rate. For example, this code checks whether XKCD is alive every 5 seconds:

import { pipe, fetch, pullrate, tap, finalize, iterate } from 'streamlets'

pipe(
  fetch('http://xkcd.com/info.0.json'),
  pullrate(5000),
  tap(() => console.log('XKCD is online!')),
  finalize(() => console.log('XKCD is offline!')),
  iterate
)



💡finalize() is like tap(), but it will only execute when the source has ended the stream (possibly due to an error).
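As a rough mental model, and not a description of how streamlets works internally, the polling pipeline behaves a bit like the loop below. `poll()`, `request`, `onValue` and `onEnd` are hypothetical names: `request` stands in for a single network request, `onValue` plays the role of tap() and `onEnd` the role of finalize().

```javascript
// Hypothetical model of the polling pipeline, NOT streamlets code.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Call `request()` at most once every `intervalMs` ms, pass each result to
// `onValue` (like tap()), and call `onEnd` once when a request fails and
// polling stops (like finalize(); in this sketch an error is the only way
// the loop can end).
async function poll(request, intervalMs, onValue, onEnd) {
  try {
    while (true) {
      const started = Date.now()
      onValue(await request())
      // like pullrate(): wait out the rest of the interval before pulling again
      const wait = started + intervalMs - Date.now()
      if (wait > 0) await sleep(wait)
    }
  } catch (error) {
    onEnd(error)
  }
}
```

With the global `fetch()`, `request` could be something like `() => fetch('http://xkcd.com/info.0.json')`. Note that the global `fetch()` only rejects on network failures, so an HTTP error status would not end this loop unless `request` checks `response.ok` itself.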


In this example, if XKCD is down, fetch() will emit an error and the stream will end. If we want to keep checking until XKCD is back up, we can use the retry() transform:

import { pipe, fetch, pullrate, tap, finalize, iterate, retry } from 'streamlets'

pipe(
  fetch('http://xkcd.com/info.0.json'),
  pullrate(5000),
  tap(() => console.log('XKCD is online!')),
  finalize(() => console.log('XKCD is offline!')),

  // when there is an error, retry instead of giving up!
  retry,

  iterate
)

The retry() transform reconnects the sink to the source whenever the source ends the stream due to an error. The problem with this example is that when XKCD is down, retry() will reconnect rapidly, bypassing the pullrate() control and spamming the API. To avoid this, we can use the connectRate() transform, which controls the rate at which sinks can connect to the source:

import {
  pipe, fetch, pullrate, connectRate,
  tap, finalize, iterate, retry
} from 'streamlets'

pipe(
  fetch('http://xkcd.com/info.0.json'),
  pullrate(5000),

  // we can only attempt reconnecting once every second
  connectRate(1000),

  tap(() => console.log('XKCD is online!')),
  finalize(() => console.log('XKCD is offline!')),
  retry,
  iterate
)
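The interplay between retry() and connectRate() can be sketched in plain JavaScript. This is a hypothetical model, not streamlets' implementation: `retryWithConnectRate()` reruns a `connect` function (standing in for one connection to the source) whenever it fails, but never starts two attempts less than `minIntervalMs` apart. The `maxAttempts` cap is an addition of this sketch so that it always terminates.

```javascript
// Hypothetical sketch, NOT streamlets code.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Call `connect()` again whenever it rejects (like retry), but never start
// two attempts less than `minIntervalMs` apart (like connectRate).
// `maxAttempts` is an assumption added so the sketch always terminates.
async function retryWithConnectRate(connect, minIntervalMs, maxAttempts) {
  let lastAttempt = -Infinity
  let lastError
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // throttle reconnections: wait out the rest of the minimum interval
    const wait = lastAttempt + minIntervalMs - Date.now()
    if (wait > 0) await sleep(wait)

    lastAttempt = Date.now()
    try {
      return await connect(attempt)
    } catch (error) {
      lastError = error // failed: loop around and reconnect
    }
  }
  throw lastError
}
```

Without the `sleep()` step, a source that fails immediately would be reconnected as fast as the event loop allows, which is exactly the API-spamming problem described above.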



Debouncing & Throttling

debounce() controls the rate at which the source can push data. It blocks the stream, dropping incoming data for as long as new data keeps arriving faster than the specified interval, and only lets the stream through once the source has been quiet for that long:

HTML Code
<button>Click Me Rapidly!</button>
<div></div>

import { pipe, event, debounce, observe, tap } from 'streamlets'

const button = document.querySelector('button')
const div = document.querySelector('div')

pipe(
  event(button, 'click'),
  tap(() => div.textContent = 'Clicking ...'),

  // the stream doesn't get passed here as long as the button
  // is clicked faster than once per 500ms
  debounce(500),

  tap(() => div.textContent = 'Tired of clicking?'),
  observe
)



💡event() creates a stream from DOM events.
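For comparison, here is the classic trailing-edge debounce technique for plain callbacks. It is a minimal sketch, not streamlets' code, and `debounceFn()` is a hypothetical name: every call restarts a timer, and the wrapped function only runs once no call has arrived for `ms` milliseconds, receiving the most recent arguments.

```javascript
// Hypothetical sketch of a trailing-edge debounce, NOT streamlets code.
function debounceFn(fn, ms) {
  let timer = null
  return (...args) => {
    // each new call cancels the pending one and restarts the quiet period
    if (timer !== null) clearTimeout(timer)
    timer = setTimeout(() => {
      timer = null
      fn(...args) // runs only after `ms` ms without further calls
    }, ms)
  }
}
```

With this helper, the DOM example would roughly correspond to `button.addEventListener('click', debounceFn(() => div.textContent = 'Tired of clicking?', 500))`.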


throttle() also controls the rate at which the source can push data, but instead of blocking the stream, it simply drops enough values that the outgoing data never exceeds the specified rate:

HTML Code
<button>Click Me Rapidly!</button>
<div></div>

import { pipe, event, throttle, observe, tap, scan } from 'streamlets'

const button = document.querySelector('button')
const div = document.querySelector('div')

pipe(
  event(button, 'click'),

  // no matter how fast you click on the button,
  // the counter won't increase faster than once per second.
  throttle(1000),

  scan(c => c + 1, 0),
  tap(c => div.textContent = c),
  observe
)


💡scan() accumulates incoming data using the given accumulator function and emits each intermediate result. It is like .reduce(), but for streams.
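To make the difference from debounce concrete, here is a deterministic plain-JavaScript sketch of the click counter: a drop-based (leading-edge) throttle followed by a scan-style running count, applied to a list of simulated click timestamps in milliseconds. `throttleTimes()` and `scanValues()` are hypothetical helpers, not streamlets' implementation, and working on timestamps instead of real events is a simplification so the result is easy to check.

```javascript
// Hypothetical helpers for illustration, NOT streamlets code.

// Leading-edge throttle: keep an event only if at least `ms` ms have passed
// since the last event that was kept; everything in between is dropped.
function throttleTimes(timestamps, ms) {
  const kept = []
  let lastKept = -Infinity
  for (const t of timestamps) {
    if (t - lastKept >= ms) {
      kept.push(t)
      lastKept = t
    }
  }
  return kept
}

// scan: like reduce(), but emits every intermediate accumulator value.
function scanValues(values, reducer, seed) {
  const out = []
  let acc = seed
  for (const value of values) {
    acc = reducer(acc, value)
    out.push(acc)
  }
  return out
}

// ten rapid clicks, 200ms apart, spread over 1.8 seconds
const clicks = [0, 200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800]
const counts = scanValues(throttleTimes(clicks, 1000), c => c + 1, 0)
console.log(counts) // [ 1, 2 ]
```

Real throttle implementations, possibly including streamlets' throttle(), may differ in details such as whether a trailing value is emitted at the end of a burst.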