Skip to content

Commit

Permalink
chore: add socket factory (#26)
Browse files Browse the repository at this point in the history
This commit adds the SocketFactory wrapper with a ConnectorRegistry
singleton. The ConnectorRegistry is a singleton to make it possible for
external callers to close the associated connector.

In addition, the singleton assures that multiple socket factory
initializations will all re-use the same connector internally.
  • Loading branch information
enocom authored Jul 11, 2023
1 parent 2a08b11 commit 59e19bd
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .envrc.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export ALLOYDB_DB="some-db"
export ALLOYDB_USER="some-user"
export ALLOYDB_PASS="some-password"
export ALLOYDB_INSTANCE_URI="projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>"
export ALLOYDB_INSTANCE_NAME="projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>"
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,15 @@ jobs:
uses: google-github-actions/get-secretmanager-secrets@7fced8b6579c75d7c465165b38ec29175d9a469c # v1.0.0
with:
secrets: |-
ALLOYDB_INSTANCE_URI:${{ secrets.GOOGLE_CLOUD_PROJECT }}/ALLOYDB_INSTANCE_URI
ALLOYDB_INSTANCE_NAME:${{ secrets.GOOGLE_CLOUD_PROJECT }}/ALLOYDB_INSTANCE_URI
ALLOYDB_CLUSTER_PASS:${{ secrets.GOOGLE_CLOUD_PROJECT }}/ALLOYDB_CLUSTER_PASS
- name: Run tests
env:
ALLOYDB_DB: 'postgres'
ALLOYDB_USER: 'postgres'
ALLOYDB_PASS: '${{ steps.secrets.outputs.ALLOYDB_CLUSTER_PASS }}'
ALLOYDB_INSTANCE_URI: '${{ steps.secrets.outputs.ALLOYDB_INSTANCE_URI }}'
ALLOYDB_INSTANCE_NAME: '${{ steps.secrets.outputs.ALLOYDB_INSTANCE_NAME }}'
JOB_TYPE: integration
run: .kokoro/build.sh
shell: bash
Expand Down
26 changes: 26 additions & 0 deletions alloydb-jdbc-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredDependencies>
<!-- The postgres driver must be provided by client application and is otherwise unused here. -->
<ignoredDependency>org.postgresql:postgresql:*</ignoredDependency>
</ignoredDependencies>
</configuration>
</plugin>
</plugins>
</build>

Expand Down Expand Up @@ -104,6 +115,13 @@
<version>31.1-jre</version>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
Expand All @@ -119,12 +137,20 @@
<scope>test</scope>
</dependency>

<!-- Using this for the deterministic scheduler class only -->
<dependency>
<groupId>org.jmock</groupId>
<artifactId>jmock</artifactId>
<version>2.12.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>4.0.3</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.alloydb;

import com.google.common.base.Objects;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -68,26 +69,15 @@ public boolean equals(Object o) {
}

ConnectionInfo that = (ConnectionInfo) o;

if (!ipAddress.equals(that.ipAddress)) {
return false;
}
if (!instanceUid.equals(that.instanceUid)) {
return false;
}
if (!clientCertificate.equals(that.clientCertificate)) {
return false;
}
return certificateChain.equals(that.certificateChain);
return Objects.equal(ipAddress, that.ipAddress)
&& Objects.equal(instanceUid, that.instanceUid)
&& Objects.equal(clientCertificate, that.clientCertificate)
&& Objects.equal(certificateChain, that.certificateChain);
}

@Override
public int hashCode() {
int result = ipAddress.hashCode();
result = 31 * result + instanceUid.hashCode();
result = 31 * result + clientCertificate.hashCode();
result = 31 * result + certificateChain.hashCode();
return result;
return Objects.hashCode(ipAddress, instanceUid, clientCertificate, certificateChain);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.alloydb;

import com.google.cloud.alloydb.v1beta.InstanceName;
import com.google.common.base.Objects;
import dev.failsafe.RateLimiter;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -67,57 +68,13 @@ class Connector {
ScheduledExecutorService executor,
ConnectionInfoRepository connectionInfoRepo,
KeyPair clientConnectorKeyPair,
ConnectionInfoCacheFactory connectionInfoCacheFactory) {
ConnectionInfoCacheFactory connectionInfoCacheFactory,
ConcurrentHashMap<InstanceName, ConnectionInfoCache> instances) {
this.executor = executor;
this.connectionInfoRepo = connectionInfoRepo;
this.clientConnectorKeyPair = clientConnectorKeyPair;
this.connectionInfoCacheFactory = connectionInfoCacheFactory;
this.instances = new ConcurrentHashMap<>();
}

Socket connect(InstanceName instanceName) throws IOException {
ConnectionInfoCache connectionInfoCache =
instances.computeIfAbsent(
instanceName,
k -> {
RateLimiter<Object> rateLimiter =
RateLimiter.burstyBuilder(RATE_LIMIT_BURST_SIZE, RATE_LIMIT_DURATION).build();
return connectionInfoCacheFactory.create(
this.executor,
this.connectionInfoRepo,
instanceName,
this.clientConnectorKeyPair,
new RefreshCalculator(),
rateLimiter);
});

ConnectionInfo connectionInfo = connectionInfoCache.getConnectionInfo();

try {
SSLSocket socket =
buildSocket(
connectionInfo.getClientCertificate(),
connectionInfo.getCertificateChain(),
this.clientConnectorKeyPair.getPrivate());

// Use the instance's IP address as a HostName.
SSLParameters sslParameters = socket.getSSLParameters();
sslParameters.setServerNames(
Collections.singletonList(new SNIHostName(connectionInfo.getIpAddress())));

socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
socket.connect(new InetSocketAddress(connectionInfo.getIpAddress(), SERVER_SIDE_PROXY_PORT));
socket.startHandshake();
return socket;
} catch (IOException e) {
connectionInfoCache.forceRefresh();
// The Socket methods above will throw an IOException or a SocketException (subclass of
// IOException). Catch that exception, trigger a refresh, and then throw it again so
// the caller sees the problem, but the connector will have a refreshed certificate on the
// next invocation.
throw e;
}
this.instances = instances;
}

private static SSLSocket buildSocket(
Expand Down Expand Up @@ -182,4 +139,75 @@ private static KeyManager[] initializeKeyManager(
keyManagerFactory.init(clientAuthenticationKeyStore, new char[0] /* no password */);
return keyManagerFactory.getKeyManagers();
}

Socket connect(InstanceName instanceName) throws IOException {
ConnectionInfoCache connectionInfoCache =
instances.computeIfAbsent(
instanceName,
k -> {
RateLimiter<Object> rateLimiter =
RateLimiter.burstyBuilder(RATE_LIMIT_BURST_SIZE, RATE_LIMIT_DURATION).build();
return connectionInfoCacheFactory.create(
this.executor,
this.connectionInfoRepo,
instanceName,
this.clientConnectorKeyPair,
new RefreshCalculator(),
rateLimiter);
});

ConnectionInfo connectionInfo = connectionInfoCache.getConnectionInfo();

try {
SSLSocket socket =
buildSocket(
connectionInfo.getClientCertificate(),
connectionInfo.getCertificateChain(),
this.clientConnectorKeyPair.getPrivate());

// Use the instance's IP address as a HostName.
SSLParameters sslParameters = socket.getSSLParameters();
sslParameters.setServerNames(
Collections.singletonList(new SNIHostName(connectionInfo.getIpAddress())));

socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
socket.connect(new InetSocketAddress(connectionInfo.getIpAddress(), SERVER_SIDE_PROXY_PORT));
socket.startHandshake();
return socket;
} catch (IOException e) {
connectionInfoCache.forceRefresh();
// The Socket methods above will throw an IOException or a SocketException (subclass of
// IOException). Catch that exception, trigger a refresh, and then throw it again so
// the caller sees the problem, but the connector will have a refreshed certificate on the
// next invocation.
throw e;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Connector that = (Connector) o;
return Objects.equal(executor, that.executor)
&& Objects.equal(connectionInfoRepo, that.connectionInfoRepo)
&& Objects.equal(clientConnectorKeyPair, that.clientConnectorKeyPair)
&& Objects.equal(connectionInfoCacheFactory, that.connectionInfoCacheFactory)
&& Objects.equal(instances, that.instances);
}

@Override
public int hashCode() {
return Objects.hashCode(
executor,
connectionInfoRepo,
clientConnectorKeyPair,
connectionInfoCacheFactory,
instances);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.alloydb;

import com.google.cloud.alloydb.v1beta.AlloyDBAdminClient;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
* ConnectorRegistry is a singleton that creates a single Executor, KeyPair, and AlloyDB Admin
* Client for the lifetime of the SocketFactory. When callers are finished with the Connector, they
* should use the ConnectorRegistry to shut down all the associated resources.
*/
public enum ConnectorRegistry implements Closeable {
INSTANCE;

private final ScheduledExecutorService executor;
private final AlloyDBAdminClient alloyDBAdminClient;
private final Connector connector;

ConnectorRegistry() {
this.executor = Executors.newScheduledThreadPool(2);
try {
alloyDBAdminClient = AlloyDBAdminClient.create();
} catch (IOException e) {
throw new RuntimeException(e);
}
this.connector =
new Connector(
executor,
new DefaultConnectionInfoRepository(executor, alloyDBAdminClient),
RsaKeyPairGenerator.generateKeyPair(),
new DefaultConnectionInfoCacheFactory(),
new ConcurrentHashMap<>());
}

Connector getConnector() {
return this.connector;
}

@Override
public void close() {
this.executor.shutdown();
this.alloyDBAdminClient.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;

public class RsaKeyPairGenerator {
public static final KeyPair TEST_KEY_PAIR = generateKeyPair();
class RsaKeyPairGenerator {

public static KeyPair generateKeyPair() {
public static final int DEFAULT_KEY_SIZE = 2048;

static KeyPair generateKeyPair() {
KeyPairGenerator generator;
try {
generator = java.security.KeyPairGenerator.getInstance("RSA");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Missing RSA generator");
}
generator.initialize(2048);
generator.initialize(DEFAULT_KEY_SIZE);

return generator.generateKeyPair();
}
Expand Down
Loading

0 comments on commit 59e19bd

Please sign in to comment.