Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

break: refreshable/v2 with Generic type handling #252

Merged
merged 15 commits into from
Aug 10, 2023
100 changes: 100 additions & 0 deletions refreshable/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2022 Palantir Technologies. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package refreshable

import (
"context"
"time"
)

type ready[T any] struct {
in Updatable[T]
readyC <-chan struct{}
cancel context.CancelFunc
}

func newReady[T any](in Updatable[T]) *ready[T] {
ctx, cancel := context.WithCancel(context.Background())
return &ready[T]{
in: in,
readyC: ctx.Done(),
cancel: cancel,
}
}

func (r *ready[T]) Current() T {
return r.in.Current()
}

func (r *ready[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
return r.in.Subscribe(consumer)
}

func (r *ready[T]) ReadyC() <-chan struct{} {
return r.readyC
}

func (r *ready[T]) Update(val T) {
r.cancel()
r.in.Update(val)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the order here be reversed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, fixed, thanks!

}

// NewFromChannel populates an Updatable with the values channel.
// If an element is already available, the returned Value is guaranteed to be populated.
// The channel should be closed when no longer used to avoid leaking resources.
func NewFromChannel[T any](values <-chan T) Ready[T] {
out := newReady(newZero[T]())
select {
case initial, ok := <-values:
if !ok {
return out // channel already closed
}
out.Update(initial)
default:
}
go func() {
for value := range values {
out.Update(value)
}
}()
return out
}

// NewFromTickerFunc returns a Ready Refreshable populated by the result of the provider called each interval.
// If the providers bool return is false, the value is ignored.
// The result's ReadyC channel is closed when a new value is populated.
func NewFromTickerFunc[T any](interval time.Duration, provider func() (T, bool)) (Ready[T], UnsubscribeFunc) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass context into the provider function so that clients can make use of it (e.g. for logging unexpected behaviour)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

out := newReady(newZero[T]())
ctx, cancel := context.WithCancel(context.Background())
values := make(chan T)
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
defer close(values)
for {
if value, ok := provider(); ok {
out.Update(value)
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
return
}
}
}()
return out, UnsubscribeFunc(cancel)
}

// Wait waits until the Ready has a current value or the context expires.
func Wait[T any](ctx context.Context, ready Ready[T]) (T, bool) {
select {
case <-ready.ReadyC():
return ready.Current(), true
case <-ctx.Done():
var zero T
return zero, false
}
}
2 changes: 1 addition & 1 deletion refreshable/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/palantir/pkg/refreshable
module github.com/palantir/pkg/refreshable/v2

go 1.19

Expand Down
91 changes: 84 additions & 7 deletions refreshable/refreshable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,91 @@

package refreshable

type Refreshable interface {
import (
"context"
)

// A Refreshable is a generic container type for a volatile underlying value.
// It supports atomic access and user-provided callback "subscriptions" on updates.
type Refreshable[T any] interface {
ravi-braj marked this conversation as resolved.
Show resolved Hide resolved
// Current returns the most recent value of this Refreshable.
Current() interface{}
// If the value has not been initialized, returns T's zero value.
Current() T

// Subscribe calls the consumer function when Value updates until stop is closed.
// The consumer must be relatively fast: Updatable.Set blocks until all subscribers have returned.
// Expensive or error-prone responses to refreshed values should be asynchronous.
// Updates considered no-ops by reflect.DeepEqual may be skipped.
Subscribe(consumer func(T)) UnsubscribeFunc
}

// A Updatable is a Refreshable which supports setting the value with a user-provided value.
// When a utility returns a (non-Updatable) Refreshable, it implies that value updates are handled internally.
type Updatable[T any] interface {
Refreshable[T]
// Update updates the Refreshable with a new T.
// It blocks until all subscribers have completed.
Update(T)
}

// A Validated is a Refreshable capable of rejecting updates according to validation logic.
// Its Current method returns the most recent value to pass validation.
type Validated[T any] interface {
Refreshable[T]
// Validation returns the result of the most recent validation.
// If the last value was valid, Validation returns the same value as Current and a nil error.
// If the last value was invalid, it and the error are returned. Current returns the most recent valid value.
Validation() (T, error)
}

// Ready extends Refreshable for asynchronous implementations which may not have a value when they are constructed.
// Callers should check that the Ready channel is closed before using the Current value.
type Ready[T any] interface {
Refreshable[T]
// ReadyC returns a channel which is closed after a value is successfully populated.
ReadyC() <-chan struct{}
}

// Subscribe subscribes to changes of this Refreshable. The provided function is called with the value of Current()
// whenever the value changes.
Subscribe(consumer func(interface{})) (unsubscribe func())
// UnsubscribeFunc removes a subscription from a refreshable's internal tracking and/or stops its update routine.
// It is safe to call multiple times.
type UnsubscribeFunc func()

func New[T any](val T) Updatable[T] {
return newDefault(val)
}

// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable.
func Map[T any, M any](original Refreshable[T], mapFn func(T) M) (Refreshable[M], UnsubscribeFunc) {
out := New(mapFn(original.Current()))
stop := original.Subscribe(func(v T) {
out.Update(mapFn(v))
})
out.Update(mapFn(original.Current()))
return out, stop
}

// MapContext is like Map but unsubscribes when the context is cancelled.
func MapContext[T any, M any](ctx context.Context, original Refreshable[T], mapFn func(T) M) Refreshable[M] {
out, stop := Map(original, mapFn)
go func() {
<-ctx.Done()
stop()
}()
return out
}

// MapWithError is similar to Validate but allows for the function to return a mapping/mutation
// of the input object in addition to returning an error. The returned validRefreshable will contain the mapped value.
// An error is returned if the current original value fails to map.
func MapWithError[T any, M any](original Refreshable[T], mapFn func(T) (M, error)) (Validated[M], UnsubscribeFunc, error) {
v, stop := newValidRefreshable(original, mapFn)
_, err := v.Validation()
return v, stop, err
}

// Map returns a new Refreshable based on the current one that handles updates based on the current Refreshable.
Map(func(interface{}) interface{}) Refreshable
// Validate returns a new Refreshable that returns the latest original value accepted by the validatingFn.
// If the upstream value results in an error, it is reported by Validation().
// An error is returned if the current original value is invalid.
func Validate[T any](original Refreshable[T], validatingFn func(T) error) (Validated[T], UnsubscribeFunc, error) {
return MapWithError(original, identity(validatingFn))
}
95 changes: 41 additions & 54 deletions refreshable/refreshable_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,71 @@
package refreshable

import (
"fmt"
"reflect"
"sync"
"sync/atomic"
)

type DefaultRefreshable struct {
typ reflect.Type
current *atomic.Value

sync.Mutex // protects subscribers
subscribers []*func(interface{})
type defaultRefreshable[T any] struct {
mux sync.Mutex
current atomic.Value
subscribers []*func(T)
}

func NewDefaultRefreshable(val interface{}) *DefaultRefreshable {
current := atomic.Value{}
current.Store(val)

return &DefaultRefreshable{
current: &current,
typ: reflect.TypeOf(val),
}
func newDefault[T any](val T) Updatable[T] {
d := new(defaultRefreshable[T])
d.current.Store(&val)
return d
}

func (d *DefaultRefreshable) Update(val interface{}) error {
d.Lock()
defer d.Unlock()

if valType := reflect.TypeOf(val); valType != d.typ {
return fmt.Errorf("new refreshable value must be type %s: got %s", d.typ, valType)
}
func newZero[T any]() Updatable[T] {
d := new(defaultRefreshable[T])
var zero T
d.current.Store(&zero)
return d
}

if reflect.DeepEqual(d.current.Load(), val) {
return nil
// Update changes the value of the Refreshable, then blocks while subscribers are executed.
func (d *defaultRefreshable[T]) Update(val T) {
d.mux.Lock()
defer d.mux.Unlock()
old := d.current.Swap(&val)
if reflect.DeepEqual(*(old.(*T)), val) {
return
}
d.current.Store(val)

for _, sub := range d.subscribers {
(*sub)(val)
}
return nil
}

func (d *DefaultRefreshable) Current() interface{} {
return d.current.Load()
func (d *defaultRefreshable[T]) Current() T {
return *(d.current.Load().(*T))
}

func (d *DefaultRefreshable) Subscribe(consumer func(interface{})) (unsubscribe func()) {
d.Lock()
defer d.Unlock()
func (d *defaultRefreshable[T]) Subscribe(consumer func(T)) UnsubscribeFunc {
d.mux.Lock()
defer d.mux.Unlock()

consumerFnPtr := &consumer
d.subscribers = append(d.subscribers, consumerFnPtr)
return func() {
d.unsubscribe(consumerFnPtr)
}
return d.unsubscribe(consumerFnPtr)
}

func (d *DefaultRefreshable) unsubscribe(consumerFnPtr *func(interface{})) {
d.Lock()
defer d.Unlock()

matchIdx := -1
for idx, currSub := range d.subscribers {
if currSub == consumerFnPtr {
matchIdx = idx
break
func (d *defaultRefreshable[T]) unsubscribe(consumerFnPtr *func(T)) UnsubscribeFunc {
return func() {
d.mux.Lock()
defer d.mux.Unlock()

matchIdx := -1
for idx, currSub := range d.subscribers {
if currSub == consumerFnPtr {
matchIdx = idx
break
}
}
if matchIdx != -1 {
d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...)
}
}
if matchIdx != -1 {
d.subscribers = append(d.subscribers[:matchIdx], d.subscribers[matchIdx+1:]...)
}
}

func (d *DefaultRefreshable) Map(mapFn func(interface{}) interface{}) Refreshable {
newRefreshable := NewDefaultRefreshable(mapFn(d.Current()))
d.Subscribe(func(updatedVal interface{}) {
_ = newRefreshable.Update(mapFn(updatedVal))
})
return newRefreshable
}
Loading