Skip to content

Commit

Permalink
chore: add connector (#19)
Browse files Browse the repository at this point in the history
The Connector class manages the creation of TLS sockets using
information retrieved from the connection info repository.

The Connector also caches connection info to minimize the interactions
with the AlloyDB API.
  • Loading branch information
enocom authored Jul 6, 2023
1 parent bb32c00 commit 2e666bf
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .envrc.example
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
export ALLOYDB_DB="some-db"
export ALLOYDB_USER="some-user"
export ALLOYDB_PASS="some-password"
export ALLOYDB_INSTANCE_URI="projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>"
11 changes: 9 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jobs:
- run: .kokoro/build.sh
env:
JOB_TYPE: clirr

# graalvm17:
# # run job on proper workflow event triggers (skip job for pull_request event from forks and only run pull_request_target for "tests: run" label)
# if: "${{ (github.event.action != 'labeled' && github.event.pull_request.head.repo.full_name == github.event.pull_request.base.repo.full_name) || github.event.label.name == 'tests: run' }}"
Expand Down Expand Up @@ -180,11 +181,12 @@ jobs:
# chmod +x ./flakybot
# ./flakybot --repo ${{github.repository}} --commit_hash ${{github.sha}} --build_url https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
#

unitsAndE2e:
# run job on proper workflow event triggers (skip job for pull_request event from forks and only run pull_request_target for "tests: run" label)
if: "${{ (github.event.action != 'labeled' && github.event.pull_request.head.repo.full_name == github.event.pull_request.base.repo.full_name) || github.event.label.name == 'tests: run' }}"
name: units + e2e
runs-on: ubuntu-latest
runs-on: [self-hosted, linux, x64]
permissions:
contents: 'read'
id-token: 'write'
Expand Down Expand Up @@ -218,7 +220,12 @@ jobs:
uses: actions/setup-java@5ffc13f4174014e2d4d4572b3d74c3fa61aeb2c2 # v3.11.1
with:
distribution: 'zulu'
java-version: 17
java-version: 8

- name: Setup Maven Action
uses: s4u/setup-maven-action@94605e0cdfe442da48d603db22bbf4c7d203c076
with:
java-version: 8

- id: 'auth'
name: Authenticate to Google Cloud
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.alloydb;

import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.alloydb.v1beta.AlloyDBAdminClient;
import com.google.cloud.alloydb.v1beta.AlloyDBAdminSettings;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;

/**
* The AlloyDB Admin Client Factory encapsulates configuration of the client and is the single way
* to create new clients in the Connector.
*/
class AlloyDBAdminClientFactory {

private static final String DEFAULT_ENDPOINT = "alloydb.googleapis.com:443";

static AlloyDBAdminClient create() throws IOException {
AlloyDBAdminSettings.Builder settingsBuilder = AlloyDBAdminSettings.newBuilder();

Map<String, String> headers =
ImmutableMap.<String, String>builder()
.put("user-agent", "alloydb-java-connector/" + Version.VERSION)
.build();

AlloyDBAdminSettings alloyDBAdminSettings =
settingsBuilder
.setEndpoint(DEFAULT_ENDPOINT)
.setHeaderProvider(FixedHeaderProvider.create(headers))
.build();

return AlloyDBAdminClient.create(alloyDBAdminSettings);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.alloydb;

import com.google.cloud.alloydb.v1beta.InstanceName;
import dev.failsafe.RateLimiter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.KeyStore.PasswordProtection;
import java.security.KeyStore.PrivateKeyEntry;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

class Connector {

private static final String TLS_1_3 = "TLSv1.3";
private static final String X_509 = "X.509";
private static final Duration RATE_LIMIT_DURATION = Duration.ofSeconds(30);
private static final int RATE_LIMIT_BURST_SIZE = 2;
private static final int SERVER_SIDE_PROXY_PORT = 5433;
private static final String ROOT_CA_CERT = "rootCaCert";
private static final String CLIENT_CERT = "clientCert";
private final ScheduledExecutorService executor;
private final ConnectionInfoRepository connectionInfoRepo;
private final KeyPair clientConnectorKeyPair;
private final ConcurrentHashMap<InstanceName, ConnectionInfoCache> instances;

Connector(
ScheduledExecutorService executor,
ConnectionInfoRepository connectionInfoRepo,
KeyPair clientConnectorKeyPair) {
this.executor = executor;
this.connectionInfoRepo = connectionInfoRepo;
this.clientConnectorKeyPair = clientConnectorKeyPair;
this.instances = new ConcurrentHashMap<>();
}

Socket connect(InstanceName instanceName) throws IOException {
ConnectionInfoCache connectionInfoCache =
instances.computeIfAbsent(
instanceName,
k -> {
RateLimiter<Object> rateLimiter =
RateLimiter.burstyBuilder(RATE_LIMIT_BURST_SIZE, RATE_LIMIT_DURATION).build();
return new ConnectionInfoCache(
this.executor,
this.connectionInfoRepo,
instanceName,
this.clientConnectorKeyPair,
new RefreshCalculator(),
rateLimiter);
});

ConnectionInfo connectionInfo = connectionInfoCache.getConnectionInfo();

try {
SSLSocket socket =
buildSocket(
connectionInfo.getClientCertificate(),
connectionInfo.getCertificateChain(),
this.clientConnectorKeyPair.getPrivate());

// Use the instance's IP address as a HostName.
SSLParameters sslParameters = socket.getSSLParameters();
sslParameters.setServerNames(
Collections.singletonList(new SNIHostName(connectionInfo.getIpAddress())));

socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
socket.connect(new InetSocketAddress(connectionInfo.getIpAddress(), SERVER_SIDE_PROXY_PORT));
socket.startHandshake();
return socket;
} catch (Exception e) {
// TODO: force refresh connection info when handshake fails.
throw e;
}
}

private static SSLSocket buildSocket(
X509Certificate clientCertificate,
List<X509Certificate> certificateChain,
PrivateKey privateKey) {
try {
// First initialize a KeyManager with the ephemeral certificate
// (including the chain of trust to the root CA cert) and the connector's private key.
KeyManager[] keyManagers =
initializeKeyManager(clientCertificate, certificateChain, privateKey);

// Next, initialize a TrustManager with the root CA certificate.
TrustManager[] trustManagers = initializeTrustManager(certificateChain);

// Now, create a TLS 1.3 SSLContext initialized with the KeyManager and the TrustManager,
// and create the SSL Socket.
SSLContext sslContext = SSLContext.getInstance(TLS_1_3);
sslContext.init(keyManagers, trustManagers, new SecureRandom());
return (SSLSocket) sslContext.getSocketFactory().createSocket();
} catch (GeneralSecurityException | IOException ex) {
throw new RuntimeException("Unable to create an SSL Context for the instance.", ex);
}
}

private static TrustManager[] initializeTrustManager(List<X509Certificate> certificateChain)
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
KeyStore trustedKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustedKeyStore.load(
null, // don't load the key store from an input stream
null // there is no password
);
trustedKeyStore.setCertificateEntry(
ROOT_CA_CERT,
certificateChain.get(certificateChain.size() - 1) // root CA cert is last in the chain
);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(X_509);
trustManagerFactory.init(trustedKeyStore);
return trustManagerFactory.getTrustManagers();
}

private static KeyManager[] initializeKeyManager(
X509Certificate clientCertificate,
List<X509Certificate> certificateChain,
PrivateKey privateKey)
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
UnrecoverableKeyException {
KeyStore clientAuthenticationKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
clientAuthenticationKeyStore.load(
null, // don't load the key store from an input stream
null // there is no password
);
List<Certificate> chain = new ArrayList<>();
chain.add(clientCertificate);
chain.addAll(certificateChain);
Certificate[] chainArray = chain.toArray(new Certificate[] {});
PrivateKeyEntry privateKeyEntry = new PrivateKeyEntry(privateKey, chainArray);
clientAuthenticationKeyStore.setEntry(
CLIENT_CERT, privateKeyEntry, new PasswordProtection(new char[0]) /* no password */);
KeyManagerFactory keyManagerFactory =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(clientAuthenticationKeyStore, new char[0] /* no password */);
return keyManagerFactory.getKeyManagers();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.alloydb;

class Version {
// {x-version-update-start:alloydb-jdbc-connector:current}
static final String VERSION = "0.0.1-SNAPSHOT";
// {x-version-update-end}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.alloydb;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.alloydb.v1beta.AlloyDBAdminClient;
import com.google.cloud.alloydb.v1beta.InstanceName;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import javax.net.ssl.SSLSocket;
import org.junit.Before;
import org.junit.Test;

public class ITConnectorTest {

private String instanceUri;

@Before
public void setUp() {
instanceUri = System.getenv("ALLOYDB_INSTANCE_URI");
}

@Test
public void testConnect_createsSocketConnection() throws IOException {
SSLSocket socket = null;
ScheduledThreadPoolExecutor executor = null;
try (AlloyDBAdminClient alloyDBAdminClient = AlloyDBAdminClientFactory.create()) {
executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);
ConnectionInfoRepository connectionInfoRepository =
new DefaultConnectionInfoRepository(executor, alloyDBAdminClient);
Connector connector =
new Connector(executor, connectionInfoRepository, RsaKeyPairGenerator.generateKeyPair());

socket = (SSLSocket) connector.connect(InstanceName.parse(instanceUri));

assertThat(socket.getKeepAlive()).isTrue();
assertThat(socket.getTcpNoDelay()).isTrue();
} finally {
if (socket != null) {
socket.close();
}
if (executor != null) {
executor.shutdown();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void setUp() throws Exception {

keyPair = generator.generateKeyPair();
executor = Executors.newSingleThreadExecutor();
alloyDBAdminClient = AlloyDBAdminClient.create();
alloyDBAdminClient = AlloyDBAdminClientFactory.create();

defaultConnectionInfoRepository =
new DefaultConnectionInfoRepository(executor, alloyDBAdminClient);
Expand Down

0 comments on commit 2e666bf

Please sign in to comment.