Skip to content

Commit

Permalink
feat: send & receive jedis messages as async
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishon committed Jun 12, 2024
1 parent bce9c43 commit 8093ead
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
File renamed without changes.
58 changes: 35 additions & 23 deletions src/main/java/dev/rishon/sync/jedis/JedisManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import dev.rishon.sync.jedis.packet.IPacket
import dev.rishon.sync.utils.LoggerUtil
import redis.clients.jedis.JedisPool
import redis.clients.jedis.JedisPubSub
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors


class JedisManager(redisData: RedisData) {
Expand All @@ -17,6 +20,7 @@ class JedisManager(redisData: RedisData) {
private val jedisPool: JedisPool = redisData.jedisPool!!
private val gson: Gson = GsonBuilder().registerTypeAdapter(IPacket::class.java, IPacketDeserializer()).create()
private var jedisPubSub: JedisPubSub? = null
private val executorService = Executors.newCachedThreadPool()

init {
instance = this
Expand All @@ -26,14 +30,18 @@ class JedisManager(redisData: RedisData) {
this.jedisPubSub = object : JedisPubSub() {
override fun onMessage(channel: String, message: String) {
if (channel != mainChannel) return
try {
val jsonObject = gson.fromJson(message, JsonObject::class.java)
val packetClassName = jsonObject.remove("sync-packet").asString
val packetClass = Class.forName(packetClassName)
val packet: IPacket = gson.fromJson(jsonObject, packetClass) as IPacket
packet.onReceive()
} catch (ignored: Exception) {
}

CompletableFuture.runAsync({
try {
val jsonObject = gson.fromJson(message, JsonObject::class.java)
val packetClassName = jsonObject.remove("sync-packet").asString
val packetClass = Class.forName(packetClassName)
val packet: IPacket = gson.fromJson(jsonObject, packetClass) as IPacket
packet.onReceive()
} catch (exception: Exception) {
LoggerUtil.error("Failed to receive packet: $message")
}
}, executorService)
}
}
jedisSubscriber.subscribe(this.jedisPubSub, this.mainChannel)
Expand All @@ -46,24 +54,28 @@ class JedisManager(redisData: RedisData) {
}

fun sendPacket(packet: IPacket) {
val fieldMap: MutableMap<String, Any> = HashMap()
for (field in packet.javaClass.getDeclaredFields()) {
field.setAccessible(true)
fieldMap[field.name] = field.get(packet)
}
CompletableFuture.supplyAsync({
val fieldMap = ConcurrentHashMap<String, Any>()

for (field in packet.javaClass.getDeclaredFields()) {
field.setAccessible(true)
fieldMap[field.name] = field.get(packet)
}

try {
val jsonObject = gson.fromJson(gson.toJson(fieldMap), JsonObject::class.java)
jsonObject.addProperty("sync-packet", packet.javaClass.getName())
jsonObject.addProperty("instance", SyncAPI.getAPI().getInstanceID())
fieldMap
}, executorService).thenAcceptAsync({ fieldMap ->
try {
val jsonObject = gson.fromJson(gson.toJson(fieldMap), JsonObject::class.java)
jsonObject.addProperty("sync-packet", packet::class.java.name)
jsonObject.addProperty("instance", SyncAPI.getAPI().getInstanceID())

this.jedisPool.resource.use { jedis ->
jedis.publish(this.mainChannel, jsonObject.toString())
jedis.close()
jedisPool.resource.use { jedis ->
jedis.publish(mainChannel, jsonObject.toString())
}
} catch (exception: Exception) {
LoggerUtil.error("Failed to send packet: ${packet::class.java.simpleName}")
}
} catch (e: Exception) {
LoggerUtil.error("Failed to send packet: ${packet.javaClass.simpleName}")
}
}, executorService)
}

companion object {
Expand Down

0 comments on commit 8093ead

Please sign in to comment.