From c3eb9f54e8df56500d9c95d23e1866f146ac5ece Mon Sep 17 00:00:00 2001 From: egg82 Date: Thu, 8 Nov 2018 23:13:23 -0700 Subject: [PATCH] Added getCurrentSQLTime helper to API Redis no longer clogs up the main thread --- .../main/java/me/egg82/antivpn/AntiVPN.java | 13 ++++-- .../main/java/me/egg82/antivpn/AntiVPN.java | 14 +++--- .../main/java/me/egg82/antivpn/VPNAPI.java | 30 +++++++++++++ .../antivpn/extended/RabbitMQReceiver.java | 14 ++++++ .../antivpn/extended/RedisSubscriber.java | 14 ++++++ .../me/egg82/antivpn/services/RabbitMQ.java | 10 ++++- .../java/me/egg82/antivpn/services/Redis.java | 12 ++++- .../main/java/me/egg82/antivpn/sql/MySQL.java | 16 +++++++ .../java/me/egg82/antivpn/sql/SQLite.java | 44 ++++++++++++++----- README.md | 1 + .../main/java/me/egg82/antivpn/AntiVPN.java | 13 ++++-- 11 files changed, 153 insertions(+), 28 deletions(-) diff --git a/Bukkit/src/main/java/me/egg82/antivpn/AntiVPN.java b/Bukkit/src/main/java/me/egg82/antivpn/AntiVPN.java index 3e9870bc..06f9587d 100644 --- a/Bukkit/src/main/java/me/egg82/antivpn/AntiVPN.java +++ b/Bukkit/src/main/java/me/egg82/antivpn/AntiVPN.java @@ -5,13 +5,12 @@ import co.aikar.taskchain.BukkitTaskChainFactory; import co.aikar.taskchain.TaskChain; import co.aikar.taskchain.TaskChainFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.logging.Level; import me.egg82.antivpn.commands.AntiVPNCommand; import me.egg82.antivpn.core.SQLFetchResult; @@ -50,6 +49,8 @@ public class AntiVPN { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final ExecutorService singlePool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("AntiVPN-%d").build()); + private TaskChainFactory taskFactory; private PaperCommandManager commandManager; @@ -128,7 +129,7 @@ private void loadServices() { return; } - new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis")); + singlePool.submit(() -> new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis"))); ServiceLocator.register(new RabbitMQReceiver(cachedConfig.getRabbitConnectionFactory())); ServiceLocator.register(new SpigotUpdater(plugin, 58291)); } @@ -288,6 +289,10 @@ private void unloadServices() { try { rabbitReceiver.close(); } catch (IOException | TimeoutException ignored) {} + + if (!singlePool.isShutdown()) { + singlePool.shutdownNow(); + } } private void log(Level level, String message) { diff --git a/Bungee/src/main/java/me/egg82/antivpn/AntiVPN.java b/Bungee/src/main/java/me/egg82/antivpn/AntiVPN.java index 3582f40d..f2db61d6 100644 --- a/Bungee/src/main/java/me/egg82/antivpn/AntiVPN.java +++ b/Bungee/src/main/java/me/egg82/antivpn/AntiVPN.java @@ -2,14 +2,12 @@ import co.aikar.commands.BungeeCommandManager; import co.aikar.commands.ConditionFailedException; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.logging.Level; import me.egg82.antivpn.commands.AntiVPNCommand; import me.egg82.antivpn.core.SQLFetchResult; @@ -45,6 +43,8 @@ public class AntiVPN { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final ExecutorService singlePool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("AntiVPN-%d").build()); + private BungeeCommandManager commandManager; private List> events = new ArrayList<>(); @@ -111,7 +111,7 @@ private void loadServices() { return; } - new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis")); + singlePool.submit(() -> new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis"))); ServiceLocator.register(new RabbitMQReceiver(cachedConfig.getRabbitConnectionFactory())); ServiceLocator.register(new BungeeUpdater(plugin, 58716)); } @@ -258,5 +258,9 @@ private void unloadServices() { try { rabbitReceiver.close(); } catch (IOException | TimeoutException ignored) {} + + if (!singlePool.isShutdown()) { + singlePool.shutdownNow(); + } } } diff --git a/Common/src/main/java/me/egg82/antivpn/VPNAPI.java b/Common/src/main/java/me/egg82/antivpn/VPNAPI.java index a727ae9b..0f9ff5ce 100644 --- a/Common/src/main/java/me/egg82/antivpn/VPNAPI.java +++ b/Common/src/main/java/me/egg82/antivpn/VPNAPI.java @@ -4,10 +4,14 @@ import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import me.egg82.antivpn.enums.SQLType; import me.egg82.antivpn.extended.CachedConfigValues; import me.egg82.antivpn.extended.Configuration; import me.egg82.antivpn.services.InternalAPI; +import me.egg82.antivpn.sql.MySQL; +import me.egg82.antivpn.sql.SQLite; import me.egg82.antivpn.utils.RabbitMQUtil; import me.egg82.antivpn.utils.ValidationUtil; import ninja.egg82.service.ServiceLocator; @@ -25,6 +29,32 @@ private VPNAPI() {} public static VPNAPI getInstance() { return api; } + public long getCurrentSQLTime() { + CachedConfigValues cachedConfig; + + try { + cachedConfig = ServiceLocator.get(CachedConfigValues.class); + } catch (IllegalAccessException | InstantiationException | ServiceNotFoundException ex) { + logger.error(ex.getMessage(), ex); + return -1L; + } + + try { + if (cachedConfig.getSQLType() == SQLType.MySQL) { + return MySQL.getCurrentTime(cachedConfig.getSQL()).get(); + } else if (cachedConfig.getSQLType() == SQLType.SQLite) { + return SQLite.getCurrentTime(cachedConfig.getSQL()).get(); + } + } catch (ExecutionException ex) { + logger.error(ex.getMessage(), ex); + } catch (InterruptedException ex) { + logger.error(ex.getMessage(), ex); + Thread.currentThread().interrupt(); + } + + return -1L; + } + public ImmutableMap> testAllSources(String ip) { if (ip == null) { throw new IllegalArgumentException("ip cannot be null."); diff --git a/Common/src/main/java/me/egg82/antivpn/extended/RabbitMQReceiver.java b/Common/src/main/java/me/egg82/antivpn/extended/RabbitMQReceiver.java index 6e3eda4a..5bcee345 100644 --- a/Common/src/main/java/me/egg82/antivpn/extended/RabbitMQReceiver.java +++ b/Common/src/main/java/me/egg82/antivpn/extended/RabbitMQReceiver.java @@ -2,8 +2,10 @@ import com.rabbitmq.client.*; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.TimeoutException; import me.egg82.antivpn.services.InternalAPI; +import me.egg82.antivpn.services.RabbitMQ; import me.egg82.antivpn.utils.RabbitMQUtil; import me.egg82.antivpn.utils.ValidationUtil; import ninja.egg82.json.JSONUtil; @@ -51,12 +53,18 @@ public void handleDelivery(String tag, Envelope envelope, AMQP.BasicProperties p String ip = (String) obj.get("ip"); boolean value = (Boolean) obj.get("value"); long created = ((Number) obj.get("created")).longValue(); + UUID id = UUID.fromString((String) obj.get("id")); if (!ValidationUtil.isValidIp(ip)) { logger.warn("non-valid IP sent through RabbitMQ cascade"); return; } + if (id.equals(RabbitMQ.getServerID())) { + logger.info("ignoring message sent from this server"); + return; + } + CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class); Configuration config = ServiceLocator.get(Configuration.class); @@ -77,12 +85,18 @@ public void handleDelivery(String tag, Envelope envelope, AMQP.BasicProperties p String ip = (String) obj.get("ip"); double value = ((Number) obj.get("value")).doubleValue(); long created = ((Number) obj.get("created")).longValue(); + UUID id = UUID.fromString((String) obj.get("id")); if (!ValidationUtil.isValidIp(ip)) { logger.warn("non-valid IP sent through RabbitMQ consensus"); return; } + if (id.equals(RabbitMQ.getServerID())) { + logger.info("ignoring message sent from this server"); + return; + } + CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class); Configuration config = ServiceLocator.get(Configuration.class); diff --git a/Common/src/main/java/me/egg82/antivpn/extended/RedisSubscriber.java b/Common/src/main/java/me/egg82/antivpn/extended/RedisSubscriber.java index f9cd622c..475b2070 100644 --- a/Common/src/main/java/me/egg82/antivpn/extended/RedisSubscriber.java +++ b/Common/src/main/java/me/egg82/antivpn/extended/RedisSubscriber.java @@ -1,6 +1,8 @@ package me.egg82.antivpn.extended; +import java.util.UUID; import me.egg82.antivpn.services.InternalAPI; +import me.egg82.antivpn.services.Redis; import me.egg82.antivpn.utils.RedisUtil; import me.egg82.antivpn.utils.ValidationUtil; import ninja.egg82.json.JSONUtil; @@ -41,12 +43,18 @@ public void onMessage(String channel, String message) { String ip = (String) obj.get("ip"); boolean value = (Boolean) obj.get("value"); long created = ((Number) obj.get("created")).longValue(); + UUID id = UUID.fromString((String) obj.get("id")); if (!ValidationUtil.isValidIp(message)) { logger.warn("non-valid IP sent through Redis pub/sub cascade"); return; } + if (id.equals(Redis.getServerID())) { + logger.info("ignoring message sent from this server"); + return; + } + CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class); Configuration config = ServiceLocator.get(Configuration.class); @@ -60,12 +68,18 @@ public void onMessage(String channel, String message) { String ip = (String) obj.get("ip"); double value = ((Number) obj.get("value")).doubleValue(); long created = ((Number) obj.get("created")).longValue(); + UUID id = UUID.fromString((String) obj.get("id")); if (!ValidationUtil.isValidIp(message)) { logger.warn("non-valid IP sent through Redis pub/sub consensus"); return; } + if (id.equals(Redis.getServerID())) { + logger.info("ignoring message sent from this server"); + return; + } + CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class); Configuration config = ServiceLocator.get(Configuration.class); diff --git a/Common/src/main/java/me/egg82/antivpn/services/RabbitMQ.java b/Common/src/main/java/me/egg82/antivpn/services/RabbitMQ.java index a69c8733..75ba5498 100644 --- a/Common/src/main/java/me/egg82/antivpn/services/RabbitMQ.java +++ b/Common/src/main/java/me/egg82/antivpn/services/RabbitMQ.java @@ -3,6 +3,7 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import me.egg82.antivpn.core.ConsensusResult; @@ -13,10 +14,13 @@ import org.slf4j.LoggerFactory; public class RabbitMQ { - private RabbitMQ() {} - private static final Logger logger = LoggerFactory.getLogger(RabbitMQ.class); + private static final UUID serverId = UUID.randomUUID(); + public static UUID getServerID() { return serverId; } + + private RabbitMQ() {} + public static CompletableFuture broadcast(DataResult sqlResult, long sourceCacheTime, Connection connection) { return CompletableFuture.supplyAsync(() -> { try (Channel channel = RabbitMQUtil.getChannel(connection)) { @@ -31,6 +35,7 @@ public static CompletableFuture broadcast(DataResult sqlResult, long so obj.put("ip", sqlResult.getIp()); obj.put("value", sqlResult.getValue()); obj.put("created", sqlResult.getCreated()); + obj.put("id", serverId.toString()); channel.exchangeDeclare("antivpn-result", "fanout"); channel.basicPublish("antivpn-result", "", null, obj.toJSONString().getBytes()); @@ -62,6 +67,7 @@ public static CompletableFuture broadcast(ConsensusResult sqlResult, lo obj.put("ip", sqlResult.getIp()); obj.put("value", sqlResult.getValue()); obj.put("created", sqlResult.getCreated()); + obj.put("id", serverId.toString()); channel.exchangeDeclare("antivpn-consensus", "fanout"); channel.basicPublish("antivpn-result", "", null, obj.toJSONString().getBytes()); diff --git a/Common/src/main/java/me/egg82/antivpn/services/Redis.java b/Common/src/main/java/me/egg82/antivpn/services/Redis.java index 1dbb1b24..77c1512b 100644 --- a/Common/src/main/java/me/egg82/antivpn/services/Redis.java +++ b/Common/src/main/java/me/egg82/antivpn/services/Redis.java @@ -1,5 +1,6 @@ package me.egg82.antivpn.services; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import me.egg82.antivpn.core.ConsensusResult; import me.egg82.antivpn.core.DataResult; @@ -14,10 +15,13 @@ import redis.clients.jedis.exceptions.JedisException; public class Redis { - private Redis() {} - private static final Logger logger = LoggerFactory.getLogger(Redis.class); + private static final UUID serverId = UUID.randomUUID(); + public static UUID getServerID() { return serverId; } + + private Redis() {} + public static CompletableFuture updateFromQueue(SQLFetchResult sqlResult, long sourceCacheTime, JedisPool pool, ConfigurationNode redisConfigNode) { return CompletableFuture.supplyAsync(() -> { try (Jedis redis = RedisUtil.getRedis(pool, redisConfigNode)) { @@ -39,6 +43,7 @@ public static CompletableFuture updateFromQueue(SQLFetchResult sqlResul obj.put("ip", result.getIp()); obj.put("value", result.getValue()); obj.put("created", result.getCreated()); + obj.put("id", serverId.toString()); redis.publish("antivpn-result", obj.toJSONString()); } else { redis.publish("antivpn-delete", result.getIp()); @@ -59,6 +64,7 @@ public static CompletableFuture updateFromQueue(SQLFetchResult sqlResul obj.put("ip", result.getIp()); obj.put("value", result.getValue()); obj.put("created", result.getCreated()); + obj.put("id", serverId.toString()); redis.publish("antivpn-consensus", obj.toJSONString()); } else { redis.publish("antivpn-delete", result.getIp()); @@ -99,6 +105,7 @@ public static CompletableFuture update(DataResult sqlResult, long sourc obj.put("ip", sqlResult.getIp()); obj.put("value", sqlResult.getValue()); obj.put("created", sqlResult.getCreated()); + obj.put("id", serverId.toString()); redis.publish("antivpn-result", obj.toJSONString()); } else { redis.publish("antivpn-delete", sqlResult.getIp()); @@ -133,6 +140,7 @@ public static CompletableFuture update(ConsensusResult sqlResult, long obj.put("ip", sqlResult.getIp()); obj.put("value", sqlResult.getValue()); obj.put("created", sqlResult.getCreated()); + obj.put("id", serverId.toString()); redis.publish("antivpn-consensus", obj.toJSONString()); } else { redis.publish("antivpn-delete", sqlResult.getIp()); diff --git a/Common/src/main/java/me/egg82/antivpn/sql/MySQL.java b/Common/src/main/java/me/egg82/antivpn/sql/MySQL.java index f5ad2d3a..19f0e2c3 100644 --- a/Common/src/main/java/me/egg82/antivpn/sql/MySQL.java +++ b/Common/src/main/java/me/egg82/antivpn/sql/MySQL.java @@ -338,4 +338,20 @@ public static CompletableFuture update(SQL sql, ConfigurationNo return result; }); } + + public static CompletableFuture getCurrentTime(SQL sql) { + return CompletableFuture.supplyAsync(() -> { + try { + SQLQueryResult query = sql.query("SELECT CURRENT_TIMESTAMP();"); + + for (Object[] o : query.getData()) { + return ((Timestamp) o[0]).getTime(); + } + } catch (SQLException | ClassCastException ex) { + logger.error(ex.getMessage(), ex); + } + + return -1L; + }); + } } diff --git a/Common/src/main/java/me/egg82/antivpn/sql/SQLite.java b/Common/src/main/java/me/egg82/antivpn/sql/SQLite.java index 0e1931d5..f9929135 100644 --- a/Common/src/main/java/me/egg82/antivpn/sql/SQLite.java +++ b/Common/src/main/java/me/egg82/antivpn/sql/SQLite.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import me.egg82.antivpn.core.ConsensusResult; import me.egg82.antivpn.core.DataResult; import me.egg82.antivpn.core.SQLFetchResult; @@ -82,7 +81,7 @@ public static CompletableFuture loadInfo(SQL sql, ConfigurationN // Grab all data and convert to more useful object types String ip = (String) o[0]; boolean value = (((Number) o[1]).intValue() == 0) ? false : true; - long created = Timestamp.valueOf((String) o[2]).getTime(); + long created = getTime(o[2]).getTime(); data.add(new DataResult(ip, value, created)); } @@ -105,7 +104,7 @@ public static CompletableFuture loadInfo(SQL sql, ConfigurationN // Grab all data and convert to more useful object types String ip = (String) o[0]; double value = ((Number) o[1]).doubleValue(); - long created = Timestamp.valueOf((String) o[2]).getTime(); + long created = getTime(o[2]).getTime(); consensus.add(new ConsensusResult(ip, value, created)); } @@ -143,7 +142,7 @@ public static CompletableFuture getResult(String ip, SQL sql, Config for (Object[] o : query.getData()) { // Grab all data and convert to more useful object types boolean value = ((Number) o[0]).intValue() != 0; - long created = Timestamp.valueOf((String) o[1]).getTime(); + long created = getTime(o[1]).getTime(); result = new DataResult(ip, value, created); } @@ -168,7 +167,7 @@ public static CompletableFuture getConsensus(String ip, SQL sql for (Object[] o : query.getData()) { // Grab all data and convert to more useful object types double value = ((Number) o[0]).doubleValue(); - long created = Timestamp.valueOf((String) o[1]).getTime(); + long created = getTime(o[1]).getTime(); result = new ConsensusResult(ip, value, created); } @@ -190,11 +189,10 @@ public static CompletableFuture update(SQL sql, ConfigurationNode st sql.execute("INSERT OR REPLACE INTO `" + tablePrefix.substring(0, tablePrefix.length() - 1) + "` (`ip`, `value`) VALUES (?, ?);", ip, (value) ? 1 : 0); SQLQueryResult query = sql.query("SELECT `created` FROM `" + tablePrefix.substring(0, tablePrefix.length() - 1) + "` WHERE `ip`=?;", ip); - Timestamp sqlCreated = null; - Timestamp updated = new Timestamp(System.currentTimeMillis()); + Timestamp sqlCreated; for (Object[] o : query.getData()) { - sqlCreated = Timestamp.valueOf((String) o[0]); + sqlCreated = getTime(o[0]); result = new DataResult(ip, value, sqlCreated.getTime()); } } catch (SQLException | ClassCastException ex) { @@ -215,11 +213,10 @@ public static CompletableFuture update(SQL sql, ConfigurationNo sql.execute("INSERT OR REPLACE INTO `" + tablePrefix + "consensus` (`ip`, `value`) VALUES (?, ?);", ip, value); SQLQueryResult query = sql.query("SELECT `created` FROM `" + tablePrefix + "consensus` WHERE `ip`=?;", ip); - Timestamp sqlCreated = null; - Timestamp updated = new Timestamp(System.currentTimeMillis()); + Timestamp sqlCreated; for (Object[] o : query.getData()) { - sqlCreated = Timestamp.valueOf((String) o[0]); + sqlCreated = getTime(o[0]); result = new ConsensusResult(ip, value, sqlCreated.getTime()); } } catch (SQLException | ClassCastException ex) { @@ -266,4 +263,29 @@ public static CompletableFuture delete(String ip, SQL sql, ConfigurationNo } }); } + + public static CompletableFuture getCurrentTime(SQL sql) { + return CompletableFuture.supplyAsync(() -> { + try { + SQLQueryResult query = sql.query("SELECT CURRENT_TIMESTAMP;"); + + for (Object[] o : query.getData()) { + return getTime(o[0]).getTime(); + } + } catch (SQLException | ClassCastException ex) { + logger.error(ex.getMessage(), ex); + } + + return -1L; + }); + } + + private static Timestamp getTime(Object o) { + if (o instanceof String) { + return Timestamp.valueOf((String) o); + } else if (o instanceof Number) { + return new Timestamp(((Number) o).longValue()); + } + return null; + } } diff --git a/README.md b/README.md index ba7e5537..b093232d 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ https://www.myget.org/feed/egg82-java/package/maven/ninja.egg82.plugins/AntiVPN ```Java VPNAPI.getInstance(); ... +long getCurrentSQLTime(); boolean cascade(String ip, [boolean expensive]); ImmutableMap> testAllSources(String ip); // WARNING: Does not cache results double consensus(String ip, [boolean expensive]); diff --git a/Velocity/src/main/java/me/egg82/antivpn/AntiVPN.java b/Velocity/src/main/java/me/egg82/antivpn/AntiVPN.java index 5ccfb0b0..2b76a700 100644 --- a/Velocity/src/main/java/me/egg82/antivpn/AntiVPN.java +++ b/Velocity/src/main/java/me/egg82/antivpn/AntiVPN.java @@ -2,6 +2,7 @@ import co.aikar.commands.ConditionFailedException; import co.aikar.commands.VelocityCommandManager; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.event.PostOrder; import com.velocitypowered.api.event.connection.PostLoginEvent; import com.velocitypowered.api.plugin.PluginDescription; @@ -9,9 +10,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import me.egg82.antivpn.commands.AntiVPNCommand; import me.egg82.antivpn.core.SQLFetchResult; import me.egg82.antivpn.enums.SQLType; @@ -38,6 +37,8 @@ public class AntiVPN { private final Logger logger = LoggerFactory.getLogger(getClass()); + private final ExecutorService singlePool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("AntiVPN-%d").build()); + private VelocityCommandManager commandManager; private List> events = new ArrayList<>(); @@ -104,7 +105,7 @@ private void loadServices() { return; } - new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis")); + singlePool.submit(() -> new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis"))); ServiceLocator.register(new RabbitMQReceiver(cachedConfig.getRabbitConnectionFactory())); } @@ -231,5 +232,9 @@ private void unloadServices() { try { rabbitReceiver.close(); } catch (IOException | TimeoutException ignored) {} + + if (!singlePool.isShutdown()) { + singlePool.shutdownNow(); + } } }