Skip to content

Commit

Permalink
Added getCurrentSQLTime helper to API
Browse files Browse the repository at this point in the history
Redis no longer clogs up the main thread
  • Loading branch information
egg82 committed Nov 9, 2018
1 parent 1d7aa17 commit c3eb9f5
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 28 deletions.
13 changes: 9 additions & 4 deletions Bukkit/src/main/java/me/egg82/antivpn/AntiVPN.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import co.aikar.taskchain.BukkitTaskChainFactory;
import co.aikar.taskchain.TaskChain;
import co.aikar.taskchain.TaskChainFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.logging.Level;
import me.egg82.antivpn.commands.AntiVPNCommand;
import me.egg82.antivpn.core.SQLFetchResult;
Expand Down Expand Up @@ -50,6 +49,8 @@
public class AntiVPN {
private final Logger logger = LoggerFactory.getLogger(getClass());

private final ExecutorService singlePool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("AntiVPN-%d").build());

private TaskChainFactory taskFactory;
private PaperCommandManager commandManager;

Expand Down Expand Up @@ -128,7 +129,7 @@ private void loadServices() {
return;
}

new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis"));
singlePool.submit(() -> new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis")));
ServiceLocator.register(new RabbitMQReceiver(cachedConfig.getRabbitConnectionFactory()));
ServiceLocator.register(new SpigotUpdater(plugin, 58291));
}
Expand Down Expand Up @@ -288,6 +289,10 @@ private void unloadServices() {
try {
rabbitReceiver.close();
} catch (IOException | TimeoutException ignored) {}

if (!singlePool.isShutdown()) {
singlePool.shutdownNow();
}
}

private void log(Level level, String message) {
Expand Down
14 changes: 9 additions & 5 deletions Bungee/src/main/java/me/egg82/antivpn/AntiVPN.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import co.aikar.commands.BungeeCommandManager;
import co.aikar.commands.ConditionFailedException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.logging.Level;
import me.egg82.antivpn.commands.AntiVPNCommand;
import me.egg82.antivpn.core.SQLFetchResult;
Expand Down Expand Up @@ -45,6 +43,8 @@
public class AntiVPN {
private final Logger logger = LoggerFactory.getLogger(getClass());

private final ExecutorService singlePool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("AntiVPN-%d").build());

private BungeeCommandManager commandManager;

private List<BungeeEventSubscriber<?>> events = new ArrayList<>();
Expand Down Expand Up @@ -111,7 +111,7 @@ private void loadServices() {
return;
}

new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis"));
singlePool.submit(() -> new RedisSubscriber(cachedConfig.getRedisPool(), config.getNode("redis")));
ServiceLocator.register(new RabbitMQReceiver(cachedConfig.getRabbitConnectionFactory()));
ServiceLocator.register(new BungeeUpdater(plugin, 58716));
}
Expand Down Expand Up @@ -258,5 +258,9 @@ private void unloadServices() {
try {
rabbitReceiver.close();
} catch (IOException | TimeoutException ignored) {}

if (!singlePool.isShutdown()) {
singlePool.shutdownNow();
}
}
}
30 changes: 30 additions & 0 deletions Common/src/main/java/me/egg82/antivpn/VPNAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import me.egg82.antivpn.enums.SQLType;
import me.egg82.antivpn.extended.CachedConfigValues;
import me.egg82.antivpn.extended.Configuration;
import me.egg82.antivpn.services.InternalAPI;
import me.egg82.antivpn.sql.MySQL;
import me.egg82.antivpn.sql.SQLite;
import me.egg82.antivpn.utils.RabbitMQUtil;
import me.egg82.antivpn.utils.ValidationUtil;
import ninja.egg82.service.ServiceLocator;
Expand All @@ -25,6 +29,32 @@ private VPNAPI() {}

public static VPNAPI getInstance() { return api; }

public long getCurrentSQLTime() {
CachedConfigValues cachedConfig;

try {
cachedConfig = ServiceLocator.get(CachedConfigValues.class);
} catch (IllegalAccessException | InstantiationException | ServiceNotFoundException ex) {
logger.error(ex.getMessage(), ex);
return -1L;
}

try {
if (cachedConfig.getSQLType() == SQLType.MySQL) {
return MySQL.getCurrentTime(cachedConfig.getSQL()).get();
} else if (cachedConfig.getSQLType() == SQLType.SQLite) {
return SQLite.getCurrentTime(cachedConfig.getSQL()).get();
}
} catch (ExecutionException ex) {
logger.error(ex.getMessage(), ex);
} catch (InterruptedException ex) {
logger.error(ex.getMessage(), ex);
Thread.currentThread().interrupt();
}

return -1L;
}

public ImmutableMap<String, Optional<Boolean>> testAllSources(String ip) {
if (ip == null) {
throw new IllegalArgumentException("ip cannot be null.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import me.egg82.antivpn.services.InternalAPI;
import me.egg82.antivpn.services.RabbitMQ;
import me.egg82.antivpn.utils.RabbitMQUtil;
import me.egg82.antivpn.utils.ValidationUtil;
import ninja.egg82.json.JSONUtil;
Expand Down Expand Up @@ -51,12 +53,18 @@ public void handleDelivery(String tag, Envelope envelope, AMQP.BasicProperties p
String ip = (String) obj.get("ip");
boolean value = (Boolean) obj.get("value");
long created = ((Number) obj.get("created")).longValue();
UUID id = UUID.fromString((String) obj.get("id"));

if (!ValidationUtil.isValidIp(ip)) {
logger.warn("non-valid IP sent through RabbitMQ cascade");
return;
}

if (id.equals(RabbitMQ.getServerID())) {
logger.info("ignoring message sent from this server");
return;
}

CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class);
Configuration config = ServiceLocator.get(Configuration.class);

Expand All @@ -77,12 +85,18 @@ public void handleDelivery(String tag, Envelope envelope, AMQP.BasicProperties p
String ip = (String) obj.get("ip");
double value = ((Number) obj.get("value")).doubleValue();
long created = ((Number) obj.get("created")).longValue();
UUID id = UUID.fromString((String) obj.get("id"));

if (!ValidationUtil.isValidIp(ip)) {
logger.warn("non-valid IP sent through RabbitMQ consensus");
return;
}

if (id.equals(RabbitMQ.getServerID())) {
logger.info("ignoring message sent from this server");
return;
}

CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class);
Configuration config = ServiceLocator.get(Configuration.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package me.egg82.antivpn.extended;

import java.util.UUID;
import me.egg82.antivpn.services.InternalAPI;
import me.egg82.antivpn.services.Redis;
import me.egg82.antivpn.utils.RedisUtil;
import me.egg82.antivpn.utils.ValidationUtil;
import ninja.egg82.json.JSONUtil;
Expand Down Expand Up @@ -41,12 +43,18 @@ public void onMessage(String channel, String message) {
String ip = (String) obj.get("ip");
boolean value = (Boolean) obj.get("value");
long created = ((Number) obj.get("created")).longValue();
UUID id = UUID.fromString((String) obj.get("id"));

if (!ValidationUtil.isValidIp(message)) {
logger.warn("non-valid IP sent through Redis pub/sub cascade");
return;
}

if (id.equals(Redis.getServerID())) {
logger.info("ignoring message sent from this server");
return;
}

CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class);
Configuration config = ServiceLocator.get(Configuration.class);

Expand All @@ -60,12 +68,18 @@ public void onMessage(String channel, String message) {
String ip = (String) obj.get("ip");
double value = ((Number) obj.get("value")).doubleValue();
long created = ((Number) obj.get("created")).longValue();
UUID id = UUID.fromString((String) obj.get("id"));

if (!ValidationUtil.isValidIp(message)) {
logger.warn("non-valid IP sent through Redis pub/sub consensus");
return;
}

if (id.equals(Redis.getServerID())) {
logger.info("ignoring message sent from this server");
return;
}

CachedConfigValues cachedConfig = ServiceLocator.get(CachedConfigValues.class);
Configuration config = ServiceLocator.get(Configuration.class);

Expand Down
10 changes: 8 additions & 2 deletions Common/src/main/java/me/egg82/antivpn/services/RabbitMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import me.egg82.antivpn.core.ConsensusResult;
Expand All @@ -13,10 +14,13 @@
import org.slf4j.LoggerFactory;

public class RabbitMQ {
private RabbitMQ() {}

private static final Logger logger = LoggerFactory.getLogger(RabbitMQ.class);

private static final UUID serverId = UUID.randomUUID();
public static UUID getServerID() { return serverId; }

private RabbitMQ() {}

public static CompletableFuture<Boolean> broadcast(DataResult sqlResult, long sourceCacheTime, Connection connection) {
return CompletableFuture.supplyAsync(() -> {
try (Channel channel = RabbitMQUtil.getChannel(connection)) {
Expand All @@ -31,6 +35,7 @@ public static CompletableFuture<Boolean> broadcast(DataResult sqlResult, long so
obj.put("ip", sqlResult.getIp());
obj.put("value", sqlResult.getValue());
obj.put("created", sqlResult.getCreated());
obj.put("id", serverId.toString());

channel.exchangeDeclare("antivpn-result", "fanout");
channel.basicPublish("antivpn-result", "", null, obj.toJSONString().getBytes());
Expand Down Expand Up @@ -62,6 +67,7 @@ public static CompletableFuture<Boolean> broadcast(ConsensusResult sqlResult, lo
obj.put("ip", sqlResult.getIp());
obj.put("value", sqlResult.getValue());
obj.put("created", sqlResult.getCreated());
obj.put("id", serverId.toString());

channel.exchangeDeclare("antivpn-consensus", "fanout");
channel.basicPublish("antivpn-result", "", null, obj.toJSONString().getBytes());
Expand Down
12 changes: 10 additions & 2 deletions Common/src/main/java/me/egg82/antivpn/services/Redis.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package me.egg82.antivpn.services;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import me.egg82.antivpn.core.ConsensusResult;
import me.egg82.antivpn.core.DataResult;
Expand All @@ -14,10 +15,13 @@
import redis.clients.jedis.exceptions.JedisException;

public class Redis {
private Redis() {}

private static final Logger logger = LoggerFactory.getLogger(Redis.class);

private static final UUID serverId = UUID.randomUUID();
public static UUID getServerID() { return serverId; }

private Redis() {}

public static CompletableFuture<Boolean> updateFromQueue(SQLFetchResult sqlResult, long sourceCacheTime, JedisPool pool, ConfigurationNode redisConfigNode) {
return CompletableFuture.supplyAsync(() -> {
try (Jedis redis = RedisUtil.getRedis(pool, redisConfigNode)) {
Expand All @@ -39,6 +43,7 @@ public static CompletableFuture<Boolean> updateFromQueue(SQLFetchResult sqlResul
obj.put("ip", result.getIp());
obj.put("value", result.getValue());
obj.put("created", result.getCreated());
obj.put("id", serverId.toString());
redis.publish("antivpn-result", obj.toJSONString());
} else {
redis.publish("antivpn-delete", result.getIp());
Expand All @@ -59,6 +64,7 @@ public static CompletableFuture<Boolean> updateFromQueue(SQLFetchResult sqlResul
obj.put("ip", result.getIp());
obj.put("value", result.getValue());
obj.put("created", result.getCreated());
obj.put("id", serverId.toString());
redis.publish("antivpn-consensus", obj.toJSONString());
} else {
redis.publish("antivpn-delete", result.getIp());
Expand Down Expand Up @@ -99,6 +105,7 @@ public static CompletableFuture<Boolean> update(DataResult sqlResult, long sourc
obj.put("ip", sqlResult.getIp());
obj.put("value", sqlResult.getValue());
obj.put("created", sqlResult.getCreated());
obj.put("id", serverId.toString());
redis.publish("antivpn-result", obj.toJSONString());
} else {
redis.publish("antivpn-delete", sqlResult.getIp());
Expand Down Expand Up @@ -133,6 +140,7 @@ public static CompletableFuture<Boolean> update(ConsensusResult sqlResult, long
obj.put("ip", sqlResult.getIp());
obj.put("value", sqlResult.getValue());
obj.put("created", sqlResult.getCreated());
obj.put("id", serverId.toString());
redis.publish("antivpn-consensus", obj.toJSONString());
} else {
redis.publish("antivpn-delete", sqlResult.getIp());
Expand Down
16 changes: 16 additions & 0 deletions Common/src/main/java/me/egg82/antivpn/sql/MySQL.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,20 @@ public static CompletableFuture<ConsensusResult> update(SQL sql, ConfigurationNo
return result;
});
}

public static CompletableFuture<Long> getCurrentTime(SQL sql) {
return CompletableFuture.supplyAsync(() -> {
try {
SQLQueryResult query = sql.query("SELECT CURRENT_TIMESTAMP();");

for (Object[] o : query.getData()) {
return ((Timestamp) o[0]).getTime();
}
} catch (SQLException | ClassCastException ex) {
logger.error(ex.getMessage(), ex);
}

return -1L;
});
}
}
Loading

0 comments on commit c3eb9f5

Please sign in to comment.