Skip to content

Commit

Permalink
Add CacheLatestTtlSecs to allow expiration of latest schemas (#1106)
Browse files Browse the repository at this point in the history
* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* chore: update repo semaphore config

* Add CacheLatestTtlSecs to allow expiration of latest schemas

* Fix golint

* Add runtime finalizer to stop ticker

---------

Co-authored-by: Confluent Jenkins Bot <[email protected]>
  • Loading branch information
rayokota and ConfluentJenkins authored Nov 20, 2023
1 parent 8d530db commit af4a5f8
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 1 deletion.
2 changes: 2 additions & 0 deletions schemaregistry/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Cache interface {
// Parameters:
// * `key` - the key to delete
Delete(key interface{})
// Clear clears the cache
Clear()
// ToMap returns the current cache entries copied into a map
ToMap() map[interface{}]interface{}
}
13 changes: 13 additions & 0 deletions schemaregistry/cache/lrucache.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,19 @@ func (c *LRUCache) Delete(key interface{}) {
}
}

// Clear clears the cache
func (c *LRUCache) Clear() {
c.cacheLock.Lock()
for key, value := range c.lruElements {
delete(c.lruElements, key)
c.lruKeys.Remove(value)
}
for key := range c.entries {
delete(c.entries, key)
}
c.cacheLock.Unlock()
}

// ToMap returns the current cache entries copied into a map
func (c *LRUCache) ToMap() map[interface{}]interface{} {
ret := make(map[interface{}]interface{})
Expand Down
7 changes: 7 additions & 0 deletions schemaregistry/cache/mapcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ func (c *MapCache) Delete(key interface{}) {
delete(c.entries, key)
}

// Clear clears the cache
func (c *MapCache) Clear() {
for key := range c.entries {
delete(c.entries, key)
}
}

// ToMap returns the current cache entries copied into a map
func (c *MapCache) ToMap() map[interface{}]interface{} {
ret := make(map[interface{}]interface{})
Expand Down
2 changes: 2 additions & 0 deletions schemaregistry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Config struct {
RequestTimeoutMs int
// CacheCapacity positive integer or zero for unbounded capacity
CacheCapacity int
// CacheLatestTTLSecs ttl in secs for caching the latest schema
CacheLatestTTLSecs int

// HTTP client
HTTPClient *http.Client
Expand Down
68 changes: 67 additions & 1 deletion schemaregistry/schemaregistry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"encoding/json"
"fmt"
"net/url"
"runtime"
"strings"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/cache"
)
Expand Down Expand Up @@ -189,6 +191,9 @@ type client struct {
schemaToVersionCacheLock sync.RWMutex
versionToSchemaCache cache.Cache
versionToSchemaCacheLock sync.RWMutex
latestToSchemaCache cache.Cache
latestToSchemaCacheLock sync.RWMutex
evictor *evictor
}

var _ Client = new(client)
Expand Down Expand Up @@ -243,6 +248,7 @@ func NewClient(conf *Config) (Client, error) {
var idToSchemaCache cache.Cache
var schemaToVersionCache cache.Cache
var versionToSchemaCache cache.Cache
var latestToSchemaCache cache.Cache
if conf.CacheCapacity != 0 {
schemaToIDCache, err = cache.NewLRUCache(conf.CacheCapacity)
if err != nil {
Expand All @@ -260,18 +266,28 @@ func NewClient(conf *Config) (Client, error) {
if err != nil {
return nil, err
}
latestToSchemaCache, err = cache.NewLRUCache(conf.CacheCapacity)
if err != nil {
return nil, err
}
} else {
schemaToIDCache = cache.NewMapCache()
idToSchemaCache = cache.NewMapCache()
schemaToVersionCache = cache.NewMapCache()
versionToSchemaCache = cache.NewMapCache()
latestToSchemaCache = cache.NewMapCache()
}
handle := &client{
restService: restService,
schemaToIDCache: schemaToIDCache,
idToSchemaCache: idToSchemaCache,
schemaToVersionCache: schemaToVersionCache,
versionToSchemaCache: versionToSchemaCache,
latestToSchemaCache: latestToSchemaCache,
}
if conf.CacheLatestTTLSecs > 0 {
runEvictor(handle, time.Duration(conf.CacheLatestTTLSecs)*time.Second)
runtime.SetFinalizer(handle, stopEvictor)
}
return handle, nil
}
Expand Down Expand Up @@ -393,7 +409,26 @@ func (c *client) GetID(subject string, schema SchemaInfo, normalize bool) (id in
// GetLatestSchemaMetadata fetches latest version registered with the provided subject
// Returns SchemaMetadata object
func (c *client) GetLatestSchemaMetadata(subject string) (result SchemaMetadata, err error) {
return c.GetSchemaMetadata(subject, -1)
c.latestToSchemaCacheLock.RLock()
metadataValue, ok := c.latestToSchemaCache.Get(subject)
c.latestToSchemaCacheLock.RUnlock()
if ok {
return *metadataValue.(*SchemaMetadata), nil
}

c.latestToSchemaCacheLock.Lock()
// another goroutine could have already put it in cache
metadataValue, ok = c.latestToSchemaCache.Get(subject)
if !ok {
err = c.restService.handleRequest(newRequest("GET", versions, nil, url.PathEscape(subject), "latest"), &result)
if err == nil {
c.latestToSchemaCache.Put(subject, &result)
}
} else {
result = *metadataValue.(*SchemaMetadata)
}
c.latestToSchemaCacheLock.Unlock()
return result, err
}

// GetSchemaMetadata fetches the requested subject schema identified by version
Expand Down Expand Up @@ -687,3 +722,34 @@ func (c *client) UpdateDefaultCompatibility(update Compatibility) (compatibility

return result.CompatibilityUpdate, err
}

type evictor struct {
Interval time.Duration
stop chan bool
}

func (e *evictor) Run(c cache.Cache) {
ticker := time.NewTicker(e.Interval)
for {
select {
case <-ticker.C:
c.Clear()
case <-e.stop:
ticker.Stop()
return
}
}
}

func stopEvictor(c *client) {
c.evictor.stop <- true
}

func runEvictor(c *client, ci time.Duration) {
e := &evictor{
Interval: ci,
stop: make(chan bool),
}
c.evictor = e
go e.Run(c.latestToSchemaCache)
}

0 comments on commit af4a5f8

Please sign in to comment.