Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FIlterMap function #30

Merged
merged 4 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,41 @@ func ExampleOrderedFilter() {
printStream(evens)
}

func ExampleFilterMap() {
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Keep only odd numbers and square them
// Concurrency = 3; Unordered
squares := rill.FilterMap(numbers, 3, func(x int) (int, bool, error) {
if x%2 == 0 {
return 0, false, nil
}

randomSleep(1000 * time.Millisecond) // simulate some additional work
return x * x, true, nil
})

printStream(squares)
}

// The same example as for the [FilterMap], but using ordered versions of functions.
func ExampleOrderedFilterMap() {
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Keep only odd numbers and square them
// Concurrency = 3; Ordered
squares := rill.OrderedFilterMap(numbers, 3, func(x int) (int, bool, error) {
if x%2 == 0 {
return 0, false, nil
}

randomSleep(1000 * time.Millisecond) // simulate some additional work
return x * x, true, nil
})

printStream(squares)
}

func ExampleFirst() {
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/core/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func MapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) (
}

// Phase 1: Map
mapped := MapOrFilter(in, nm, func(a A) (keyValue[K, V], bool) {
mapped := FilterMap(in, nm, func(a A) (keyValue[K, V], bool) {
k, v := mapper(a)
return keyValue[K, V]{k, v}, true
})
Expand Down
4 changes: 2 additions & 2 deletions internal/core/transform.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package core

func MapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
func FilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
if in == nil {
return nil
}
Expand All @@ -17,7 +17,7 @@ func MapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
return out
}

func OrderedMapOrFilter[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
func OrderedFilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B {
if in == nil {
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions internal/core/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ import (
"github.com/destel/rill/internal/th"
)

func universalMapOrFilter[A, B any](ord bool, in <-chan A, n int, f func(A) (B, bool)) <-chan B {
func universalFilterMap[A, B any](ord bool, in <-chan A, n int, f func(A) (B, bool)) <-chan B {
if ord {
return OrderedMapOrFilter(in, n, f)
return OrderedFilterMap(in, n, f)
}
return MapOrFilter(in, n, f)
return FilterMap(in, n, f)
}

func TestMapOrFilter(t *testing.T) {
func TestFilterMap(t *testing.T) {
th.TestBothOrderings(t, func(t *testing.T, ord bool) {
for _, n := range []int{1, 5} {

t.Run(th.Name("nil", n), func(t *testing.T) {
out := universalMapOrFilter(ord, nil, n, func(x int) (int, bool) { return x, true })
out := universalFilterMap(ord, nil, n, func(x int) (int, bool) { return x, true })
th.ExpectValue(t, out, nil)
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := th.FromRange(0, 20)
out := universalMapOrFilter(ord, in, n, func(x int) (string, bool) {
out := universalFilterMap(ord, in, n, func(x int) (string, bool) {
return fmt.Sprintf("%03d", x), x%2 == 0
})

Expand All @@ -46,7 +46,7 @@ func TestMapOrFilter(t *testing.T) {
t.Run(th.Name("ordering", n), func(t *testing.T) {
in := th.FromRange(0, 20000)

out := universalMapOrFilter(ord, in, n, func(x int) (int, bool) {
out := universalFilterMap(ord, in, n, func(x int) (int, bool) {
return x, x%2 == 0
})

Expand Down
51 changes: 45 additions & 6 deletions transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return core.MapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
return core.FilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}
Expand All @@ -28,7 +28,7 @@ func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B]

// OrderedMap is the ordered version of [Map].
func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] {
return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[B], bool) {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}
Expand All @@ -50,7 +50,7 @@ func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.FilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
return a, true // never filter out errors
}
Expand All @@ -66,7 +66,7 @@ func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[

// OrderedFilter is the ordered version of [Filter].
func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] {
return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error != nil {
return a, true // never filter out errors
}
Expand All @@ -80,6 +80,45 @@ func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-ch
})
}

// FilterMap takes a stream of items of type A, applies a function f that can filter and transform them into items of type B.
// Returns a new stream of transformed items that passed the filter. This operation is equivalent to a
// [Filter] followed by a [Map].
//
// This is a non-blocking unordered function that processes items concurrently using n goroutines.
// An ordered version of this function, [OrderedFilterMap], is also available.
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func FilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
return core.FilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}

b, keep, err := f(a.Value)
if err != nil {
return Try[B]{Error: err}, true
}

return Try[B]{Value: b}, keep
})
}

// OrderedFilterMap is the ordered version of [FilterMap].
func OrderedFilterMap[A, B any](in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[B], bool) {
if a.Error != nil {
return Try[B]{Error: a.Error}, true
}

b, keep, err := f(a.Value)
if err != nil {
return Try[B]{Error: err}, true
}

return Try[B]{Value: b}, keep
})
}

// FlatMap takes a stream of items of type A and transforms each item into a new sub-stream of items of type B using a function f.
// Those sub-streams are then flattened into a single output stream, which is returned.
//
Expand Down Expand Up @@ -146,7 +185,7 @@ func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B])
//
// See the package documentation for more information on non-blocking unordered functions and error handling.
func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return core.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.FilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
return a, true
}
Expand All @@ -162,7 +201,7 @@ func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {

// OrderedCatch is the ordered version of [Catch].
func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] {
return core.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) {
return core.OrderedFilterMap(in, n, func(a Try[A]) (Try[A], bool) {
if a.Error == nil {
return a, true
}
Expand Down
79 changes: 79 additions & 0 deletions transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,85 @@ func TestFilter(t *testing.T) {
})
}

func universalFilterMap[A, B any](ord bool, in <-chan Try[A], n int, f func(A) (B, bool, error)) <-chan Try[B] {
if ord {
return OrderedFilterMap(in, n, f)
}
return FilterMap(in, n, f)
}

func TestFilterMap(t *testing.T) {
th.TestBothOrderings(t, func(t *testing.T, ord bool) {
for _, n := range []int{1, 5} {

t.Run(th.Name("nil", n), func(t *testing.T) {
out := universalFilterMap(ord, nil, n, func(x int) (int, bool, error) { return x, true, nil })
th.ExpectValue(t, out, nil)
})

t.Run(th.Name("correctness", n), func(t *testing.T) {
in := FromChan(th.FromRange(0, 20), nil)
in = replaceWithError(in, 15, fmt.Errorf("err15"))

out := universalFilterMap(ord, in, n, func(x int) (string, bool, error) {
if x == 5 {
return "", false, fmt.Errorf("err05")
}
if x == 6 {
return "", true, fmt.Errorf("err06")
}

return fmt.Sprintf("%03d", x), x%2 == 0, nil
})

outSlice, errSlice := toSliceAndErrors(out)

expectedSlice := make([]string, 0, 20)
for i := 0; i < 20; i++ {
if i == 5 || i == 6 || i == 15 || i%2 == 1 {
continue
}
expectedSlice = append(expectedSlice, fmt.Sprintf("%03d", i))
}

sort.Strings(outSlice)
sort.Strings(errSlice)

th.ExpectSlice(t, outSlice, expectedSlice)
th.ExpectSlice(t, errSlice, []string{"err05", "err06", "err15"})
})

t.Run(th.Name("ordering", n), func(t *testing.T) {
in := FromChan(th.FromRange(0, 20000), nil)

out := universalFilterMap(ord, in, n, func(x int) (int, bool, error) {
switch x % 3 {
case 2:
return x, false, fmt.Errorf("err%06d", x)
case 1:
return x, false, nil
default:
return x, true, nil

}
})

outSlice, errSlice := toSliceAndErrors(out)

if ord || n == 1 {
th.ExpectSorted(t, outSlice)
th.ExpectSorted(t, errSlice)
} else {
th.ExpectUnsorted(t, outSlice)
th.ExpectUnsorted(t, errSlice)
}

})

}
})
}

func universalFlatMap[A, B any](ord bool, in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] {
if ord {
return OrderedFlatMap(in, n, f)
Expand Down
Loading