diff --git a/example_test.go b/example_test.go index 3683885..e2f15f6 100644 --- a/example_test.go +++ b/example_test.go @@ -407,6 +407,41 @@ func ExampleOrderedFilter() { printStream(evens) } +func ExampleFilterMap() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Keep only odd numbers and square them + // Concurrency = 3; Unordered + squares := rill.FilterMap(numbers, 3, func(x int) (int, bool, error) { + if x%2 == 0 { + return 0, false, nil + } + + randomSleep(1000 * time.Millisecond) // simulate some additional work + return x * x, true, nil + }) + + printStream(squares) +} + +// The same example as for the [FilterMap], but using ordered versions of functions. +func ExampleOrderedFilterMap() { + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Keep only odd numbers and square them + // Concurrency = 3; Ordered + squares := rill.OrderedFilterMap(numbers, 3, func(x int) (int, bool, error) { + if x%2 == 0 { + return 0, false, nil + } + + randomSleep(1000 * time.Millisecond) // simulate some additional work + return x * x, true, nil + }) + + printStream(squares) +} + func ExampleFirst() { numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) diff --git a/internal/core/reduce.go b/internal/core/reduce.go index 59f8cfe..c29dfbb 100644 --- a/internal/core/reduce.go +++ b/internal/core/reduce.go @@ -92,7 +92,7 @@ func MapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) ( } // Phase 1: Map - mapped := MapOrFilter(in, nm, func(a A) (keyValue[K, V], bool) { + mapped := FilterMap(in, nm, func(a A) (keyValue[K, V], bool) { k, v := mapper(a) return keyValue[K, V]{k, v}, true }) diff --git a/internal/core/transform.go b/internal/core/transform.go index 02b5144..c97123b 100644 --- a/internal/core/transform.go +++ b/internal/core/transform.go @@ -1,6 +1,6 @@ package core -func MapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B { +func FilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B { if in == nil { return nil } @@ -17,7 +17,7 @@ func MapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B { return out } -func OrderedMapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B { +func OrderedFilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B { if in == nil { return nil } diff --git a/internal/core/transform_test.go b/internal/core/transform_test.go index bab021f..618aa3b 100644 --- a/internal/core/transform_test.go +++ b/internal/core/transform_test.go @@ -7,25 +7,25 @@ import ( "github.com/destel/rill/internal/th" ) -func universalMapOrFilter[A, B any](ord bool, in <-chan A, n int, f func(A) (B, bool)) <-chan B { +func universalFilterMap[A, B any](ord bool, in <-chan A, n int, f func(A) (B, bool)) <-chan B { if ord { - return OrderedMapOrFilter(in, n, f) + return OrderedFilterMap(in, n, f) } - return MapOrFilter(in, n, f) + return FilterMap(in, n, f) } -func TestMapOrFilter(t *testing.T) { +func TestFilterMap(t *testing.T) { th.TestBothOrderings(t, func(t *testing.T, ord bool) { for _, n := range []int{1, 5} { t.Run(th.Name("nil", n), func(t *testing.T) { - out := universalMapOrFilter(ord, nil, n, func(x int) (int, bool) { return x, true }) + out := universalFilterMap(ord, nil, n, func(x int) (int, bool) { return x, true }) th.ExpectValue(t, out, nil) }) t.Run(th.Name("correctness", n), func(t *testing.T) { in := th.FromRange(0, 20) - out := universalMapOrFilter(ord, in, n, func(x int) (string, bool) { + out := universalFilterMap(ord, in, n, func(x int) (string, bool) { return fmt.Sprintf("%03d", x), x%2 == 0 }) @@ -46,7 +46,7 @@ func TestMapOrFilter(t *testing.T) { t.Run(th.Name("ordering", n), func(t *testing.T) { in := th.FromRange(0, 20000) - out := universalMapOrFilter(ord, in, n, func(x int) (int, bool) { + out := universalFilterMap(ord, in, n, func(x int) (int, bool) { return x, x%2 == 0 }) diff --git a/transform.go b/transform.go index a06984a..68ad8af 100644 --- a/transform.go +++ b/transform.go @@ -12,7 +12,7 @@ import ( // // See the package documentation for more information on non-blocking unordered functions and error handling. func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] { - return core.MapOrFilter(in, n, func(a Try[A]) (Try[B], bool) { + return core.FilterMap(in, n, func(a Try[A]) (Try[B], bool) { if a.Error != nil { return Try[B]{Error: a.Error}, true } @@ -28,7 +28,7 @@ func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] // OrderedMap is the ordered version of [Map]. func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] { - return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[B], bool) { + return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[B], bool) { if a.Error != nil { return Try[B]{Error: a.Error}, true } @@ -50,7 +50,7 @@ func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan // // See the package documentation for more information on non-blocking unordered functions and error handling. func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] { - return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { + return core.FilterMap(in, n, func(a Try[A]) (Try[A], bool) { if a.Error != nil { return a, true // never filter out errors } @@ -66,7 +66,7 @@ func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[ // OrderedFilter is the ordered version of [Filter]. func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] { - return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { + return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[A], bool) { if a.Error != nil { return a, true // never filter out errors } @@ -80,6 +80,45 @@ func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-ch }) } +// FilterMap takes a stream of items of type A, applies a function f that can filter and transform them into items of type B. +// Returns a new stream of transformed items that passed the filter. This operation is equivalent to a +// [Filter] followed by a [Map]. +// +// This is a non-blocking unordered function that processes items concurrently using n goroutines. +// An ordered version of this function, [OrderedFilterMap], is also available. +// +// See the package documentation for more information on non-blocking unordered functions and error handling. +func FilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] { + return core.FilterMap(in, n, func(a Try[A]) (Try[B], bool) { + if a.Error != nil { + return Try[B]{Error: a.Error}, true + } + + b, keep, err := f(a.Value) + if err != nil { + return Try[B]{Error: err}, true + } + + return Try[B]{Value: b}, keep + }) +} + +// OrderedFilterMap is the ordered version of [FilterMap]. +func OrderedFilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] { + return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[B], bool) { + if a.Error != nil { + return Try[B]{Error: a.Error}, true + } + + b, keep, err := f(a.Value) + if err != nil { + return Try[B]{Error: err}, true + } + + return Try[B]{Value: b}, keep + }) +} + // FlatMap takes a stream of items of type A and transforms each item into a new sub-stream of items of type B using a function f. // Those sub-streams are then flattened into a single output stream, which is returned. // @@ -146,7 +185,7 @@ func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) // // See the package documentation for more information on non-blocking unordered functions and error handling. func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] { - return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { + return core.FilterMap(in, n, func(a Try[A]) (Try[A], bool) { if a.Error == nil { return a, true } @@ -162,7 +201,7 @@ func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] { // OrderedCatch is the ordered version of [Catch]. func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] { - return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { + return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[A], bool) { if a.Error == nil { return a, true } diff --git a/transform_test.go b/transform_test.go index 2f7f59e..30e286a 100644 --- a/transform_test.go +++ b/transform_test.go @@ -161,6 +161,85 @@ func TestFilter(t *testing.T) { }) } +func universalFilterMap[A, B any](ord bool, in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] { + if ord { + return OrderedFilterMap(in, n, f) + } + return FilterMap(in, n, f) +} + +func TestFilterMap(t *testing.T) { + th.TestBothOrderings(t, func(t *testing.T, ord bool) { + for _, n := range []int{1, 5} { + + t.Run(th.Name("nil", n), func(t *testing.T) { + out := universalFilterMap(ord, nil, n, func(x int) (int, bool, error) { return x, true, nil }) + th.ExpectValue(t, out, nil) + }) + + t.Run(th.Name("correctness", n), func(t *testing.T) { + in := FromChan(th.FromRange(0, 20), nil) + in = replaceWithError(in, 15, fmt.Errorf("err15")) + + out := universalFilterMap(ord, in, n, func(x int) (string, bool, error) { + if x == 5 { + return "", false, fmt.Errorf("err05") + } + if x == 6 { + return "", true, fmt.Errorf("err06") + } + + return fmt.Sprintf("%03d", x), x%2 == 0, nil + }) + + outSlice, errSlice := toSliceAndErrors(out) + + expectedSlice := make([]string, 0, 20) + for i := 0; i < 20; i++ { + if i == 5 || i == 6 || i == 15 || i%2 == 1 { + continue + } + expectedSlice = append(expectedSlice, fmt.Sprintf("%03d", i)) + } + + sort.Strings(outSlice) + sort.Strings(errSlice) + + th.ExpectSlice(t, outSlice, expectedSlice) + th.ExpectSlice(t, errSlice, []string{"err05", "err06", "err15"}) + }) + + t.Run(th.Name("ordering", n), func(t *testing.T) { + in := FromChan(th.FromRange(0, 20000), nil) + + out := universalFilterMap(ord, in, n, func(x int) (int, bool, error) { + switch x % 3 { + case 2: + return x, false, fmt.Errorf("err%06d", x) + case 1: + return x, false, nil + default: + return x, true, nil + + } + }) + + outSlice, errSlice := toSliceAndErrors(out) + + if ord || n == 1 { + th.ExpectSorted(t, outSlice) + th.ExpectSorted(t, errSlice) + } else { + th.ExpectUnsorted(t, outSlice) + th.ExpectUnsorted(t, errSlice) + } + + }) + + } + }) +} + func universalFlatMap[A, B any](ord bool, in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] { if ord { return OrderedFlatMap(in, n, f)