Skip to content

✅ 싱글 스레드 환경인데 정합성이 깨진다고?

KyungMin Lee edited this page Sep 1, 2024 · 5 revisions

환경 설명

🧪 MVP2: 서버 최대 부하 찾기 실험 이후 결과 개선 아이디어중 하나로 싱글 스레드 기반의 Producer, Consumer 로직 기반 Lock-Free Queue를 적용하고자 했습니다. 이에 LinkedList를 활용한 Queue를 구현했습니다.

image

간단하게 살펴보자면

  • Flatter(Producer)는 데이터를 Queue에 주입하고, LockSupport.unpark()를 활용해 Thread Block을 해제합니다.

  • Consumer는 Queue에 있는 데이터를 consume 하는데, Queue에 Bulk Size 만큼의 데이터가 존재하지 않으면 LockSupport.park(timeout)을 활용해 Thread Block을 진행합니다. 이후 Queue에 데이터가 쌓이거나 timeout이 지나가면 데이터를 consume합니다.

  • 구현 코드

public class LogQueue<T> implements EventProducer<T>, EventConsumer<T> {

    // T 타입의 요소를 저장하는 list
    private final LinkedList<T> queue;
    // 소비자 스레드가 대기하는 시간 (나노초 단위)
    private final long timeoutNanos;
    // 일괄 처리 크기
    private final int bulkSize;
    // 소비자 스레드, volatile로 선언하여 가시성 보장
    private volatile Thread consumerThread;

    @Override
    public List<T> consume() {
        List<T> result = new ArrayList<>(bulkSize);

        if (queue.size() >= bulkSize) {
            for (int i = 0; i < bulkSize; i++) {
                result.add(queue.poll());
            }
            return result;
        }

        consumerThread = Thread.currentThread();

        do {
            LockSupport.parkNanos(timeoutNanos);
        } while (queue.isEmpty());

        for (int i = 0; i < bulkSize; i++) {
            result.add(queue.poll());
            if (queue.isEmpty()) {
                break;
            }
        }

        consumerThread = null;
        return result;
    }

    @Override
    public void produce(List<T> data) {
        queue.addAll(data);
        if (consumerThread != null && queue.size() >= bulkSize) {
            LockSupport.unpark(consumerThread);
        }
    }
}

🚨 정합성이 깨져요!!

부하 테스트를 진행하며 Heap Memory Size에 대한 모니터링을 진행하던 중 순간적으로 Memory Size가 50% 이상이 감소하는 현상을 발생했습니다.

원인을 파악하는 과정에서 DB를 조회한 결과 500,000개의 데이터를 추가하는 부하 테스트를 진행했는데 495,000개의 데이터만이 저장됨을 확인할 수 있었습니다.

원인은?

로그를 확인하던 중 아래와 같은 로그를 확인할 수 있었습니다.

Caused by: java.lang.IndexOutOfBoundsException: Index: 893, Size: 421
java.base/java.util.LinkedList.checkPositionIndex(LinkedList.java:564)
	at java.base/java.util.LinkedList.addAll(LinkedList.java:410)
	at java.base/java.util.LinkedList.addAll(LinkedList.java:391)

LinkedList.addAll() 과정에서 IndexOutOfBoundsException이 발생했습니다.

왜 예외가 발생했을까?

결론부터 이야기하자면 Thread-Safe 하지 않았습니다.

LinkedList의 구현을 살펴보면

public boolean addAll(Collection<? extends E> c) {
    return addAll(size, c);
}

public boolean addAll(int index, Collection<? extends E> c) {
    checkPositionIndex(index);

    Object[] a = c.toArray();
    int numNew = a.length;
    ...
    return true;
}

private void checkPositionIndex(int index) {
    if (!isPositionIndex(index))
        throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

private boolean isPositionIndex(int index) {
    return index >= 0 && index <= size;
}

.addAll(Collection<? extends E> c) -> addAll(int index, Collection<? extends E> c) -> checkPositionIndex(int index) -> isPositionIndex(int index) 과정에서 .consume() 메서드가 실행되 값이 제거된다면 size가 일치하지 않아 예외가 발생하는 것이었습니다.

image

해결하기

근본적인 원인은 Single Thread 2개로 동작한다는 생각에 Queue에 데이터를 주입하는 과정에 대한 스레드 제한을 설정하지 않아 Thread-Safe 하지 않았다는 것이었습니다.

이에 경합상황이 발생하지 않도록 ReentrantLock을 활용한 락방식을 도입해 문제를 해결했습니다.

  • 구현 코드
public class ReentrantLogQueue<T> implements EventProducer<T>, EventConsumer<T> {

    private final LinkedList<T> queue = new LinkedList<>();
    private final long timeout;
    private final int bulkSize;
    private final ReentrantLock bulkLock = new ReentrantLock();
    private final Condition bulkCondition = bulkLock.newCondition();

    public ReentrantLogQueue(@Value("${jdbc.async.timeout}") Long timeout,
        @Value("${jdbc.async.bulk-size}") Integer bulkSize) {
        this.timeout = timeout;
        this.bulkSize = bulkSize;
    }


    @Override
    public List<T> consume() {
        List<T> result = new ArrayList<>();

        try {
            bulkLock.lockInterruptibly();
            // Case1: Full Flush
            if (queue.size() >= bulkSize) {
                for (int i = 0; i < bulkSize; i++) {
                    result.add(queue.poll());
                }
                return result;
            }
            // Else Case: Blocking
            // Blocked while Queue is Not Empty
            do {
                bulkCondition.await(timeout, TimeUnit.MILLISECONDS);
            } while (queue.isEmpty());

            // Bulk Size 만큼 꺼내서 반환
            for (int i = 0; i < bulkSize; i++) {
                result.add(queue.poll());
                if (queue.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            bulkLock.unlock();
        }
        return result;
    }

    @Override
    public void produce(List<T> data) {
        bulkLock.lock();
        try {
            queue.addAll(data);
            if (queue.size() >= bulkSize) {
                bulkCondition.signal();
            }
        } finally {
            bulkLock.unlock();
        }
    }

}
Clone this wiki locally