Skip to content

Commit

Permalink
Fis/#139 LinkedList 자료구조 개선 (#141)
Browse files Browse the repository at this point in the history
* feat: single linked list 추가

- 생산자 소비자가 병렬적으로 호출할 수 있는  single linked list 추가

* feat: SingleLinkLogQueue 추가

- SingleLinkedList를 활용한 LogQueue 추가
  • Loading branch information
LuizyHub authored Aug 29, 2024
1 parent e3857a3 commit 10cf89a
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

/**
Expand All @@ -20,6 +21,7 @@
*
* @param <T> 이 큐에 저장되는 요소의 타입
*/
@Primary
@Component
public class LogQueue<T> implements EventProducer<T>, EventConsumer<T> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package info.logbat.domain.log.queue;

import info.logbat.common.event.EventConsumer;
import info.logbat.common.event.EventProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
public class SingleLinkLogQueue<T> implements EventProducer<T>, EventConsumer<T> {

private final SingleLinkedList<T> queue = new SingleLinkedList<>();
private final long timeout;
private final int bulkSize;

public SingleLinkLogQueue(@Value("${jdbc.async.timeout}") Long timeout,
@Value("${jdbc.async.bulk-size}") Integer bulkSize) {
this.timeout = timeout;
this.bulkSize = bulkSize;
}

/*
* Consumer should be one thread
*/
@Override
public List<T> consume() {
List<T> result = new ArrayList<>(bulkSize);

final long endTime = System.currentTimeMillis() + timeout;

while (result.isEmpty()) {
T data = queue.poll();
if (data != null) {
result.add(data);
}
if (result.size() >= bulkSize) {
break;
}
if (System.currentTimeMillis() >= endTime) {
break;
}
}

return result;
}

/*
* Producer should be one thread
*/
@Override
public void produce(List<T> data) {
queue.addAll(data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package info.logbat.domain.log.queue;

public class SingleLinkedList<E> {

Node<E> first;
Node<E> lastHolder;

public SingleLinkedList() {
lastHolder = new Node<>(null, null);
first = lastHolder;
}

private void linkLast(E e) {
Node<E> newLastHolder = new Node<>(null, null);

// 1. add next node
lastHolder.next = newLastHolder;
// 2. set item
lastHolder.item = e;

lastHolder = newLastHolder;
}

private E unlinkFirst() {
// 1. get first element
E element = first.item;

// first == lastHolder
if (first.item == null) {
return null;
}

// 2. get next node
// if element is not null, next node should not be null
Node<E> next = first.next;
first.item = null;
first.next = null; // help GC
first = next;
return element;
}


public boolean isEmpty() {
return first.item == null;
}

public E poll() {
return unlinkFirst();
}

public void addAll(Iterable<? extends E> c) {
for (E e : c) {
linkLast(e);
}
}

private static class Node<E> {
volatile E item;
volatile Node<E> next;

Node(E element, Node<E> next) {
this.item = element;
this.next = next;
}
}
}

0 comments on commit 10cf89a

Please sign in to comment.