Skip to content

Commit

Permalink
Implement the CustomEndpointPlugin (#1122)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-congo authored Oct 18, 2024
1 parent ed3bbe7 commit b367f4e
Show file tree
Hide file tree
Showing 45 changed files with 2,064 additions and 157 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/#semantic-versioning-200).

### :magic_wand: Added
- Custom Endpoint Plugin. See [UsingTheCustomEndpointPlugin.md](https://github.com/aws/aws-advanced-jdbc-wrapper/blob/main/docs/using-the-jdbc-driver/using-plugins/UsingTheCustomEndpointPlugin.md).

### :bug: Fixed
- Use the cluster URL as the default cluster ID ([PR #1131](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1131)).
- Fix logic in SlidingExpirationCache and SlidingExpirationCacheWithCleanupThread ([PR #1142](https://github.com/aws/aws-advanced-jdbc-wrapper/pull/1142)).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc;

import java.util.Collections;
import java.util.Set;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.jdbc.util.Utils;

/**
* Represents the allowed and blocked hosts for connections.
*/
public class AllowedAndBlockedHosts {
@Nullable private final Set<String> allowedHostIds;
@Nullable private final Set<String> blockedHostIds;

/**
* Constructs an AllowedAndBlockedHosts instance with the specified allowed and blocked host IDs.
*
* @param allowedHostIds The set of allowed host IDs for connections. If null or empty, all host IDs that are not in
* {@code blockedHostIds} are allowed.
* @param blockedHostIds The set of blocked host IDs for connections. If null or empty, all host IDs in
* {@code allowedHostIds} are allowed. If {@code allowedHostIds} is also null or empty, there
* are no restrictions on which hosts are allowed.
*/
public AllowedAndBlockedHosts(@Nullable Set<String> allowedHostIds, @Nullable Set<String> blockedHostIds) {
this.allowedHostIds = Utils.isNullOrEmpty(allowedHostIds) ? null : Collections.unmodifiableSet(allowedHostIds);
this.blockedHostIds = Utils.isNullOrEmpty(blockedHostIds) ? null : Collections.unmodifiableSet(blockedHostIds);
}

/**
* Returns the set of allowed host IDs for connections. If null or empty, all host IDs that are not in
* {@code blockedHostIds} are allowed.
*
* @return the set of allowed host IDs for connections.
*/
@Nullable
public Set<String> getAllowedHostIds() {
return this.allowedHostIds;
}

/**
* Returns the set of blocked host IDs for connections. If null or empty, all host IDs in {@code allowedHostIds} are
* allowed. If {@code allowedHostIds} is also null or empty, there are no restrictions on which hosts are allowed.
*
* @return the set of blocked host IDs for connections.
*/
@Nullable
public Set<String> getBlockedHostIds() {
return this.blockedHostIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import software.amazon.jdbc.plugin.DriverMetaDataConnectionPluginFactory;
import software.amazon.jdbc.plugin.ExecutionTimeConnectionPluginFactory;
import software.amazon.jdbc.plugin.LogQueryConnectionPluginFactory;
import software.amazon.jdbc.plugin.customendpoint.CustomEndpointPluginFactory;
import software.amazon.jdbc.plugin.dev.DeveloperConnectionPluginFactory;
import software.amazon.jdbc.plugin.efm.HostMonitoringConnectionPluginFactory;
import software.amazon.jdbc.plugin.failover.FailoverConnectionPluginFactory;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class ConnectionPluginChainBuilder {
put("executionTime", ExecutionTimeConnectionPluginFactory.class);
put("logQuery", LogQueryConnectionPluginFactory.class);
put("dataCache", DataCacheConnectionPluginFactory.class);
put("customEndpoint", CustomEndpointPluginFactory.class);
put("efm", HostMonitoringConnectionPluginFactory.class);
put("efm2", software.amazon.jdbc.plugin.efm2.HostMonitoringConnectionPluginFactory.class);
put("failover", FailoverConnectionPluginFactory.class);
Expand Down Expand Up @@ -93,6 +95,7 @@ public class ConnectionPluginChainBuilder {
{
put(DriverMetaDataConnectionPluginFactory.class, 100);
put(DataCacheConnectionPluginFactory.class, 200);
put(CustomEndpointPluginFactory.class, 380);
put(AuroraInitialConnectionStrategyPluginFactory.class, 390);
put(AuroraConnectionTrackerPluginFactory.class, 400);
put(AuroraStaleDnsPluginFactory.class, 500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import software.amazon.jdbc.plugin.DefaultConnectionPlugin;
import software.amazon.jdbc.plugin.ExecutionTimeConnectionPlugin;
import software.amazon.jdbc.plugin.LogQueryConnectionPlugin;
import software.amazon.jdbc.plugin.customendpoint.CustomEndpointPlugin;
import software.amazon.jdbc.plugin.efm.HostMonitoringConnectionPlugin;
import software.amazon.jdbc.plugin.failover.FailoverConnectionPlugin;
import software.amazon.jdbc.plugin.federatedauth.FederatedAuthPlugin;
Expand All @@ -50,6 +51,7 @@
import software.amazon.jdbc.util.AsynchronousMethodsHelper;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SqlMethodAnalyzer;
import software.amazon.jdbc.util.Utils;
import software.amazon.jdbc.util.WrapperUtils;
import software.amazon.jdbc.util.telemetry.TelemetryContext;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
Expand Down Expand Up @@ -85,6 +87,7 @@ public class ConnectionPluginManager implements CanReleaseResources, Wrapper {
put(FastestResponseStrategyPlugin.class, "plugin:fastestResponseStrategy");
put(DefaultConnectionPlugin.class, "plugin:targetDriver");
put(AuroraInitialConnectionStrategyPlugin.class, "plugin:initialConnection");
put(CustomEndpointPlugin.class, "plugin:customEndpoint");
}
};

Expand Down Expand Up @@ -493,7 +496,7 @@ public HostSpec getHostSpecByStrategy(List<HostSpec> hosts, HostRole role, Strin

if (isSubscribed) {
try {
final HostSpec host = hosts == null || hosts.isEmpty()
final HostSpec host = Utils.isNullOrEmpty(hosts)
? plugin.getHostSpecByStrategy(role, strategy)
: plugin.getHostSpecByStrategy(hosts, role, strategy);

Expand Down
21 changes: 21 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,31 @@ EnumSet<NodeChangeOptions> setCurrentConnection(
@Nullable ConnectionPlugin skipNotificationForThisPlugin)
throws SQLException;

/**
* Get host information for all hosts in the cluster.
*
* @return host information for all hosts in the cluster.
*/
List<HostSpec> getAllHosts();

/**
* Get host information for allowed hosts in the cluster. Certain hosts in the cluster may be disallowed, and these
* hosts will not be returned by this function. For example, if a custom endpoint is being used, hosts outside the
* custom endpoint will not be returned.
*
* @return host information for allowed hosts in the cluster.
*/
List<HostSpec> getHosts();

HostSpec getInitialConnectionHostSpec();

/**
* Set the collection of hosts that should be allowed and/or blocked for connections.
*
* @param allowedAndBlockedHosts An object defining the allowed and blocked sets of hosts.
*/
void setAllowedAndBlockedHosts(AllowedAndBlockedHosts allowedAndBlockedHosts);

/**
* Returns a boolean indicating if the available {@link ConnectionProvider} or
* {@link ConnectionPlugin} instances support the selection of a host with the requested role and
Expand Down
70 changes: 57 additions & 13 deletions wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -52,6 +53,7 @@
import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect;
import software.amazon.jdbc.util.CacheMap;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.Utils;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;

public class PluginServiceImpl implements PluginService, CanReleaseResources,
Expand All @@ -66,7 +68,8 @@ public class PluginServiceImpl implements PluginService, CanReleaseResources,
private final String originalUrl;
private final String driverProtocol;
protected volatile HostListProvider hostListProvider;
protected List<HostSpec> hosts = new ArrayList<>();
protected List<HostSpec> allHosts = new ArrayList<>();
protected AtomicReference<AllowedAndBlockedHosts> allowedAndBlockedHosts = new AtomicReference<>();
protected Connection currentConnection;
protected HostSpec currentHostSpec;
protected HostSpec initialConnectionHostSpec;
Expand Down Expand Up @@ -162,10 +165,20 @@ public HostSpec getCurrentHostSpec() {
this.currentHostSpec = this.initialConnectionHostSpec;

if (this.currentHostSpec == null) {
if (this.getHosts().isEmpty()) {
if (this.getAllHosts().isEmpty()) {
throw new RuntimeException(Messages.get("PluginServiceImpl.hostListEmpty"));
}
this.currentHostSpec = this.getWriter(this.getHosts());

this.currentHostSpec = this.getWriter(this.getAllHosts());
if (!this.getHosts().contains(this.currentHostSpec)) {
throw new RuntimeException(
Messages.get("PluginServiceImpl.currentHostNotAllowed",
new Object[] {
currentHostSpec == null ? "<null>" : currentHostSpec.getHost(),
Utils.logTopology(this.getHosts(), "")})
);
}

if (this.currentHostSpec == null) {
this.currentHostSpec = this.getHosts().get(0);
}
Expand All @@ -187,6 +200,11 @@ public HostSpec getInitialConnectionHostSpec() {
return this.initialConnectionHostSpec;
}

@Override
public void setAllowedAndBlockedHosts(AllowedAndBlockedHosts allowedAndBlockedHosts) {
this.allowedAndBlockedHosts.set(allowedAndBlockedHosts);
}

@Override
public boolean acceptsStrategy(HostRole role, String strategy) throws SQLException {
return this.pluginManager.acceptsStrategy(role, strategy);
Expand Down Expand Up @@ -364,9 +382,35 @@ protected EnumSet<NodeChangeOptions> compare(
return changes;
}

@Override
public List<HostSpec> getAllHosts() {
return this.allHosts;
}

@Override
public List<HostSpec> getHosts() {
return this.hosts;
AllowedAndBlockedHosts hostPermissions = this.allowedAndBlockedHosts.get();
if (hostPermissions == null) {
return this.allHosts;
}

List<HostSpec> hosts = this.allHosts;
Set<String> allowedHostIds = hostPermissions.getAllowedHostIds();
Set<String> blockedHostIds = hostPermissions.getBlockedHostIds();

if (!Utils.isNullOrEmpty(allowedHostIds)) {
hosts = hosts.stream()
.filter((hostSpec -> allowedHostIds.contains(hostSpec.getHostId())))
.collect(Collectors.toList());
}

if (!Utils.isNullOrEmpty(blockedHostIds)) {
hosts = hosts.stream()
.filter((hostSpec -> !blockedHostIds.contains(hostSpec.getHostId())))
.collect(Collectors.toList());
}

return hosts;
}

@Override
Expand All @@ -376,7 +420,7 @@ public void setAvailability(final @NonNull Set<String> hostAliases, final @NonNu
return;
}

final List<HostSpec> hostsToChange = this.getHosts().stream()
final List<HostSpec> hostsToChange = this.getAllHosts().stream()
.filter((host) -> hostAliases.contains(host.asAlias())
|| host.getAliases().stream().anyMatch(hostAliases::contains))
.distinct()
Expand Down Expand Up @@ -427,18 +471,18 @@ public HostListProvider getHostListProvider() {
@Override
public void refreshHostList() throws SQLException {
final List<HostSpec> updatedHostList = this.getHostListProvider().refresh();
if (!Objects.equals(updatedHostList, this.hosts)) {
if (!Objects.equals(updatedHostList, this.allHosts)) {
updateHostAvailability(updatedHostList);
setNodeList(this.hosts, updatedHostList);
setNodeList(this.allHosts, updatedHostList);
}
}

@Override
public void refreshHostList(final Connection connection) throws SQLException {
final List<HostSpec> updatedHostList = this.getHostListProvider().refresh(connection);
if (!Objects.equals(updatedHostList, this.hosts)) {
if (!Objects.equals(updatedHostList, this.allHosts)) {
updateHostAvailability(updatedHostList);
setNodeList(this.hosts, updatedHostList);
setNodeList(this.allHosts, updatedHostList);
}
}

Expand All @@ -447,7 +491,7 @@ public void forceRefreshHostList() throws SQLException {
final List<HostSpec> updatedHostList = this.getHostListProvider().forceRefresh();
if (updatedHostList != null) {
updateHostAvailability(updatedHostList);
setNodeList(this.hosts, updatedHostList);
setNodeList(this.allHosts, updatedHostList);
}
}

Expand All @@ -456,7 +500,7 @@ public void forceRefreshHostList(final Connection connection) throws SQLExceptio
final List<HostSpec> updatedHostList = this.getHostListProvider().forceRefresh(connection);
if (updatedHostList != null) {
updateHostAvailability(updatedHostList);
setNodeList(this.hosts, updatedHostList);
setNodeList(this.allHosts, updatedHostList);
}
}

Expand All @@ -476,7 +520,7 @@ public boolean forceRefreshHostList(final boolean shouldVerifyWriter, final long
((BlockingHostListProvider) hostListProvider).forceRefresh(shouldVerifyWriter, timeoutMs);
if (updatedHostList != null) {
updateHostAvailability(updatedHostList);
setNodeList(this.hosts, updatedHostList);
setNodeList(this.allHosts, updatedHostList);
return true;
}
} catch (TimeoutException ex) {
Expand Down Expand Up @@ -520,7 +564,7 @@ void setNodeList(@Nullable final List<HostSpec> oldHosts,
}

if (!changes.isEmpty()) {
this.hosts = newHosts != null ? newHosts : new ArrayList<>();
this.allHosts = newHosts != null ? newHosts : new ArrayList<>();
this.pluginManager.notifyNodeListChanged(changes);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public List<HostSpec> refresh(final Connection connection) throws SQLException {
: this.hostListProviderService.getCurrentConnection();

final FetchTopologyResult results = getTopology(currentConnection, false);
LOGGER.finest(() -> Utils.logTopology(results.hosts, results.isCachedData ? "[From cache] " : ""));
LOGGER.finest(() -> Utils.logTopology(results.hosts, results.isCachedData ? "[From cache] Topology:" : null));

this.hostList = results.hosts;
return Collections.unmodifiableList(hostList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public <T, E extends Exception> T execute(final Class<T> resultClass, final Clas
}

private void checkWriterChanged() {
final HostSpec hostSpecAfterFailover = this.getWriter(this.pluginService.getHosts());
final HostSpec hostSpecAfterFailover = this.getWriter(this.pluginService.getAllHosts());

if (this.currentWriter == null) {
this.currentWriter = hostSpecAfterFailover;
Expand All @@ -153,7 +153,7 @@ private void checkWriterChanged() {

private void rememberWriter() {
if (this.currentWriter == null || this.needUpdateCurrentWriter) {
this.currentWriter = this.getWriter(this.pluginService.getHosts());
this.currentWriter = this.getWriter(this.pluginService.getAllHosts());
this.needUpdateCurrentWriter = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ private void delay(final long delayMs) {
}

private HostSpec getWriter() {
for (final HostSpec host : this.pluginService.getHosts()) {
for (final HostSpec host : this.pluginService.getAllHosts()) {
if (host.getRole() == HostRole.WRITER) {
return host;
}
Expand Down Expand Up @@ -380,12 +380,12 @@ private HostSpec getReader(final Properties props) throws SQLException {
}

private boolean hasNoReaders() {
if (this.pluginService.getHosts().isEmpty()) {
if (this.pluginService.getAllHosts().isEmpty()) {
// Topology inconclusive/corrupted.
return false;
}

for (HostSpec hostSpec : this.pluginService.getHosts()) {
for (HostSpec hostSpec : this.pluginService.getAllHosts()) {
if (hostSpec.getRole() == HostRole.WRITER) {
continue;
}
Expand Down
Loading

0 comments on commit b367f4e

Please sign in to comment.