refactor the cache.Cache interface
Since all the proxy backends use the same copy+pasted logic to use
the disk cache, refactor to put that functionality in the disk cache.
This simplifies the proxy backends.

Fixes buchgr#161.
mostynb committed Jan 16, 2020
1 parent 4472079 commit 369ddc3
Showing 17 changed files with 331 additions and 464 deletions.
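To illustrate the refactored API from a caller's point of view, here is a minimal sketch (not code from this commit; the directory, size, hash, and nil proxy are illustrative placeholders, and the cache/disk import path is assumed):

package main

import (
	"fmt"
	"log"

	"github.com/buchgr/bazel-remote/cache"
	"github.com/buchgr/bazel-remote/cache/disk"
)

func main() {
	// disk.New now takes an optional cache.CacheProxy as its third argument;
	// pass nil to run without a proxy backend. Path and size are placeholders.
	diskCache := disk.New("/tmp/bazel-remote", 10*1024*1024*1024, nil)

	// On a local miss, Get consults the proxy (when one is configured) and
	// stores the fetched blob locally, so callers no longer duplicate that logic.
	rdr, sizeBytes, err := diskCache.Get(cache.CAS,
		"0000000000000000000000000000000000000000000000000000000000000000")
	if err != nil {
		log.Fatal(err)
	}
	if rdr == nil {
		fmt.Println("cache miss")
		return
	}
	defer rdr.Close()
	fmt.Println("cache hit, size:", sizeBytes)
}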
4 changes: 0 additions & 4 deletions cache/BUILD.bazel
@@ -5,8 +5,4 @@ go_library(
srcs = ["cache.go"],
importpath = "github.com/buchgr/bazel-remote/cache",
visibility = ["//visibility:public"],
deps = [
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
],
)
92 changes: 12 additions & 80 deletions cache/cache.go
@@ -1,11 +1,8 @@
package cache

import (
"errors"
"io"
"io/ioutil"

pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/proto"
)

// EntryKind describes the kind of cache entry
@@ -45,85 +42,20 @@ type Error struct {
Text string
}

var ErrNotFound = errors.New("Item not found")

func (e *Error) Error() string {
return e.Text
}

// Cache is the interface for a generic blob storage backend. Implementers should handle
// locking internally.
type Cache interface {

// Put stores a stream of `size` bytes from `rdr` into the cache. If `hash` is
// not the empty string, and the contents don't match it, a non-nil error is
// returned.
Put(kind EntryKind, hash string, size int64, rdr io.Reader) error

// Get returns an io.ReadCloser with the content of the cache item stored under `hash`
// and the number of bytes that can be read from it. If the item is not found, `rdr` is
// nil. If some error occurred when processing the request, then it is returned.
Get(kind EntryKind, hash string) (rdr io.ReadCloser, sizeBytes int64, err error)

// Contains returns true if the `hash` key exists in the cache.
Contains(kind EntryKind, hash string) (ok bool)

// MaxSize returns the maximum cache size in bytes.
MaxSize() int64

// Return the current size of the cache in bytes, and the number of
// items stored in the cache.
Stats() (int64, int)
}

// If `hash` refers to a valid ActionResult with all the dependencies
// available in the CAS, return it and its serialized value.
// If not, return nil values.
// If something unexpected went wrong, return an error.
func GetValidatedActionResult(c Cache, hash string) (*pb.ActionResult, []byte, error) {
rdr, sizeBytes, err := c.Get(AC, hash)
if err != nil {
return nil, nil, err
}

if rdr == nil || sizeBytes <= 0 {
return nil, nil, nil // aka "not found"
}

data, err := ioutil.ReadAll(rdr)
if err != nil {
return nil, nil, err
}

result := &pb.ActionResult{}
err = proto.Unmarshal(data, result)
if err != nil {
return nil, nil, err
}

for _, f := range result.OutputFiles {
if len(f.Contents) == 0 && f.Digest.SizeBytes > 0 {
if !c.Contains(CAS, f.Digest.Hash) {
return nil, nil, nil // aka "not found"
}
}
}

for _, d := range result.OutputDirectories {
if !c.Contains(CAS, d.TreeDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}

if result.StdoutDigest != nil && result.StdoutDigest.SizeBytes > 0 {
if !c.Contains(CAS, result.StdoutDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}

if result.StderrDigest != nil && result.StderrDigest.SizeBytes > 0 {
if !c.Contains(CAS, result.StderrDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}
return result, data, nil
}

// Cache backends implement this interface, and are optionally used by DiskCache.
type CacheProxy interface {
// Put should make a reasonable effort to proxy this data to the backend.
// This is allowed to fail silently (eg when under heavy load).
Put(kind EntryKind, hash string, size int64, rdr io.Reader)

// Get should return the cache item identified by `hash`, or an error
// if something went wrong. If the item was not found, the error will
// be ErrNotFound.
Get(kind EntryKind, hash string) ([]byte, error)
}
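A proxy backend now only has to implement these two methods. The following is a hedged sketch of a trivial in-memory implementation, purely for illustration — it is not part of this commit, and it assumes it sits in the cache package so that EntryKind, its String() method, and ErrNotFound are in scope:

package cache

import (
	"io"
	"io/ioutil"
	"sync"
)

// inMemoryProxy is a hypothetical CacheProxy used only for illustration;
// the real backends in this repository proxy to remote storage.
type inMemoryProxy struct {
	mu    sync.Mutex
	blobs map[string][]byte
}

func newInMemoryProxy() *inMemoryProxy {
	return &inMemoryProxy{blobs: make(map[string][]byte)}
}

// Put makes a best-effort attempt to store the blob; errors are swallowed,
// matching the interface contract.
func (p *inMemoryProxy) Put(kind EntryKind, hash string, size int64, rdr io.Reader) {
	data, err := ioutil.ReadAll(rdr)
	if err != nil || int64(len(data)) != size {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.blobs[kind.String()+"/"+hash] = data
}

// Get returns the stored blob, or ErrNotFound if it was never stored.
func (p *inMemoryProxy) Get(kind EntryKind, hash string) ([]byte, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	data, ok := p.blobs[kind.String()+"/"+hash]
	if !ok {
		return nil, ErrNotFound
	}
	return data, nil
}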
2 changes: 2 additions & 0 deletions cache/disk/BUILD.bazel
@@ -10,7 +10,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//cache:go_default_library",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
"@com_github_djherbis_atime//:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
],
123 changes: 107 additions & 16 deletions cache/disk/disk.go
@@ -1,6 +1,7 @@
package disk

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
@@ -17,6 +18,9 @@ import (
"github.com/djherbis/atime"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/proto"
)

var (
Expand All @@ -41,16 +45,18 @@ func (i *lruItem) Size() int64 {
return i.size
}

// diskCache is an implementation of the cache backed by files on a filesystem.
type diskCache struct {
dir string
// DiskCache is a filesystem-based cache, with an optional backend proxy.
type DiskCache struct {
dir string
proxy cache.CacheProxy

mu *sync.Mutex
lru SizedLRU
}

// New returns a new instance of a filesystem-based cache rooted at `dir`,
// with a maximum size of `maxSizeBytes` bytes.
func New(dir string, maxSizeBytes int64) cache.Cache {
// with a maximum size of `maxSizeBytes` bytes and an optional backend `proxy`.
func New(dir string, maxSizeBytes int64, proxy cache.CacheProxy) *DiskCache {
// Create the directory structure.
hexLetters := []byte("0123456789abcdef")
for _, c1 := range hexLetters {
@@ -82,10 +88,11 @@ func New(dir string, maxSizeBytes int64) cache.Cache {
}
}

cache := &diskCache{
dir: filepath.Clean(dir),
mu: &sync.Mutex{},
lru: NewSizedLRU(maxSizeBytes, onEvict),
cache := &DiskCache{
dir: filepath.Clean(dir),
proxy: proxy,
mu: &sync.Mutex{},
lru: NewSizedLRU(maxSizeBytes, onEvict),
}

err := cache.migrateDirectories()
@@ -101,7 +108,7 @@ func New(dir string, maxSizeBytes int64) cache.Cache {
return cache
}

func (c *diskCache) migrateDirectories() error {
func (c *DiskCache) migrateDirectories() error {
err := migrateDirectory(filepath.Join(c.dir, cache.AC.String()))
if err != nil {
return err
@@ -132,7 +139,7 @@ func migrateDirectory(dir string) error {
// loadExistingFiles lists all files in the cache directory, and adds them to the
// LRU index so that they can be served. Files are sorted by access time first,
// so that the eviction behavior is preserved across server restarts.
func (c *diskCache) loadExistingFiles() error {
func (c *DiskCache) loadExistingFiles() error {
log.Printf("Loading existing files in %s.\n", c.dir)

// Walk the directory tree
@@ -170,7 +177,10 @@ func (c *diskCache) loadExistingFiles() error {
return nil
}

func (c *diskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r io.Reader) error {
// Put stores a stream of `expectedSize` bytes from `r` into the cache. If `hash` is
// not the empty string, and the contents don't match it, a non-nil error is
// returned.
func (c *DiskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r io.Reader) error {
c.mu.Lock()

key := cacheKey(kind, hash)
@@ -270,12 +280,32 @@ func (c *diskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r
shouldCommit = true
}

if c.proxy != nil {
// TODO: buffer in memory, avoid a filesystem round-trip!
fr, err := os.Open(filePath)
if err == nil {
c.proxy.Put(kind, hash, expectedSize, fr)
}
}

return err
}

func (c *diskCache) Get(kind cache.EntryKind, hash string) (rdr io.ReadCloser, sizeBytes int64, err error) {
// Get returns an io.ReadCloser with the content of the cache item stored under `hash`
// and the number of bytes that can be read from it. If the item is not found, `rdr` is
// nil. If some error occurred when processing the request, then it is returned.
func (c *DiskCache) Get(kind cache.EntryKind, hash string) (rdr io.ReadCloser, sizeBytes int64, err error) {
if !c.Contains(kind, hash) {
cacheMisses.Inc()

if c.proxy != nil {
data, err := c.proxy.Get(kind, hash)
if err == nil {
err = c.Put(kind, hash, int64(len(data)), bytes.NewReader(data))
return ioutil.NopCloser(bytes.NewReader(data)), int64(len(data)), nil
}
}

return nil, 0, nil
}

@@ -298,21 +328,28 @@ func (c *diskCache) Get(kind cache.EntryKind, hash string) (rdr io.ReadCloser, s
return rdr, sizeBytes, nil
}

func (c *diskCache) Contains(kind cache.EntryKind, hash string) (ok bool) {
// Contains returns true if the `hash` key exists in the cache.
func (c *DiskCache) Contains(kind cache.EntryKind, hash string) (ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

val, found := c.lru.Get(cacheKey(kind, hash))
// Uncommitted (i.e. still uploading) items should be reported as not ok
return found && val.(*lruItem).committed

// TODO: consider querying the proxy (if there is one)?
// The proxy backends have not attempted to do this so far.
}

func (c *diskCache) MaxSize() int64 {
// MaxSize returns the maximum cache size in bytes.
func (c *DiskCache) MaxSize() int64 {
// The underlying value is never modified, no need to lock.
return c.lru.MaxSize()
}

func (c *diskCache) Stats() (currentSize int64, numItems int) {
// Stats returns the current size of the cache in bytes, and the number of
// items stored in the cache.
func (c *DiskCache) Stats() (currentSize int64, numItems int) {
c.mu.Lock()
defer c.mu.Unlock()

@@ -335,3 +372,57 @@ func cacheKey(kind cache.EntryKind, hash string) string {
func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string {
return filepath.Join(cacheDir, cacheKey(kind, hash))
}

// If `hash` refers to a valid ActionResult with all the dependencies
// available in the CAS, return it and its serialized value.
// If not, return nil values.
// If something unexpected went wrong, return an error.
func (c *DiskCache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, error) {
rdr, sizeBytes, err := c.Get(cache.AC, hash)
if err != nil {
return nil, nil, err
}

if rdr == nil || sizeBytes <= 0 {
return nil, nil, nil // aka "not found"
}

data, err := ioutil.ReadAll(rdr)
if err != nil {
return nil, nil, err
}

result := &pb.ActionResult{}
err = proto.Unmarshal(data, result)
if err != nil {
return nil, nil, err
}

for _, f := range result.OutputFiles {
if len(f.Contents) == 0 && f.Digest.SizeBytes > 0 {
if !c.Contains(cache.CAS, f.Digest.Hash) {
return nil, nil, nil // aka "not found"
}
}
}

for _, d := range result.OutputDirectories {
if !c.Contains(cache.CAS, d.TreeDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}

if result.StdoutDigest != nil && result.StdoutDigest.SizeBytes > 0 {
if !c.Contains(cache.CAS, result.StdoutDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}

if result.StderrDigest != nil && result.StderrDigest.SizeBytes > 0 {
if !c.Contains(cache.CAS, result.StderrDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}

return result, data, nil
}
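With validation now a method on DiskCache, callers invoke it directly instead of the old package-level cache.GetValidatedActionResult(c, hash) helper. A hedged sketch of how an ActionCache request handler might use it (the handler shape, package name, and function name are illustrative, not from this commit):

package server // hypothetical location for this sketch

import (
	"net/http"

	"github.com/buchgr/bazel-remote/cache/disk"
)

// serveActionResult is a hypothetical handler fragment showing the new call site.
func serveActionResult(w http.ResponseWriter, c *disk.DiskCache, hash string) {
	result, data, err := c.GetValidatedActionResult(hash)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if result == nil {
		// Either the ActionResult is missing, or one of the CAS blobs it
		// references is; both count as a cache miss.
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	// data holds the already-serialized ActionResult, ready to send back.
	w.Header().Set("Content-Type", "application/octet-stream")
	w.Write(data)
}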