Skip to content

Commit

Permalink
Doc improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
destel committed Mar 20, 2024
1 parent 3c6505e commit 6b54a96
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 26 deletions.
157 changes: 142 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Rill
# Rill [![GoDoc](https://pkg.go.dev/badge/github.com/destel/rill)](https://pkg.go.dev/github.com/destel/rill)
Rill (noun: a small stream) is a comprehensive Go toolkit for streaming, parallel processing, and pipeline construction.
Designed to reduce boilerplate and simplify usage, it empowers developers to focus on core logic
without getting bogged down by the complexity of concurrency.



## Key features
- **Lightweight**: fast and modular, can be easily integrated into existing projects
- **Easy to use**: the complexity of managing goroutines, wait groups, and error handling is abstracted away
Expand All @@ -16,16 +17,19 @@ without getting bogged down by the complexity of concurrency.
- **Generic**: all operations are type-safe and can be used with any data type
- **Functional Programming**: based on functional programming concepts, making operations like map, filter, flatMap and others available for channel-based workflows



## Installation
```bash
go get github.com/destel/rill
```

## Example
A function that fetches keys from multiple URLs, retrieves their values from a Redis database, and prints them.
## Example usage
Consider function that fetches keys from multiple URLs, retrieves their values from a Redis database, and prints them.
This example demonstrates the library's strengths in handling concurrent tasks, error propagation, batching and data streaming,
all while maintaining simplicity and efficiency.
See full runnable example at examples/redis-read/main.go

See a full runnable example at examples/redis-read

```go
type KV struct {
Expand Down Expand Up @@ -103,6 +107,18 @@ func streamLines(ctx context.Context, url string) <-chan rill.Try[string] {
```



## Testing strategy
Rill has a test coverage of over 95%, with testing focused on:
- **Correctness**: ensuring that functions produce accurate results at different levels of concurrency
- **Concurrency**: confirming that correct number of goroutines are spawned and utilized
- **Ordering**: ensuring that ordered versions of the functions preserve the order, while basic versions do not






## Design philosophy
At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the Try structure.
Such channels can be created manually or through utilities like **WrapSlice** or **WrapChan**, and then transformed via operations
Expand All @@ -111,6 +127,7 @@ such as **Map**, **Filter**, **FlatMap** and others. Finally when all processing




## Batching
Batching is a common pattern in concurrent processing, especially when dealing with external services or databases.
Rill provides a Batch function that organizes a stream of items into batches of a specified size. It's also possible
Expand All @@ -120,7 +137,6 @@ when input stream is slow or sparse.




## Error handling
In the examples above errors are handled using **ForEach**, which is good for most use cases.
**ForEach** stops processing on the first error and returns it. If you need to handle error in the middle of pipeline,
Expand All @@ -145,12 +161,123 @@ err := rill.ForEach(results, 1, func(item int) error {
```





## Termination and Resource Leaks
In Go concurrent applications, if there are no readers for a channel, writers can become stuck,
leading to potential goroutine and memory leaks. This issue extends to rill pipelines, which are built on Go channels;
if any stage in a pipeline lacks a consumer, the whole chain of producers upstream may become blocked.
Therefore, it's vital to ensure that pipelines are fully consumed, especially in cases where errors lead to early termination.
The example below demonstrates a situation where the final processing stage exits upon the first encountered error,
risking a blocked pipeline state.

```go
func doWork(ctx context.Context) error {
// Initialize the first stage of the pipeline
ids := streamIDs(ctx)

// Define other pipeline stages...

// Final stage processing
for value := range results {
// Process value...
if someCondition {
return fmt.Errorf("some error") // Early exit on error
}
}
return nil
}
```

To prevent such issues, it's advisable to ensure the results channel is drained in the event of an error.
A straightforward approach is to use defer to invoke **DrainNB**:

```go
func doWork(ctx context.Context) error {
// Initialize the first stage of the pipeline
ids := streamIDs(ctx)

// Define other pipeline stages...

// Ensure pipeline is drained in case of failure
defer rill.DrainNB(results)

// Final stage processing
for value := range results {
// Process value...
if someCondition {
return fmt.Errorf("some error") // Early exit on error
}
}
return nil
}
```

Utilizing functions like **ForEach** or **UnwrapToSlice**, which incorporate built-in draining mechanisms, can simplify
the code and enhance readability:

```go
func doWork(ctx context.Context) error {
// Initialize the first stage of the pipeline
ids := streamIDs(ctx)

// Define other pipeline stages...

// Use ForEach for final stage processing, which includes error handling and automatic draining
return rill.ForEach(results, 5, func(value string) error {
// Process value...
if someCondition {
return fmt.Errorf("some error") // Early exit on error, with automatic draining
}
return nil
})
}
```

While these measures are effective in preventing leaks, the pipeline may continue to operate in the background as long
as the initial stage produces values. A best practice is to manage the first stage (and potentially others) with a context,
allowing for a controlled shutdown:

```go
func doWork(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // Ensures first stage is cancelled upon function exit

// Initialize the first stage of the pipeline
ids := streamIDs(ctx)

// Define other pipeline stages...

// Use ForEach for final stage processing, which includes error handling and automatic draining
return rill.ForEach(results, 5, func(value string) error {
// Process value
if someCondition {
return fmt.Errorf("some error") // Early exit on error, with automatic draining
}
return nil
})
}
```





## Order preservation
There are use cases where it's necessary to preserve the original order of data, while still allowing for concurrent processing.
Below is an example function that fetches temperature measurements for each day in a specified range
and prints temperature movements for each day. OrderedMap function fetches measurements in parallel, but returns them in chronological order.
This allows the next stage of processing to calculate temperature differences between consecutive days.
See full runnable example at examples/weather/main.go
In concurrent environments, maintaining the original sequence of processed items is challenging due to the nature of parallel execution.
When values are read from an input channel, processed through a function **f**, and written to an output channel, their order might not
mirror the input sequence. To address this, rill provides ordered versions of its core functions, such as **OrderedMap**, **OrderedFilter**,
and others. These ensure that if value **x** precedes value **y** in the input channel, then **f(x)** will precede **f(y)** in the output,
preserving the original order. It's important to note that these ordered functions incur a small overhead compared to their unordered counterparts,
due to the additional logic required to maintain order.

Order preservation is vital in scenarios where the sequence of data impacts the outcome. Take, for instance, a function that retrieves
daily temperature measurements over a specific period and calculates the change in temperature from one day to the next.
Although fetching the data in parallel boosts efficiency, processing it in the original order is crucial for
accurate computation of temperature variations.

See a full runnable example at examples/weather

```go
type Measurement struct {
Expand All @@ -159,7 +286,7 @@ type Measurement struct {
Movement float64
}

func printTemperatureMovements(ctx context.Context, city string, startDate, endDate time.Time) error {
func printTemperatureChanges(ctx context.Context, city string, startDate, endDate time.Time) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error, this ensures all pending operations are canceled

Expand All @@ -178,17 +305,17 @@ func printTemperatureMovements(ctx context.Context, city string, startDate, endD
return Measurement{Date: date, Temp: temp}, err
})

// Calculate the temperature movements. Use a single goroutine
// Calculate the temperature changes. Use a single goroutine
prev := Measurement{Temp: math.NaN()}
measurements = rill.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) {
m.Movement = m.Temp - prev.Temp
m.Change = m.Temp - prev.Temp
prev = m
return m, nil
})

// Iterate over the measurements and print the movements
// Iterate over the measurements and print the results
err := rill.ForEach(measurements, 1, func(m Measurement) error {
fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement)
fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Change)
prev = m
return nil
})
Expand Down
22 changes: 11 additions & 11 deletions examples/weather/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@ import (
)

type Measurement struct {
Date time.Time
Temp float64
Movement float64
Date time.Time
Temp float64
Change float64
}

func main() {
endDate := time.Now()
startDate := endDate.AddDate(0, 0, -30)

err := printTemperatureMovements(context.Background(), "New York", startDate, endDate)
err := printTemperatureChanges(context.Background(), "New York", startDate, endDate)
if err != nil {
fmt.Println("Error:", err)
}
}

// printTemperatureMovements orchestrates a pipeline that fetches temperature measurements for a given city and
// prints the daily temperature movements. Measurements are fetched concurrently, but the movements are calculated
// printTemperatureChanges orchestrates a pipeline that fetches temperature measurements for a given city and
// prints the daily temperature changes. Measurements are fetched concurrently, but the changes are calculated
// in order, using a single goroutine.
func printTemperatureMovements(ctx context.Context, city string, startDate, endDate time.Time) error {
func printTemperatureChanges(ctx context.Context, city string, startDate, endDate time.Time) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error, this ensures all pending operations are canceled

Expand All @@ -48,17 +48,17 @@ func printTemperatureMovements(ctx context.Context, city string, startDate, endD
return Measurement{Date: date, Temp: temp}, err
})

// Calculate the temperature movements. Use a single goroutine
// Calculate the temperature changes. Use a single goroutine
prev := Measurement{Temp: math.NaN()}
measurements = rill.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) {
m.Movement = m.Temp - prev.Temp
m.Change = m.Temp - prev.Temp
prev = m
return m, nil
})

// Iterate over the measurements and print the movements
// Iterate over the measurements and print the results
err := rill.ForEach(measurements, 1, func(m Measurement) error {
fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement)
fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Change)
prev = m
return nil
})
Expand Down

0 comments on commit 6b54a96

Please sign in to comment.