Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize thread locks and expiring cache for Enhanced Monitoring plugin #356

Merged
merged 8 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
- name: 'Set up Temp AWS Credentials'
run: |
creds=($(aws sts get-session-token \
--duration-seconds 3600 \
--duration-seconds 7200 \
--query 'Credentials.[AccessKeyId, SecretAccessKey, SessionToken]' \
--output text \
| xargs));
Expand Down
138 changes: 138 additions & 0 deletions src/main/core-api/java/com/mysql/cj/util/CacheMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2.0
* (GPLv2), as published by the Free Software Foundation, with the
* following additional permissions:
*
* This program is distributed with certain software that is licensed
* under separate terms, as designated in a particular file or component
* or in the license documentation. Without limiting your rights under
* the GPLv2, the authors of this program hereby grant you an additional
* permission to link the program and your derivative works with the
* separately licensed software that they have included with the program.
*
* Without limiting the foregoing grant of rights under the GPLv2 and
* additional permission as to separately licensed software, this
* program is also subject to the Universal FOSS Exception, version 1.0,
* a copy of which can be found along with its FAQ at
* http://oss.oracle.com/licenses/universal-foss-exception.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see
* http://www.gnu.org/licenses/gpl-2.0.html.
*/

package com.mysql.cj.util;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CacheMap<K,V> {

private final Map<K, CacheItem<V>> cache = new ConcurrentHashMap<>();
private final long cleanupIntervalNanos = TimeUnit.MINUTES.toNanos(10);
private final AtomicLong cleanupTimeNanos = new AtomicLong(System.nanoTime() + cleanupIntervalNanos);

public CacheMap() {
}

public V get(final K key) {
CacheItem<V> cacheItem = cache.computeIfPresent(key, (kk, vv) -> vv.isExpired() ? null : vv);
return cacheItem == null ? null : cacheItem.item;
}

public V get(final K key, final V defaultItemValue, long itemExpirationNano) {
CacheItem<V> cacheItem = cache.compute(key,
(kk, vv) -> (vv == null || vv.isExpired())
? new CacheItem<>(defaultItemValue, System.nanoTime() + itemExpirationNano)
: vv);
return cacheItem.item;
}
Comment on lines +49 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these methods can be simplified if you add a getter for cacheItem.item:

  public V getItem() {
      return isExpired() ? null : item;
    }


public void put(final K key, final V item, long itemExpirationNano) {
cache.put(key, new CacheItem<>(item, System.nanoTime() + itemExpirationNano));
cleanUp();
}

public void putIfAbsent(final K key, final V item, long itemExpirationNano) {
cache.putIfAbsent(key, new CacheItem<>(item, System.nanoTime() + itemExpirationNano));
cleanUp();
}

public void remove(final K key) {
cache.remove(key);
cleanUp();
}

public void clear() {
cache.clear();
}

public int size() { return this.cache.size(); }

private void cleanUp() {
if (this.cleanupTimeNanos.get() < System.nanoTime()) {
this.cleanupTimeNanos.set(System.nanoTime() + cleanupIntervalNanos);
cache.forEach((key, value) -> {
if (value == null || value.isExpired()) {
cache.remove(key);
}
});
}
}

private static class CacheItem<V> {
final V item;
final long expirationTime;

public CacheItem(V item, long expirationTime) {
this.item = item;
this.expirationTime = expirationTime;
}

boolean isExpired() {
return System.nanoTime() > expirationTime;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((item == null) ? 0 : item.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
CacheItem<?> other = (CacheItem<?>) obj;
if (item == null) {
return other.item == null;
} else {
return item.equals(other.item);
}
}

@Override
public String toString() {
return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]";
}
}
}
sergiyvamz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,6 @@ ConnectionFeatureNotAvailableException.0=Feature not available in this distribut
IllegalArgumentException.NullParameter=Parameter ''{0}'' must not be null.
InvalidLoadBalanceStrategy=Invalid load balancing strategy ''{0}''.
DefaultMonitorService.EmptyNodeKeys=Empty NodeKey set passed into DefaultMonitorService. Set should not be empty.
DefaultMonitorService.NoMonitorForContext=Can't find monitor for context passed into DefaultMonitorService.
DefaultMonitorService.InvalidContext=Invalid context passed into DefaultMonitorService. Could not find any NodeKey from context.
DefaultMonitorService.InvalidNodeKey=Invalid node key passed into DefaultMonitorService. No existing monitor for the given set of node keys.

Expand Down
51 changes: 27 additions & 24 deletions src/main/user-impl/java/com/mysql/cj/jdbc/ha/ConnectionProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class ConnectionProxy implements ICurrentConnectionProvider, InvocationHa
protected ConnectionPluginManager pluginManager = null;
private HostInfo currentHostInfo;
private JdbcConnection currentConnection;
private Class<?> currentConnectionClass;

public ConnectionProxy(ConnectionUrl connectionUrl) throws SQLException {
this(connectionUrl, null);
Expand All @@ -98,6 +99,7 @@ public ConnectionProxy(ConnectionUrl connectionUrl, JdbcConnection connection) t
throws SQLException {
this.currentHostInfo = connectionUrl.getMainHost();
this.currentConnection = connection;
this.currentConnectionClass = connection == null ? null : connection.getClass();

initLogger(connectionUrl);
initSettings(connectionUrl);
Expand Down Expand Up @@ -175,36 +177,37 @@ public void setCurrentConnection(JdbcConnection connection, HostInfo info) {
}

this.currentConnection = connection;
this.currentConnectionClass = connection == null ? null : connection.getClass();
this.currentHostInfo = info;
}

@Override
public synchronized Object invoke(Object proxy, Method method, Object[] args)
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
final String methodName = method.getName();

if (isDirectExecute(methodName)) {
return executeMethodDirectly(methodName, args);
}

Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);

try {
Object result = this.pluginManager.execute(
this.currentConnection.getClass(),
methodName,
() -> method.invoke(currentConnection, args),
argsCopy);
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
} catch (Exception e) {
// Check if the captured exception must be wrapped by an unchecked exception.
Class<?>[] declaredExceptions = method.getExceptionTypes();
for (Class<?> declaredException : declaredExceptions) {
if (declaredException.isAssignableFrom(e.getClass())) {
throw e;
synchronized (currentConnection) {
congoamz marked this conversation as resolved.
Show resolved Hide resolved
try {
Object result = this.pluginManager.execute(
this.currentConnectionClass,
methodName,
() -> method.invoke(currentConnection, args),
args);
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
} catch (Exception e) {
// Check if the captured exception must be wrapped by an unchecked exception.
Class<?>[] declaredExceptions = method.getExceptionTypes();
for (Class<?> declaredException : declaredExceptions) {
if (declaredException.isAssignableFrom(e.getClass())) {
throw e;
}
}
throw new IllegalStateException(e.getMessage(), e);
}
throw new IllegalStateException(e.getMessage(), e);
}
}

Expand Down Expand Up @@ -301,10 +304,12 @@ private boolean isDirectExecute(String methodName) {
* Proxy class to intercept and deal with errors that may occur in any object bound to the current connection.
*/
class JdbcInterfaceProxy implements InvocationHandler {
Object invokeOn;
private final Object invokeOn;
private final Class<?> invokeOnClass;

JdbcInterfaceProxy(Object toInvokeOn) {
this.invokeOn = toInvokeOn;
this.invokeOnClass = toInvokeOn == null ? null : toInvokeOn.getClass();
}

/**
Expand All @@ -329,21 +334,19 @@ private Object executeMethodDirectly(String methodName, Object[] args) {
return null;
}

public synchronized Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final String methodName = method.getName();
if (isDirectExecute(methodName)) {
return executeMethodDirectly(methodName, args);
}

Object[] argsCopy = args == null ? null : Arrays.copyOf(args, args.length);

synchronized(ConnectionProxy.this) {
synchronized(this.invokeOn) {
Object result =
ConnectionProxy.this.pluginManager.execute(
this.invokeOn.getClass(),
this.invokeOnClass,
methodName,
() -> method.invoke(this.invokeOn, args),
argsCopy);
args);
return proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.log.Log;

import java.util.Iterator;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;

Expand All @@ -51,6 +51,8 @@ public class DefaultMonitorService implements IMonitorService {

private final Log logger;
final IMonitorInitializer monitorInitializer;
private Set<String> cachedMonitorNodeKeys = null;
private IMonitor cachedMonitor = null;

public DefaultMonitorService(Log logger) {
this(
Expand Down Expand Up @@ -96,11 +98,21 @@ public MonitorConnectionContext startMonitoring(
throw new IllegalArgumentException(warning);
}

final IMonitor monitor = getMonitor(nodeKeys, hostInfo, propertySet);
IMonitor monitor;
if (this.cachedMonitor == null
|| this.cachedMonitorNodeKeys == null
|| !this.cachedMonitorNodeKeys.equals(nodeKeys)) {

monitor = getMonitor(nodeKeys, hostInfo, propertySet);
this.cachedMonitor = monitor;
this.cachedMonitorNodeKeys = Collections.unmodifiableSet(nodeKeys);
} else {
monitor = this.cachedMonitor;
}

final MonitorConnectionContext context = new MonitorConnectionContext(
monitor,
connectionToAbort,
nodeKeys,
logger,
failureDetectionTimeMillis,
failureDetectionIntervalMillis,
Expand All @@ -118,20 +130,8 @@ public void stopMonitoring(MonitorConnectionContext context) {
return;
}

context.invalidate();

// Any 1 node is enough to find the monitor containing the context
// All nodes will map to the same monitor
IMonitor monitor;
for (Iterator<String> it = context.getNodeKeys().iterator(); it.hasNext();) {
String nodeKey = it.next();
monitor = this.threadContainer.getMonitor(nodeKey);
if (monitor != null) {
monitor.stopMonitoring(context);
return;
}
}
logger.logTrace(Messages.getString("DefaultMonitorService.NoMonitorForContext"));
IMonitor monitor = context.getMonitor();
monitor.stopMonitoring(context);
}

@Override
Expand Down
Loading