Skip to content

Commit

Permalink
implement cache server
Browse files Browse the repository at this point in the history
  • Loading branch information
BenTheElder committed Feb 10, 2018
1 parent fb8f5ec commit 8b52fdb
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 0 deletions.
1 change: 1 addition & 0 deletions experiment/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ filegroup(
"//experiment/cherrypicker:all-srcs",
"//experiment/coverage:all-srcs",
"//experiment/manual-trigger:all-srcs",
"//experiment/nursery:all-srcs",
"//experiment/refresh:all-srcs",
"//experiment/tracer:all-srcs",
],
Expand Down
35 changes: 35 additions & 0 deletions experiment/nursery/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "k8s.io/test-infra/experiment/nursery",
visibility = ["//visibility:private"],
deps = [
"//experiment/nursery/diskcache:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
],
)

go_binary(
name = "nursery",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//experiment/nursery/diskcache:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
23 changes: 23 additions & 0 deletions experiment/nursery/diskcache/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["cache.go"],
importpath = "k8s.io/test-infra/experiment/nursery/diskcache",
visibility = ["//visibility:public"],
deps = ["//vendor/github.com/sirupsen/logrus:go_default_library"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
131 changes: 131 additions & 0 deletions experiment/nursery/diskcache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// cache implements cache storage (currently disk backed) for use in nursery
package diskcache

import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

log "github.com/sirupsen/logrus"
)

// Key is a key in the cache, must be filesystem / URL safe
type Key = string

// ReadHandler should be implemeted by cache users for use with Cache.Get
type ReadHandler func(exists bool, contents io.ReadSeeker) error

// Cache implements disk backed cache storage
type Cache struct {
diskRoot string
}

func NewCache(diskRoot string) *Cache {
return &Cache{
diskRoot,
}
}

func (c *Cache) keyToPath(key Key) string {
return filepath.Join(c.diskRoot, key)
}

// DiskRoot returns the root directory containing all on-disk cache entries
func (c *Cache) DiskRoot() string {
return c.diskRoot
}

func exists(path string) bool {
_, err := os.Stat(path)
return !os.IsNotExist(err)
}

func ensureDir(dir string) {
if exists(dir) {
return
}
log.WithError(os.MkdirAll(dir, os.FileMode(0744))).Infof("MkDirAll(%s)", dir)
}

// Put copies the content reader until the end into the cache at key
// if contentSHA256 is not "" then the contents will only be stored in the
// cache if the content's hex string SHA256 matches
func (c *Cache) Put(key Key, content io.Reader, contentSHA256 string) error {
// make sure directory exists
path := c.keyToPath(key)
dir := filepath.Dir(path)
ensureDir(dir)
// create a temp file to get the content on disk
temp, err := ioutil.TempFile(dir, "temp-put")
if err != nil {
return fmt.Errorf("failed to create cache entry: %v", err)
}
// fast path copying when not hashing content
if contentSHA256 == "" {
io.Copy(temp, content)
if err != nil {
os.Remove(temp.Name())
return fmt.Errorf("failed to copy into cache entry: %v", err)
}
} else {
hasher := sha256.New()
_, err = io.Copy(io.MultiWriter(temp, hasher), content)
if err != nil {
os.Remove(temp.Name())
return fmt.Errorf("failed to copy into cache entry: %v", err)
}
actualContentSHA256 := hex.EncodeToString(hasher.Sum(nil))
if actualContentSHA256 != contentSHA256 {
os.Remove(temp.Name())
return fmt.Errorf(
"hashes did not match for '%s', given: '%s' actual: '%s",
key, contentSHA256, actualContentSHA256)
}
}
// move the content to the key location
err = temp.Sync()
if err != nil {
os.Remove(temp.Name())
return fmt.Errorf("failed to sync cache entry: %v", err)
}
temp.Close()
err = os.Rename(temp.Name(), path)
if err != nil {
os.Remove(temp.Name())
return fmt.Errorf("failed to insert contents into cache: %v", err)
}
return nil
}

// Get provides your readHandler with the contents at key
func (c *Cache) Get(key Key, readHandler ReadHandler) error {
path := c.keyToPath(key)
if !exists(path) {
return readHandler(false, nil)
}
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to get key: %v", err)
}
return readHandler(true, f)
}
119 changes: 119 additions & 0 deletions experiment/nursery/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// nursery implements a bazel remote cache service [1]
// supporting arbitrarily many workspaces stored within the same
// top level directory
// the first path segment in eqach {PUT,GET} request is mapped to an individual
// workspace cache, the remaining segments should follow [2].
//
// [1] https://docs.bazel.build/versions/master/remote-caching.html
// [2] https://docs.bazel.build/versions/master/remote-caching.html#http-caching-protocol
package main

import (
"errors"
"flag"
"io"
"net/http"
"os"
"strings"
"time"

"k8s.io/test-infra/experiment/nursery/diskcache"

log "github.com/sirupsen/logrus"
)

var dir = flag.String("dir", "", "location to store cache entries on disk")
var host = flag.String("host", "", "host address to listen on")
var port = flag.Int("port", 8080, "port to listen on")

func init() {
log.SetFormatter(&log.TextFormatter{})
log.SetOutput(os.Stdout)
}

func main() {
// TODO(bentheelder): bound cache size / convert to LRU
// TODO(bentheelder): improve logging
flag.Parse()
if *dir == "" {
log.Fatal("--dir must be set!")
}
cache := diskcache.NewCache(*dir)
http.Handle("/", cacheHandler(cache))
log.WithError(http.ListenAndServe(":8080", nil)).Fatal("ListenAndServe returned.")
}

var errNotFound = errors.New("entry not found")

func cacheHandler(cache *diskcache.Cache) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// parse and validate path
// the last segment should be a hash, and the
// the second to last segment should be "ac" or "cas"
parts := strings.Split(r.URL.Path, "/")
if len(parts) < 3 {
log.Warn("received an invalid request at path: %v", r.URL.Path)
http.Error(w, "invalid location", http.StatusBadRequest)
return
}
hash := parts[len(parts)-1]
acOrCAS := parts[len(parts)-2]
if acOrCAS != "ac" && acOrCAS != "cas" {
log.Warn("received an invalid request at path: %v", r.URL.Path)
http.Error(w, "invalid location", http.StatusBadRequest)
return
}
// actually handle request depending on method
switch m := r.Method; m {
// handle retreival
case http.MethodGet:
err := cache.Get(r.URL.Path, func(exists bool, contents io.ReadSeeker) error {
if !exists {
return errNotFound
}
http.ServeContent(w, r, "", time.Time{}, contents)
return nil
})
if err != nil {
// file not present
if err == errNotFound {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
// unkown error
log.WithError(err).Error("error getting key")
http.Error(w, err.Error(), http.StatusNotFound)
return
}
// handle upload
case http.MethodPut:
// only hash CAS
if acOrCAS != "cas" {
hash = ""
}
err := cache.Put(r.URL.Path, r.Body, hash)
if err != nil {
log.WithError(err).Errorf("Failed to put: %v", r.URL.Path)
}
default:
log.Warn("received an invalid request method: %v", r.Method)
http.Error(w, "unsupported method", http.StatusBadRequest)
}
})
}

0 comments on commit 8b52fdb

Please sign in to comment.