Skip to content
Eugene Ghanizadeh edited this page Oct 21, 2021 · 6 revisions
function of<T>(...values: T[]): Source<T>

Emits given values synchronously, until the sink stops the stream or the values exhaust. Will end the stream when the list of provided values exhausts.

import { pipe, of, tap, observe } from 'streamlets'

pipe(
  of(1, 2, 3, 4),
  tap(console.log),
  observe,
)

// > 1
// > 2
// > 3
// > 4

of() can be paused and resumed:

Sink Code
import { sink } from 'streamlets'

// this is a custom sink that pauses the stream for 3 seconds
// after it receives first two values.
export const pauser = () => {
  let count = 0
  let talkback

  return sink({
    greet: t => (talkback = t).start(),
    receive: () => {
      if (++count === 2) {
        talkback.stop()
        setTimeout(() => talkback.start(), 3000)
      }
    }
  })
}
import { of, pipe, connect, tap } from 'streamlets'
import { pauser } from './pauser.js'

pipe(
  of(1, 2, 3, 4),
  tap(console.log),
  connect(pauser())
)

// > 1
// > 2
// ----> a pause for 3 seconds here
// > 3
// > 4

Try in Sandbox