Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist proxies from config files #4407

Merged
merged 3 commits into from
Jul 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 115 additions & 48 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ type Agent struct {

// proxyManager is the proxy process manager for managed Connect proxies.
proxyManager *proxy.Manager

// proxyLock protects proxy information in the local state from concurrent modification
proxyLock sync.Mutex
}

func New(c *config.RuntimeConfig) (*Agent, error) {
Expand Down Expand Up @@ -1616,16 +1619,21 @@ func (a *Agent) purgeService(serviceID string) error {
type persistedProxy struct {
ProxyToken string
Proxy *structs.ConnectManagedProxy

// Set to true when the proxy information originated from the agents configuration
// as opposed to API registration.
FromFile bool
}

// persistProxy saves a proxy definition to a JSON file in the data dir
func (a *Agent) persistProxy(proxy *local.ManagedProxy) error {
func (a *Agent) persistProxy(proxy *local.ManagedProxy, FromFile bool) error {
proxyPath := filepath.Join(a.config.DataDir, proxyDir,
stringHash(proxy.Proxy.ProxyService.ID))

wrapped := persistedProxy{
ProxyToken: proxy.ProxyToken,
Proxy: proxy.Proxy,
FromFile: FromFile,
}
encoded, err := json.Marshal(wrapped)
if err != nil {
Expand Down Expand Up @@ -1795,7 +1803,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
}
}

log.Printf("[DEBUG] agent: removed service %q", serviceID)
a.logger.Printf("[DEBUG] agent: removed service %q", serviceID)
return nil
}

Expand Down Expand Up @@ -2076,7 +2084,9 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
return nil
}

// AddProxy adds a new local Connect Proxy instance to be managed by the agent.
// addProxyLocked adds a new local Connect Proxy instance to be managed by the agent.
//
// This assumes that the agent's proxyLock is already held
//
// It REQUIRES that the service that is being proxied is already present in the
// local state. Note that this is only used for agent-managed proxies so we can
Expand All @@ -2091,7 +2101,7 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
// definitions from disk; new proxies must leave it blank to get a new token
// assigned. We need to restore from disk to enable to continue authenticating
// running proxies that already had that credential injected.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
func (a *Agent) addProxyLocked(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
restoredProxyToken string) error {
// Lookup the target service token in state if there is one.
token := a.State.ServiceToken(proxy.TargetServiceID)
Expand Down Expand Up @@ -2143,11 +2153,33 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,

// Persist the proxy
if persist && a.config.DataDir != "" {
return a.persistProxy(proxyState)
return a.persistProxy(proxyState, FromFile)
}
return nil
}

// addProxyLocked adds a new local Connect Proxy instance to be managed by the agent.
//
// It REQUIRES that the service that is being proxied is already present in the
// local state. Note that this is only used for agent-managed proxies so we can
// ensure that we always make this true. For externally managed and registered
// proxies we explicitly allow the proxy to be registered first to make
// bootstrap ordering of a new service simpler but the same is not true here
// since this is only ever called when setting up a _managed_ proxy which was
// registered as part of a service registration either from config or HTTP API
// call.
//
// The restoredProxyToken argument should only be used when restoring proxy
// definitions from disk; new proxies must leave it blank to get a new token
// assigned. We need to restore from disk to enable to continue authenticating
// running proxies that already had that credential injected.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist, FromFile bool,
restoredProxyToken string) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
return a.addProxyLocked(proxy, persist, FromFile, restoredProxyToken)
}

// resolveProxyCheckAddress returns the best address to use for a TCP check of
// the proxy's public listener. It expects the input to already have default
// values populated by applyProxyConfigDefaults. It may return an empty string
Expand Down Expand Up @@ -2290,8 +2322,10 @@ func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error {
return nil
}

// RemoveProxy stops and removes a local proxy instance.
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
// removeProxyLocked stops and removes a local proxy instance.
//
// It is assumed that this function is called while holding the proxyLock already
func (a *Agent) removeProxyLocked(proxyID string, persist bool) error {
// Validate proxyID
if proxyID == "" {
return fmt.Errorf("proxyID missing")
Expand All @@ -2316,6 +2350,13 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
return nil
}

// RemoveProxy stops and removes a local proxy instance.
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
return a.removeProxyLocked(proxyID, persist)
}

// verifyProxyToken takes a token and attempts to verify it against the
// targetService name. If targetProxy is specified, then the local proxy token
// must exactly match the given proxy ID. cert, config, etc.).
Expand Down Expand Up @@ -2782,33 +2823,21 @@ func (a *Agent) unloadChecks() error {
return nil
}

// loadProxies will load connect proxy definitions from configuration and
// persisted definitions on disk, and load them into the local agent.
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
for _, svc := range conf.Services {
if svc.Connect != nil {
proxy, err := svc.ConnectManagedProxy()
if err != nil {
return fmt.Errorf("failed adding proxy: %s", err)
}
if proxy == nil {
continue
}
if err := a.AddProxy(proxy, false, ""); err != nil {
return fmt.Errorf("failed adding proxy: %s", err)
}
}
}
// loadPersistedProxies will load connect proxy definitions from their
// persisted state on disk and return a slice of them
//
// This does not add them to the local
func (a *Agent) loadPersistedProxies() (map[string]persistedProxy, error) {
persistedProxies := make(map[string]persistedProxy)

// Load any persisted proxies
proxyDir := filepath.Join(a.config.DataDir, proxyDir)
files, err := ioutil.ReadDir(proxyDir)
if err != nil {
if os.IsNotExist(err) {
return nil
if !os.IsNotExist(err) {
return nil, fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err)
}
return fmt.Errorf("Failed reading proxies dir %q: %s", proxyDir, err)
}

for _, fi := range files {
// Skip all dirs
if fi.IsDir() {
Expand All @@ -2817,55 +2846,93 @@ func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {

// Skip all partially written temporary files
if strings.HasSuffix(fi.Name(), "tmp") {
a.logger.Printf("[WARN] agent: Ignoring temporary proxy file %v", fi.Name())
continue
return nil, fmt.Errorf("Ignoring temporary proxy file %v", fi.Name())
}

// Open the file for reading
file := filepath.Join(proxyDir, fi.Name())
fh, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed opening proxy file %q: %s", file, err)
return nil, fmt.Errorf("failed opening proxy file %q: %s", file, err)
}

// Read the contents into a buffer
buf, err := ioutil.ReadAll(fh)
fh.Close()
if err != nil {
return fmt.Errorf("failed reading proxy file %q: %s", file, err)
return nil, fmt.Errorf("failed reading proxy file %q: %s", file, err)
}

// Try decoding the proxy definition
var p persistedProxy
if err := json.Unmarshal(buf, &p); err != nil {
a.logger.Printf("[ERR] agent: Failed decoding proxy file %q: %s", file, err)
continue
return nil, fmt.Errorf("Failed decoding proxy file %q: %s", file, err)
}
proxyID := p.Proxy.ProxyService.ID
svcID := p.Proxy.TargetServiceID

persistedProxies[svcID] = p
}

return persistedProxies, nil
}

// loadProxies will load connect proxy definitions from configuration and
// persisted definitions on disk, and load them into the local agent.
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()

persistedProxies, persistenceErr := a.loadPersistedProxies()

if a.State.Proxy(proxyID) != nil {
// Purge previously persisted proxy. This allows config to be preferred
// over services persisted from the API.
a.logger.Printf("[DEBUG] agent: proxy %q exists, not restoring from %q",
proxyID, file)
for _, svc := range conf.Services {
if svc.Connect != nil {
proxy, err := svc.ConnectManagedProxy()
if err != nil {
return fmt.Errorf("failed adding proxy: %s", err)
}
if proxy == nil {
continue
}
restoredToken := ""
if persisted, ok := persistedProxies[proxy.TargetServiceID]; ok {
restoredToken = persisted.ProxyToken
}

if err := a.addProxyLocked(proxy, true, true, restoredToken); err != nil {
return fmt.Errorf("failed adding proxy: %s", err)
}
}
}

for _, persisted := range persistedProxies {
proxyID := persisted.Proxy.ProxyService.ID
if persisted.FromFile && a.State.Proxy(proxyID) == nil {
// Purge proxies that were configured previously but are no longer in the config
a.logger.Printf("[DEBUG] agent: purging stale persisted proxy %q", proxyID)
if err := a.purgeProxy(proxyID); err != nil {
return fmt.Errorf("failed purging proxy %q: %s", proxyID, err)
return fmt.Errorf("failed purging proxy %q: %v", proxyID, err)
}
} else {
a.logger.Printf("[DEBUG] agent: restored proxy definition %q from %q",
proxyID, file)
if err := a.AddProxy(p.Proxy, false, p.ProxyToken); err != nil {
return fmt.Errorf("failed adding proxy %q: %s", proxyID, err)
} else if !persisted.FromFile {
if a.State.Proxy(proxyID) == nil {
a.logger.Printf("[DEBUG] agent: restored proxy definition %q", proxyID)
if err := a.addProxyLocked(persisted.Proxy, false, false, persisted.ProxyToken); err != nil {
return fmt.Errorf("failed adding proxy %q: %v", proxyID, err)
}
} else {
a.logger.Printf("[WARN] agent: proxy definition %q was overwritten by a proxy definition within a config file", proxyID)
}
}
}
return nil

return persistenceErr
}

// unloadProxies will deregister all proxies known to the local agent.
func (a *Agent) unloadProxies() error {
a.proxyLock.Lock()
defer a.proxyLock.Unlock()
for id := range a.State.Proxies() {
if err := a.RemoveProxy(id, false); err != nil {
if err := a.removeProxyLocked(id, false); err != nil {
return fmt.Errorf("Failed deregistering proxy '%s': %s", id, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
}
// Add proxy (which will add proxy service so do it before we trigger sync)
if proxy != nil {
if err := s.agent.AddProxy(proxy, true, ""); err != nil {
if err := s.agent.AddProxy(proxy, true, false, ""); err != nil {
return nil, err
}
}
Expand Down
Loading