diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java b/logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java index b74b655..19415d6 100644 --- a/logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java +++ b/logbat/src/main/java/info/logbat/domain/log/queue/LogQueue.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.concurrent.locks.LockSupport; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; /** @@ -20,6 +21,7 @@ * * @param 이 큐에 저장되는 요소의 타입 */ +@Primary @Component public class LogQueue implements EventProducer, EventConsumer { diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkLogQueue.java b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkLogQueue.java new file mode 100644 index 0000000..058be56 --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkLogQueue.java @@ -0,0 +1,56 @@ +package info.logbat.domain.log.queue; + +import info.logbat.common.event.EventConsumer; +import info.logbat.common.event.EventProducer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Component +public class SingleLinkLogQueue implements EventProducer, EventConsumer { + + private final SingleLinkedList queue = new SingleLinkedList<>(); + private final long timeout; + private final int bulkSize; + + public SingleLinkLogQueue(@Value("${jdbc.async.timeout}") Long timeout, + @Value("${jdbc.async.bulk-size}") Integer bulkSize) { + this.timeout = timeout; + this.bulkSize = bulkSize; + } + + /* + * Consumer should be one thread + */ + @Override + public List consume() { + List result = new ArrayList<>(bulkSize); + + final long endTime = System.currentTimeMillis() + timeout; + + while (result.isEmpty()) { + T data = queue.poll(); + if (data != null) { + result.add(data); + } + if (result.size() >= bulkSize) { + break; + } + if (System.currentTimeMillis() >= endTime) { + break; + } + } + + return result; + } + + /* + * Producer should be one thread + */ + @Override + public void produce(List data) { + queue.addAll(data); + } +} diff --git a/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkedList.java b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkedList.java new file mode 100644 index 0000000..92a6650 --- /dev/null +++ b/logbat/src/main/java/info/logbat/domain/log/queue/SingleLinkedList.java @@ -0,0 +1,66 @@ +package info.logbat.domain.log.queue; + +public class SingleLinkedList { + + Node first; + Node lastHolder; + + public SingleLinkedList() { + lastHolder = new Node<>(null, null); + first = lastHolder; + } + + private void linkLast(E e) { + Node newLastHolder = new Node<>(null, null); + + // 1. add next node + lastHolder.next = newLastHolder; + // 2. set item + lastHolder.item = e; + + lastHolder = newLastHolder; + } + + private E unlinkFirst() { + // 1. get first element + E element = first.item; + + // first == lastHolder + if (first.item == null) { + return null; + } + + // 2. get next node + // if element is not null, next node should not be null + Node next = first.next; + first.item = null; + first.next = null; // help GC + first = next; + return element; + } + + + public boolean isEmpty() { + return first.item == null; + } + + public E poll() { + return unlinkFirst(); + } + + public void addAll(Iterable c) { + for (E e : c) { + linkLast(e); + } + } + + private static class Node { + volatile E item; + volatile Node next; + + Node(E element, Node next) { + this.item = element; + this.next = next; + } + } +}