Most operators accept a list of functional options that adjust the behaviour of the resulting Observable.
As an example:
// ctx is a context.Context supplied by the caller.
observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    return i.(int) * 10, nil
}, rxgo.WithContext(ctx),
    rxgo.WithCPUPool(),
    rxgo.WithBufferedChannel(1))
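The functional options pattern itself can be sketched with the standard library only. The names below (`config`, `Option`, `WithBuffer`, `WithWorkers`, `newConfig`) are hypothetical and are not RxGo's internal types; the sketch only shows how a variadic list of options can override defaults.

```go
package main

import "fmt"

// config is a hypothetical settings struct, not RxGo's real one.
type config struct {
	bufferSize int
	poolSize   int
}

// Option mutates a config. Each WithXxx helper returns one.
type Option func(*config)

// WithBuffer sets the (hypothetical) output buffer size.
func WithBuffer(n int) Option { return func(c *config) { c.bufferSize = n } }

// WithWorkers sets the (hypothetical) number of workers.
func WithWorkers(n int) Option { return func(c *config) { c.poolSize = n } }

// newConfig starts from defaults and applies the options in order.
func newConfig(opts ...Option) config {
	c := config{bufferSize: 0, poolSize: 1}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(WithBuffer(1), WithWorkers(8))
	fmt.Println(c.bufferSize, c.poolSize) // prints "1 8"
}
```

Callers that pass no options get the defaults, which is why this pattern suits operators with many optional settings.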
Configure the capacity of the output channel.
rxgo.WithBufferedChannel(1) // Create a buffered channel with a capacity of 1
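To illustrate what a channel capacity changes, here is a small sketch using plain Go channels rather than RxGo. The helper `trySend` is hypothetical; it attempts a non-blocking send so that the difference between an unbuffered channel and a channel of capacity 1 is visible without a consumer.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the item fits in the buffer
	fmt.Println(trySend(buffered, 2))   // false: the buffer is already full
}
```

With a buffer, a producer can run ahead of a slow consumer by up to the configured capacity before it blocks.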
Pass a context to the operator. The Observable listens to the context's done signal and closes itself when it fires.
rxgo.WithContext(ctx)
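The idea of closing on a context's done signal can be sketched with the standard library. This is not RxGo's implementation; `produce` and `takeThenCancel` are hypothetical helpers showing a producer goroutine that stops and closes its output channel once the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// produce emits increasing integers until ctx is cancelled,
// then closes the output channel.
func produce(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- i:
			}
		}
	}()
	return out
}

// takeThenCancel reads n items, cancels the context, and drains the
// channel. It only returns once the producer has closed the channel.
func takeThenCancel(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch := produce(ctx)
	items := make([]int, 0, n)
	for i := 0; i < n; i++ {
		items = append(items, <-ch)
	}
	cancel()
	for range ch { // drain until the producer closes the channel
	}
	return items
}

func main() {
	fmt.Println(takeThenCancel(3)) // prints "[0 1 2]"
}
```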
- Lazy (default): consume when an Observer starts to subscribe.
rxgo.WithObservationStrategy(rxgo.Lazy)
- Eager: consume when the Observable is created.
rxgo.WithObservationStrategy(rxgo.Eager)
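The difference between the two strategies can be sketched without RxGo. The `source` type below is a hypothetical, simplified stand-in for an Observable: with `eager` set, the producer function runs at construction time; otherwise it runs on the first call to `Observe`. The caching behaviour is an assumption of this sketch, not a statement about RxGo.

```go
package main

import "fmt"

// source is a hypothetical stand-in for an Observable over a producer.
type source struct {
	produce func() []int
	cached  []int
	started bool
}

// start runs the producer once and remembers its result.
func (s *source) start() {
	if !s.started {
		s.cached = s.produce()
		s.started = true
	}
}

// newSource runs the producer immediately when eager is true,
// otherwise it defers the work until Observe is first called.
func newSource(produce func() []int, eager bool) *source {
	s := &source{produce: produce}
	if eager {
		s.start()
	}
	return s
}

// Observe triggers the producer if needed and returns its items.
func (s *source) Observe() []int {
	s.start()
	return s.cached
}

func main() {
	calls := 0
	producer := func() []int { calls++; return []int{1, 2, 3} }

	lazy := newSource(producer, false)
	fmt.Println(calls) // 0: nothing has run yet
	items := lazy.Observe()
	fmt.Println(items, calls) // [1 2 3] 1

	newSource(producer, true)
	fmt.Println(calls) // 2: the eager source ran at creation
}
```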
- StopOnError (default): stop processing if the Observable produces an error.
rxgo.WithErrorStrategy(rxgo.StopOnError)
- ContinueOnError: continue processing items if the Observable produces an error.
rxgo.WithErrorStrategy(rxgo.ContinueOnError)
This strategy is propagated to the parent Observable(s).
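The two error strategies can be contrasted with a small standard-library sketch. The names `errorStrategy`, `process`, and `failOnTwo` are hypothetical and do not mirror RxGo's internals; the sketch only shows that one strategy halts at the first error while the other records it and keeps going.

```go
package main

import (
	"errors"
	"fmt"
)

type errorStrategy int

const (
	stopOnError errorStrategy = iota
	continueOnError
)

// process applies f to every input. With stopOnError it returns at the
// first error; with continueOnError it records the error and moves on.
func process(inputs []int, f func(int) (int, error), s errorStrategy) (results []int, errs []error) {
	for _, in := range inputs {
		out, err := f(in)
		if err != nil {
			errs = append(errs, err)
			if s == stopOnError {
				return results, errs
			}
			continue
		}
		results = append(results, out)
	}
	return results, errs
}

// failOnTwo is a hypothetical mapper that rejects the value 2.
func failOnTwo(i int) (int, error) {
	if i == 2 {
		return 0, errors.New("two is not allowed")
	}
	return i * 10, nil
}

func main() {
	in := []int{1, 2, 3}

	r, e := process(in, failOnTwo, stopOnError)
	fmt.Println(r, len(e)) // prints "[10] 1"

	r, e = process(in, failOnTwo, continueOnError)
	fmt.Println(r, len(e)) // prints "[10 30] 1"
}
```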
Convert the operator into a parallel operator and specify the number of concurrent goroutines.
rxgo.WithPool(8) // Creates a pool of 8 goroutines
Convert the operator into a parallel operator and set the number of concurrent goroutines to runtime.NumCPU().
rxgo.WithCPUPool()
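What a pool of goroutines means for an operator such as Map can be sketched with the standard library. `parallelMap` is a hypothetical helper, not RxGo code: it fans the inputs out to a fixed number of workers and collects the results, whose order is not guaranteed. It assumes `workers` is at least 1.

```go
package main

import (
	"fmt"
	"runtime"
	"sort"
	"sync"
)

// parallelMap applies f to every input using `workers` goroutines
// (workers must be >= 1). Output order is not guaranteed.
func parallelMap(inputs []int, workers int, f func(int) int) []int {
	in := make(chan int)
	out := make(chan int, len(inputs)) // large enough that workers never block
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- f(v)
			}
		}()
	}
	for _, v := range inputs {
		in <- v
	}
	close(in)
	wg.Wait()
	close(out)

	results := make([]int, 0, len(inputs))
	for v := range out {
		results = append(results, v)
	}
	return results
}

func main() {
	times10 := func(i int) int { return i * 10 }

	// A fixed pool of 8 goroutines, or one goroutine per logical CPU.
	a := parallelMap([]int{1, 2, 3, 4}, 8, times10)
	b := parallelMap([]int{1, 2, 3, 4}, runtime.NumCPU(), times10)

	sort.Ints(a) // sort only to make the printed output deterministic
	sort.Ints(b)
	fmt.Println(a) // prints "[10 20 30 40]"
	fmt.Println(b) // prints "[10 20 30 40]"
}
```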
Force an Observable to produce items sequentially.
rxgo.Serialize()
This option should be used in coordination with rxgo.WithPool(n) or rxgo.WithCPUPool().
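One way to restore sequential order after parallel processing is a reorder buffer. The sketch below is an assumption about the general technique, not a description of how RxGo implements it: each item carries a sequence number, and the hypothetical `serialize` function holds back items that arrive early until the missing ones show up.

```go
package main

import "fmt"

// indexed pairs a result with the position of its input.
type indexed struct{ seq, value int }

// serialize reads possibly out-of-order items and returns their values
// in sequence order, starting at sequence number 0.
func serialize(in <-chan indexed) []int {
	pending := map[int]int{} // items that arrived before their turn
	next := 0                // sequence number expected next
	var out []int
	for it := range in {
		pending[it.seq] = it.value
		for {
			v, ok := pending[next]
			if !ok {
				break
			}
			out = append(out, v)
			delete(pending, next)
			next++
		}
	}
	return out
}

func main() {
	ch := make(chan indexed, 3)
	ch <- indexed{seq: 2, value: 30} // arrives first, emitted last
	ch <- indexed{seq: 0, value: 10}
	ch <- indexed{seq: 1, value: 20}
	close(ch)
	fmt.Println(serialize(ch)) // prints "[10 20 30]"
}
```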
Create a Connectable Observable.
rxgo.WithPublishStrategy()
This option is propagated to the parent Observable(s).
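The behaviour of a connectable source can be sketched in a few lines of plain Go. The `connectable` type is hypothetical and much simpler than RxGo's: observers register first, nothing is emitted until `Connect` is called, and every registered observer then receives every item.

```go
package main

import "fmt"

// connectable is a hypothetical stand-in: observers register first,
// and nothing is emitted until Connect is called.
type connectable struct {
	items     []int
	observers []func(int)
}

// Observe registers an observer without triggering any emission.
func (c *connectable) Observe(fn func(int)) {
	c.observers = append(c.observers, fn)
}

// Connect pushes every item to every registered observer.
func (c *connectable) Connect() {
	for _, it := range c.items {
		for _, o := range c.observers {
			o(it)
		}
	}
}

func main() {
	c := &connectable{items: []int{1, 2, 3}}
	sumA, sumB := 0, 0
	c.Observe(func(i int) { sumA += i })
	c.Observe(func(i int) { sumB += i })

	fmt.Println(sumA, sumB) // prints "0 0": nothing emitted yet
	c.Connect()
	fmt.Println(sumA, sumB) // prints "6 6": both observers saw all items
}
```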