From eebd78b6ca59fc912391e0e9839876883d000516 Mon Sep 17 00:00:00 2001 From: Kim Minju <111269144+miiiinju1@users.noreply.github.com> Date: Thu, 29 Aug 2024 11:19:16 +0900 Subject: [PATCH] =?UTF-8?q?Feat/#137=20MultiQueue=EB=A5=BC=20=EC=9D=B4?= =?UTF-8?q?=EC=9A=A9=ED=95=98=EC=97=AC=20Controller=EC=97=90=EC=84=9C=20?= =?UTF-8?q?=EC=98=A4=EB=8A=94=20=EB=A1=9C=EA=B7=B8=20=EC=9A=94=EC=B2=AD?= =?UTF-8?q?=EC=9D=98=20=EB=B6=80=ED=95=98=20=EA=B0=90=EC=86=8C=20(#140)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: 임시로 ReentrantLogQueue에서 Primary 제거 * feat: MultiProcessor 추가 - 임시로 여러 큐를 가지는 MultiProcessor를 추가함 * chore: default queue개수 조정 * feat: flatterQueue 싱글 스레드풀 추가 및 queue count default를 3 설정 * chore: default queue사이즈 설정 * refactor: AsyncMultiProcessor 리팩토링 - 내부 로직 일원화 및 린팅을 적용했습니다. - MultiProcessor -> AsyncMultiProcessor 클래스 명 변경을 진행했습니다. * chore: 불필요한 메서드 구현 제거 - EventConsume 구현을 제거했습니다. * chore: 의존성 정리, 사용하지 않는 Component 제거 --------- Co-authored-by: KyungMin Lee --- .../domain/log/application/LogService.java | 6 +- .../domain/log/queue/ReentrantLogQueue.java | 3 +- .../log/repository/AsyncLogProcessor.java | 2 - .../log/repository/AsyncLogRepository.java | 8 +- .../log/repository/AsyncMultiProcessor.java | 80 +++++++++++++++++++ 5 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java diff --git a/logbat/src/main/java/info/logbat/domain/log/application/LogService.java b/logbat/src/main/java/info/logbat/domain/log/application/LogService.java index 684b7a8..7438fc6 100644 --- a/logbat/src/main/java/info/logbat/domain/log/application/LogService.java +++ b/logbat/src/main/java/info/logbat/domain/log/application/LogService.java @@ -1,7 +1,7 @@ package info.logbat.domain.log.application; +import info.logbat.common.event.EventProducer; import info.logbat.domain.log.domain.Log; -import info.logbat.domain.log.flatter.LogRequestFlatter; import info.logbat.domain.log.presentation.payload.request.CreateLogRequest; import info.logbat.domain.project.application.AppService; import java.util.ArrayList; @@ -15,7 +15,7 @@ @RequiredArgsConstructor public class LogService { - private final LogRequestFlatter logRequestFlatter; + private final EventProducer producer; private final AppService appService; public void saveLogs(String appKey, List requests) { @@ -28,7 +28,7 @@ public void saveLogs(String appKey, List requests) { log.error("Failed to convert request to entity: {}", request, e); } }); - logRequestFlatter.flatten(logs); + producer.produce(logs); } } diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java b/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java index 64c69e2..d63a7bf 100644 --- a/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java +++ b/logbat/src/main/java/info/logbat/domain/log/queue/ReentrantLogQueue.java @@ -9,10 +9,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; -@Primary +//@Primary @Component public class ReentrantLogQueue implements EventProducer, EventConsumer { diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java index 9b7e330..9df8714 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogProcessor.java @@ -11,13 +11,11 @@ import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Component; /** * 비동기적으로 로그를 처리하는 클래스입니다. 이 클래스는 로그를 저장하는 비동기 작업을 수행하며, 이를 위해 별도의 스레드 풀을 사용합니다. */ @Slf4j -@Component public class AsyncLogProcessor { // 로그 저장 작업을 수행하는 스레드 풀 diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java index 9e4c5ee..44a8967 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncLogRepository.java @@ -20,25 +20,25 @@ public class AsyncLogRepository implements LogRepository { private final JdbcTemplate jdbcTemplate; - private final AsyncLogProcessor asyncLogProcessor; + private final AsyncMultiProcessor asyncMultiProcessor; private static final Long DEFAULT_RETURNS = 0L; @PostConstruct public void init() { log.info("AsyncLogRepository is initialized."); - asyncLogProcessor.init(this::saveLogsToDatabase); + asyncMultiProcessor.init(this::saveLogsToDatabase); } + @Deprecated @Override public long save(Log log) { - asyncLogProcessor.submitLog(log); return DEFAULT_RETURNS; } + @Deprecated @Override public List saveAll(List logs) { - asyncLogProcessor.submitLogs(logs); return logs; } diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java new file mode 100644 index 0000000..6ab3adc --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java @@ -0,0 +1,80 @@ +package info.logbat.domain.log.repository; + +import com.zaxxer.hikari.HikariDataSource; +import info.logbat.common.event.EventProducer; +import info.logbat.domain.log.queue.ReentrantLogQueue; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import javax.sql.DataSource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Primary +@Component +public class AsyncMultiProcessor implements EventProducer { + + private final List> queues; + private final List flatterExecutors; + private Consumer> saveFunction; + private final int queueCount; + + public AsyncMultiProcessor(@Value("${queue.count:3}") int queueCount, + @Value("${jdbc.async.timeout:5000}") Long timeout, + @Value("${jdbc.async.bulk-size:3000}") Integer bulkSize, JdbcTemplate jdbcTemplate) { + this.queueCount = queueCount; + this.queues = new ArrayList<>(queueCount); + this.flatterExecutors = new ArrayList<>(queueCount); + int poolSize = getPoolSize(jdbcTemplate); + setup(queueCount, timeout, bulkSize, poolSize); + } + + public void init(Consumer> saveFunction) { + this.saveFunction = saveFunction; + } + + @Override + public void produce(List data) { + if (data.isEmpty()) { + return; + } + int selectedQueue = ThreadLocalRandom.current().nextInt(queueCount); + flatterExecutors.get(selectedQueue).execute(() -> queues.get(selectedQueue).produce(data)); + } + + private void setup(int queueCount, Long timeout, Integer bulkSize, int poolSize) { + ExecutorService followerExecutor = Executors.newFixedThreadPool(poolSize); + ReentrantLogQueue queue = new ReentrantLogQueue<>(timeout, bulkSize); + + for (int i = 0; i < queueCount; i++) { + queues.add(queue); + flatterExecutors.add(Executors.newSingleThreadExecutor()); + } + CompletableFuture.runAsync(() -> leaderTask(queue, followerExecutor)); + } + + private void leaderTask(ReentrantLogQueue queue, ExecutorService follower) { + while (!Thread.currentThread().isInterrupted()) { + List element = queue.consume(); + follower.execute(() -> saveFunction.accept(element)); + } + } + + private static int getPoolSize(JdbcTemplate jdbcTemplate) { + DataSource dataSource = jdbcTemplate.getDataSource(); + if (!(dataSource instanceof HikariDataSource)) { + throw new IllegalArgumentException("DataSource is null"); + } + int poolSize = ((HikariDataSource) dataSource).getMaximumPoolSize(); + log.debug("Creating AsyncLogProcessor with pool size: {}", poolSize); + return poolSize * 5 / 10; + } +}