Skip to content

Commit

Permalink
chore: force refresh on failed socket connections (#25)
Browse files Browse the repository at this point in the history
This commit moves the ConnectionInfoCache behind an interface and makes
it available through a factory, such that we have a clean way to test
that force refresh is called when a socket connection fails.
  • Loading branch information
enocom authored Jul 7, 2023
1 parent 2e666bf commit 2a08b11
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,132 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.alloydb;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.alloydb.v1beta.InstanceName;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import dev.failsafe.RateLimiter;
import java.security.KeyPair;
import java.security.cert.CertificateException;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ConnectionInfoCache {

private final ScheduledExecutorService executor;
private final ConnectionInfoRepository connectionInfoRepo;
private final InstanceName instanceName;
private final KeyPair clientConnectorKeyPair;
private final RateLimiter<Object> rateLimiter;
private final Object connectionInfoLock = new Object();
private final RefreshCalculator refreshCalculator;

@GuardedBy("connectionInfoLock")
private Future<ConnectionInfo> current;

@GuardedBy("connectionInfoLock")
private Future<ConnectionInfo> next;

ConnectionInfoCache(
ScheduledExecutorService executor,
ConnectionInfoRepository connectionInfoRepo,
InstanceName instanceName,
KeyPair clientConnectorKeyPair,
RefreshCalculator refreshCalculator,
RateLimiter<Object> rateLimiter) {
this.executor = executor;
this.connectionInfoRepo = connectionInfoRepo;
this.instanceName = instanceName;
this.clientConnectorKeyPair = clientConnectorKeyPair;
this.refreshCalculator = refreshCalculator;
this.rateLimiter = rateLimiter;
synchronized (connectionInfoLock) {
// Assign to current and next to avoid null references.
this.current = executor.submit(this::performRefresh);
this.next = this.current;
}
}

/** Returns the most recent connection info. */
ConnectionInfo getConnectionInfo() {
Future<ConnectionInfo> connectionInfoFuture;

synchronized (connectionInfoLock) {
connectionInfoFuture = this.current;
}

try {
return Uninterruptibles.getUninterruptibly(connectionInfoFuture);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

/**
* Retrieves connection info for the instance and schedules a refresh operation for the next
* connection info.
*/
private ConnectionInfo performRefresh()
throws CertificateException, ExecutionException, InterruptedException {
// Rate limit the speed of refresh operations.
this.rateLimiter.acquirePermit();

try {
ConnectionInfo connectionInfo =
this.connectionInfoRepo.getConnectionInfo(this.instanceName, this.clientConnectorKeyPair);

synchronized (connectionInfoLock) {
current = Futures.immediateFuture(connectionInfo);
next =
executor.schedule(
this::performRefresh,
refreshCalculator.calculateSecondsUntilNextRefresh(
Instant.now(), connectionInfo.getClientCertificateExpiration()),
TimeUnit.SECONDS);
}

return connectionInfo;
} catch (CertificateException | ExecutionException | InterruptedException e) {
// For known exceptions, schedule a refresh immediately.
synchronized (connectionInfoLock) {
next = executor.submit(this::performRefresh);
}
throw e;
} catch (RuntimeException e) {
// If the exception is an ApiException, schedule a refresh immediately.
// Otherwise, just throw the exception.
Throwable cause = e.getCause();
if (cause instanceof ApiException) {
synchronized (connectionInfoLock) {
next = executor.submit(this::performRefresh);
}
}
throw e;
}
}
/**
* ConnectionInfoCache is the interface for accessing cached connection info. When connection info
* causes a connection to fail, forceRefresh is available to invalidate the cache and fetch new
* connection info.
*/
interface ConnectionInfoCache {
ConnectionInfo getConnectionInfo();

/**
* Schedules a refresh to start immediately or if a refresh is already scheduled, makes it
* available for getConnectionInfo().
*/
void forceRefresh() {
synchronized (connectionInfoLock) {
// If a scheduled refresh hasn't started, perform one immediately.
next.cancel(false);
if (next.isCancelled()) {
current = executor.submit(this::performRefresh);
next = current;
} else {
// Otherwise it's already running, so just move next to current.
current = next;
}
}
}
void forceRefresh();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.alloydb;

import com.google.cloud.alloydb.v1beta.InstanceName;
import dev.failsafe.RateLimiter;
import java.security.KeyPair;
import java.util.concurrent.ScheduledExecutorService;

interface ConnectionInfoCacheFactory {

ConnectionInfoCache create(
ScheduledExecutorService executor,
ConnectionInfoRepository connectionInfoRepo,
InstanceName instanceName,
KeyPair clientConnectorKeyPair,
RefreshCalculator refreshCalculator,
RateLimiter<Object> rateLimiter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,18 @@ class Connector {
private final ScheduledExecutorService executor;
private final ConnectionInfoRepository connectionInfoRepo;
private final KeyPair clientConnectorKeyPair;
private final ConnectionInfoCacheFactory connectionInfoCacheFactory;
private final ConcurrentHashMap<InstanceName, ConnectionInfoCache> instances;

Connector(
ScheduledExecutorService executor,
ConnectionInfoRepository connectionInfoRepo,
KeyPair clientConnectorKeyPair) {
KeyPair clientConnectorKeyPair,
ConnectionInfoCacheFactory connectionInfoCacheFactory) {
this.executor = executor;
this.connectionInfoRepo = connectionInfoRepo;
this.clientConnectorKeyPair = clientConnectorKeyPair;
this.connectionInfoCacheFactory = connectionInfoCacheFactory;
this.instances = new ConcurrentHashMap<>();
}

Expand All @@ -79,7 +82,7 @@ Socket connect(InstanceName instanceName) throws IOException {
k -> {
RateLimiter<Object> rateLimiter =
RateLimiter.burstyBuilder(RATE_LIMIT_BURST_SIZE, RATE_LIMIT_DURATION).build();
return new ConnectionInfoCache(
return connectionInfoCacheFactory.create(
this.executor,
this.connectionInfoRepo,
instanceName,
Expand Down Expand Up @@ -107,8 +110,12 @@ Socket connect(InstanceName instanceName) throws IOException {
socket.connect(new InetSocketAddress(connectionInfo.getIpAddress(), SERVER_SIDE_PROXY_PORT));
socket.startHandshake();
return socket;
} catch (Exception e) {
// TODO: force refresh connection info when handshake fails.
} catch (IOException e) {
connectionInfoCache.forceRefresh();
// The Socket methods above will throw an IOException or a SocketException (subclass of
// IOException). Catch that exception, trigger a refresh, and then throw it again so
// the caller sees the problem, but the connector will have a refreshed certificate on the
// next invocation.
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.alloydb;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.alloydb.v1beta.InstanceName;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import dev.failsafe.RateLimiter;
import java.security.KeyPair;
import java.security.cert.CertificateException;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* DefaultConnectionInfoCache is the cache used by default to hold connection info. In testing, this
* class may be replaced with alternative implementations of ConnectionInfoCache.
*/
class DefaultConnectionInfoCache implements ConnectionInfoCache {

private final ScheduledExecutorService executor;
private final ConnectionInfoRepository connectionInfoRepo;
private final InstanceName instanceName;
private final KeyPair clientConnectorKeyPair;
private final RateLimiter<Object> rateLimiter;
private final Object connectionInfoLock = new Object();
private final RefreshCalculator refreshCalculator;

@GuardedBy("connectionInfoLock")
private Future<ConnectionInfo> current;

@GuardedBy("connectionInfoLock")
private Future<ConnectionInfo> next;

DefaultConnectionInfoCache(
ScheduledExecutorService executor,
ConnectionInfoRepository connectionInfoRepo,
InstanceName instanceName,
KeyPair clientConnectorKeyPair,
RefreshCalculator refreshCalculator,
RateLimiter<Object> rateLimiter) {
this.executor = executor;
this.connectionInfoRepo = connectionInfoRepo;
this.instanceName = instanceName;
this.clientConnectorKeyPair = clientConnectorKeyPair;
this.refreshCalculator = refreshCalculator;
this.rateLimiter = rateLimiter;
synchronized (connectionInfoLock) {
// Assign to current and next to avoid null references.
this.current = executor.submit(this::performRefresh);
this.next = this.current;
}
}

/** Returns the most recent connection info. */
public ConnectionInfo getConnectionInfo() {
Future<ConnectionInfo> connectionInfoFuture;

synchronized (connectionInfoLock) {
connectionInfoFuture = this.current;
}

try {
return Uninterruptibles.getUninterruptibly(connectionInfoFuture);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

/**
* Retrieves connection info for the instance and schedules a refresh operation for the next
* connection info.
*/
private ConnectionInfo performRefresh()
throws CertificateException, ExecutionException, InterruptedException {
// Rate limit the speed of refresh operations.
this.rateLimiter.acquirePermit();

try {
ConnectionInfo connectionInfo =
this.connectionInfoRepo.getConnectionInfo(this.instanceName, this.clientConnectorKeyPair);

synchronized (connectionInfoLock) {
current = Futures.immediateFuture(connectionInfo);
next =
executor.schedule(
this::performRefresh,
refreshCalculator.calculateSecondsUntilNextRefresh(
Instant.now(), connectionInfo.getClientCertificateExpiration()),
TimeUnit.SECONDS);
}

return connectionInfo;
} catch (CertificateException | ExecutionException | InterruptedException e) {
// For known exceptions, schedule a refresh immediately.
synchronized (connectionInfoLock) {
next = executor.submit(this::performRefresh);
}
throw e;
} catch (RuntimeException e) {
// If the exception is an ApiException, schedule a refresh immediately
// before re-throwing the exception.
Throwable cause = e.getCause();
if (cause instanceof ApiException) {
synchronized (connectionInfoLock) {
next = executor.submit(this::performRefresh);
}
}
throw e;
}
}

/**
* Schedules a refresh to start immediately or if a refresh is already scheduled, makes it
* available for getConnectionInfo().
*/
public void forceRefresh() {
synchronized (connectionInfoLock) {
// If a scheduled refresh hasn't started, perform one immediately.
next.cancel(false);
if (next.isCancelled()) {
current = executor.submit(this::performRefresh);
next = current;
} else {
// Otherwise it's already running, so just move next to current.
current = next;
}
}
}
}
Loading

0 comments on commit 2a08b11

Please sign in to comment.