Custom Transform: Merge All

Eugene Ghanizadeh edited this page Oct 28, 2021 · 4 revisions

In this example, we want to create an alternative to flatten() that maintains a connection to all inner sources (until they end), instead of maintaining only one connection. It is equivalent to RxJS's mergeAll() operator and, combined with map(), behaves like RxJS's mergeMap().

              a:  --1---2----3------4---5----6----7-8-|
              b:  ----x---y-----z----|
              c:  ---@---*--------$-------#-----$---@----&-|
          outer:  -a---b---c----------------------|
mergeAll(outer):  --1---2-y--3--z-$-4---5-#--6--$---@----&-|

To get started, we can outline our transform:

export const mergeAll = src => {
  // TODO
}

💡 Note that we don't need to use the transform() utility here, since our transform takes only one argument and doesn't need currying.


As with the collect() transform, we will place an intermediate sink and source between the original sink and source of the stream to add the modified behavior:

import { source, sink } from 'streamlets'

export const mergeAll = src => source(snk => {
  src.connect(sink({

    greet(tb) {
      // this will be called when the outer source wants to initialize
      // the process and provide the sink a talkback
    },

    receive(inner) {
      // this will be called with newly emitted inner sources
    },

    end(reason) {
      // this will be called when the outer source ends
      // the stream, potentially due to some error.
    }
  }))
})
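To make the order of these callbacks concrete, here is a minimal sketch of the handshake using only the method names that appear on this page (connect, greet, start, stop, receive, end). The makeSource() helper is a hypothetical stand-in written for this illustration; it is not part of streamlets and says nothing about how the library implements its sources.

```javascript
// Hypothetical stand-in for a listenable source (not the library's implementation).
// It emits the given values synchronously once the sink calls start().
const makeSource = (values) => ({
  connect(snk) {
    let stopped = false
    // Hand the sink a talkback so it can start or stop the stream.
    snk.greet({
      start() {
        for (const v of values) {
          if (stopped) return
          snk.receive(v)
        }
        if (!stopped) snk.end()
      },
      stop() { stopped = true },
    })
  },
})

const log = []
makeSource([1, 2]).connect({
  greet(tb) { log.push('greet'); tb.start() },
  receive(v) { log.push(`receive ${v}`) },
  end(reason) { log.push(reason ? 'error' : 'end') },
})

console.log(log) // [ 'greet', 'receive 1', 'receive 2', 'end' ]
```

The sink is greeted first, asks for data through the talkback, then receives values until the source ends.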

The simplest way of flattening all inner sources into one is to just pass the talkback provided by the outer source directly to the sink:

    greet(tb) {
      snk.greet(tb)
    },

And connect a proxy sink to every inner source, routing the incoming values to the actual sink:

    receive(inner) {
      inner.connect(sink({
        greet(tb) {
          tb.start()
        },

        receive(t) {
          snk.receive(t)
        },

        end(reason) {
          // TODO: handle errors / end of stream
        }
      }))
    }

Which, put together, gives us the baseline mergeAll() transform:

import { source, sink } from 'streamlets'


export const mergeAll = src => source(snk => {
  src.connect(sink({
    greet(tb) {
      snk.greet(tb)
    },

    receive(inner) {
      inner.connect(sink({
        greet(tb) {
          tb.start()
        },

        receive(t) {
          snk.receive(t)
        },

        end(reason) {
          // TODO
        }
      }))
    },

    end(reason) {
      // TODO
    }
  }))
})

☝️ This bare-bones version works properly with listenable sources, but as it stands it lacks any error handling. What should be done when an inner source errors? What should be done when the outer source ends but some inner sources are still going?
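The gap can be seen in a small experiment. The sketch below re-creates the baseline with plain objects so that it runs without the library. The manual() helper (a source driven by hand) and the name mergeAllBaseline are assumptions made for this illustration only.

```javascript
// Hypothetical hand-driven listenable source, used only for this demo.
const manual = () => {
  let snk, started = false
  return {
    connect(s) {
      snk = s
      s.greet({ start() { started = true }, stop() { started = false } })
    },
    emit(v) { if (started) snk.receive(v) },
    end(reason) { if (started) { started = false; snk.end(reason) } },
  }
}

// The baseline transform, written with plain objects instead of source() / sink().
const mergeAllBaseline = src => ({
  connect(snk) {
    src.connect({
      greet(tb) { snk.greet(tb) },
      receive(inner) {
        inner.connect({
          greet(tb) { tb.start() },
          receive(t) { snk.receive(t) },
          end() { /* still TODO in the baseline */ },
        })
      },
      end() { /* still TODO in the baseline */ },
    })
  },
})

const outer = manual(), a = manual(), b = manual()
const seen = []
let ended = false

mergeAllBaseline(outer).connect({
  greet(tb) { tb.start() },
  receive(v) { seen.push(v) },
  end() { ended = true },
})

outer.emit(a); outer.emit(b)
a.emit(1); b.emit('x'); a.emit(2)  // values from both inner sources interleave
b.end(new Error('boom'))           // the error is silently dropped
a.end(); outer.end()               // nothing ever ends the stream

console.log(seen)  // [ 1, 'x', 2 ]
console.log(ended) // false
```

Values are merged as expected, but the final sink never hears about the error or the end of the stream.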


Error Handling & End of Stream

The first step in adding error handling is to handle the outer source ending or erroring. When the outer source ends with an error, it should close the stream and also stop all connected inner sources. For this purpose, we first need to maintain a list of inner talkbacks (so that we are able to stop them):

export const mergeAll = src => source(snk => {
  let innerTbs = []

  src.connect({
    // ...

    receive(inner) {
      let tb

      inner.connect(sink({
        greet(_tb) {
          innerTbs.push(tb = _tb)
          tb.start()
        },

        // ...

        end(reason) {
          innerTbs.splice(innerTbs.indexOf(tb), 1)
        }
      }))
    },

    // ...
  })
})
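One detail of this bookkeeping is worth keeping in mind. If an inner source were ever to call end() before greet(), tb would still be undefined, indexOf() would return -1, and splice(-1, 1) would remove the last talkback in the list rather than nothing. Whether a source can actually behave that way is an assumption here, not something this page states. The removeTb() helper below is a hypothetical guard, shown with strings in place of real talkbacks.

```javascript
// Hypothetical helper: remove an item only if it is actually in the list.
const removeTb = (list, item) => {
  const i = list.indexOf(item)
  if (i !== -1) list.splice(i, 1)
  return list
}

const tbs = ['a', 'b', 'c']

// Unguarded removal of a missing item: indexOf() gives -1,
// and splice(-1, 1) drops the last element.
const unguarded = [...tbs]
unguarded.splice(unguarded.indexOf('missing'), 1)
console.log(unguarded) // [ 'a', 'b' ]

// Guarded removal leaves the list untouched when the item is missing.
console.log(removeTb([...tbs], 'missing')) // [ 'a', 'b', 'c' ]
console.log(removeTb([...tbs], 'b'))       // [ 'a', 'c' ]
```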

Now we can end the inner sources when the outer source errors:

export const mergeAll = src => source(snk => {
  let innerTbs = []

  src.connect({
    // ...

    end(reason) {
      if (reason) {
        innerTbs.forEach(tb => tb.stop(reason))
        snk.end(reason)
      }
    }
  })
})

If the outer source ends without an error, we should wait for all inner sources to end before ending the stream. This means we need to track whether the outer source has ended:

export const mergeAll = src => source(snk => {
  let innerTbs = []
  let ended = false

  src.connect({
    // ...

    end(reason) {
      if (reason) {
        innerTbs.forEach(tb => tb.stop(reason))
        snk.end(reason)
      } else {
        ended = true
        if (innerTbs.length === 0) {
          snk.end()
        }
      }
    }
  })
})

Now every inner source, when it ends, should also check whether the outer source has ended and whether it was the last remaining inner source. If so, it should end the stream:

export const mergeAll = src => source(snk => {
  let innerTbs = []
  let ended = false

  src.connect({
    // ...

    receive(inner) {
      let tb

      inner.connect(sink({
        // ...

        end(reason) {
          innerTbs.splice(innerTbs.indexOf(tb), 1)
          if (innerTbs.length === 0 && ended) {
            snk.end(reason)
          }
        }
      }))
    },

    // ...
  })
})

Note that with our current implementation, if an inner source ends with an error, the error is handled (passed down to the sink) only if the inner source was the last remaining inner source. Ideally, when an inner source errors, we should stop all other inner sources and the outer source, and raise the error (to the sink) right away. This means we need to also hold a reference to the talkback the outer source provides:

export const mergeAll = src => source(snk => {
  let innerTbs = []
  let outerTb
  let ended = false

  src.connect({
    greet(tb) {
      outerTb = tb
      snk.greet(tb)
    },

    // ...
  })
})

Now we can conduct proper error handling for inner sources as well:

export const mergeAll = src => source(snk => {
  let innerTbs = []
  let outerTb
  let ended = false

  src.connect({
    // ...

    receive(inner) {
      let tb

      inner.connect(sink({
        // ...

        end(reason) {
          if (reason) {
            innerTbs.forEach(t => t !== tb && t.stop(reason))
            outerTb.stop(reason)
            snk.end(reason)
          } else {
            innerTbs.splice(innerTbs.indexOf(tb), 1)
            if (innerTbs.length === 0 && ended) {
              snk.end(reason)
            }
          }
        }
      }))
    },

    // ...
  })
})

Putting it all together, we get a mergeAll() transform for listenable sources which properly handles errors / end of stream events:

import { source, sink } from 'streamlets'

export const mergeAll = src => source(snk => {
  let innerTbs = []
  let outerTb
  let ended = false

  src.connect({
    greet(tb) {
      outerTb = tb
      snk.greet(tb)
    },

    receive(inner) {
      let tb

      inner.connect(sink({
        greet(_tb) {
          innerTbs.push(tb = _tb)
          tb.start()
        },

        receive(t) {
          snk.receive(t)
        },

        end(reason) {
          if (reason) {
            innerTbs.forEach(t => t !== tb && t.stop(reason))
            outerTb.stop(reason)
            snk.end(reason)
          } else {
            innerTbs.splice(innerTbs.indexOf(tb), 1)
            if (innerTbs.length === 0 && ended) {
              snk.end(reason)
            }
          }
        }
      }))
    },

    end(reason) {
      if (reason) {
        innerTbs.forEach(tb => tb.stop(reason))
        snk.end(reason)
      } else {
        ended = true
        if (innerTbs.length === 0) {
          snk.end()
        }
      }
    }
  })
})

Try in Sandbox

💡 Note that the version of mergeAll() provided in the playground has some additional code for handling pullable sources, as well as handling pause / resume actions.
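As a quick check of the listenable version built on this page, the sketch below runs it through two scenarios: a clean end and an inner error. So that it runs without the library, source() and sink() are replaced by minimal hypothetical stand-ins, and manual() is a hand-driven source invented for this demo. None of these three helpers reflect how streamlets implements them.

```javascript
// Hypothetical stand-ins so the sketch runs without the library installed.
const source = fn => ({ connect: fn })
const sink = s => s

// Hypothetical hand-driven listenable source, used only for this demo.
const manual = () => {
  let snk, live = false
  const src = {
    stopped: false,
    connect(s) {
      snk = s
      s.greet({
        start() { live = true },
        stop() { live = false; src.stopped = true },
      })
    },
    emit(v) { if (live) snk.receive(v) },
    end(reason) { if (live) { live = false; snk.end(reason) } },
  }
  return src
}

// The transform from this page, condensed.
const mergeAll = src => source(snk => {
  let innerTbs = []
  let outerTb
  let ended = false
  src.connect({
    greet(tb) { outerTb = tb; snk.greet(tb) },
    receive(inner) {
      let tb
      inner.connect(sink({
        greet(_tb) { innerTbs.push(tb = _tb); tb.start() },
        receive(t) { snk.receive(t) },
        end(reason) {
          if (reason) {
            // An inner error stops everything else and is raised right away.
            innerTbs.forEach(t => t !== tb && t.stop(reason))
            outerTb.stop(reason)
            snk.end(reason)
          } else {
            innerTbs.splice(innerTbs.indexOf(tb), 1)
            if (innerTbs.length === 0 && ended) snk.end(reason)
          }
        }
      }))
    },
    end(reason) {
      if (reason) {
        innerTbs.forEach(tb => tb.stop(reason))
        snk.end(reason)
      } else {
        ended = true
        if (innerTbs.length === 0) snk.end()
      }
    }
  })
})

// Connects a recording sink and emits two inner sources on the outer one.
const run = () => {
  const outer = manual(), a = manual(), b = manual()
  const events = []
  mergeAll(outer).connect(sink({
    greet(tb) { tb.start() },
    receive(v) { events.push(v) },
    end(reason) { events.push(reason ? `error: ${reason.message}` : 'end') },
  }))
  outer.emit(a); outer.emit(b)
  return { outer, a, b, events }
}

// Scenario 1: the outer source ends first; the stream ends with the last inner source.
const ok = run()
ok.a.emit(1); ok.outer.end(); ok.b.emit('x'); ok.a.end(); ok.b.emit('y'); ok.b.end()
console.log(ok.events) // [ 1, 'x', 'y', 'end' ]

// Scenario 2: an inner error stops the other inner source and the outer source.
const bad = run()
bad.a.emit(1); bad.b.end(new Error('boom')); bad.a.emit(2)
console.log(bad.events)                       // [ 1, 'error: boom' ]
console.log(bad.a.stopped, bad.outer.stopped) // true true
```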