Skip to content

Commit

Permalink
Address concurrency issues in DequeRecycler close
Browse files Browse the repository at this point in the history
This change addresses some concurrency issues that can occur when
closing a DequeRecycler. The first issue that is addressed is a
ConcurrentModificationException that can occur when using a locked
DequeRecycler that is backed by an ArrayDeque. The ArrayDeque is not
thread safe and requires external synchronization. In most cases the
locked DequeRecycler handles this correctly but closing the
DequeRecycler is not protected by the lock, which can lead to the
ConcurrentModificationException if other threads are calling the obtain
method. Additionally, the DequeRecycler close method used an iterator
to go over all entries in the Deque and then later cleared them. The
close method now uses pollFirst in a loop to empty the Deque and still
execute the destroy method.

Finally, the ConcurrentDequeRecycler had an overzealous assertion in
the close method that the size of the Deque is equivalent to the
externally tracked size. This assertion is not always going to be true
due to the nature of the implementation. There is no lock guarding both
the deque and the size value, so there is always a chance that the two
could be wrong depending on ongoing requests. This assertion has been
removed and a comment has been added that mentions there can be some
discrepancies between the actual size of the deque and the externally
tracked size.

These issues may have always been present, but I believe the changes in

Closes elastic#41683
  • Loading branch information
jaymode committed Apr 30, 2019
1 parent 1082da2 commit 2271211
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
*/
public class ConcurrentDequeRecycler<T> extends DequeRecycler<T> {

// we maintain size separately because concurrent deque implementations typically have linear-time size() impls
// we maintain size separately because concurrent deque implementations typically have linear-time size() impls.
// due to the concurrent nature of this class, size may differ from the actual size of the deque depending on
// other concurrent requests and their current state
final AtomicInteger size;

public ConcurrentDequeRecycler(C<T> c, int maxSize) {
Expand All @@ -39,7 +41,6 @@ public ConcurrentDequeRecycler(C<T> c, int maxSize) {

@Override
public void close() {
assert deque.size() == size.get();
super.close();
size.set(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@


import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link Recycler} implementation based on a {@link Deque}. This implementation is NOT thread-safe.
*/
public class DequeRecycler<T> extends AbstractRecycler<T> {

final AtomicBoolean closed = new AtomicBoolean(false);
final Deque<T> deque;
final int maxSize;

Expand All @@ -38,12 +40,13 @@ public DequeRecycler(C<T> c, Deque<T> queue, int maxSize) {

@Override
public void close() {
// call destroy() for every cached object
for (T t : deque) {
c.destroy(t);
if (closed.compareAndSet(false, true)) {
T t;
while ((t = deque.pollFirst()) != null) {
c.destroy(t);
}
assert deque.size() == 0;
}
// finally get rid of all references
deque.clear();
}

@Override
Expand Down Expand Up @@ -90,7 +93,7 @@ public void close() {
if (value == null) {
throw new IllegalStateException("recycler entry already released...");
}
final boolean recycle = beforeRelease();
final boolean recycle = beforeRelease() && closed.get() == false;
if (recycle) {
c.recycle(value);
deque.addFirst(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public static <T> Recycler.Factory<T> dequeFactory(final Recycler.C<T> c, final
}

/**
* Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#close()} are protected by
* a lock.
* Wrap the provided recycler so that calls to {@link Recycler#obtain()}, {@link Recycler#close()} and {@link Recycler.V#close()}
* are protected by a lock.
*/
public static <T> Recycler<T> locked(final Recycler<T> recycler) {
return new FilterRecycler<T>() {
Expand All @@ -79,6 +79,13 @@ public Recycler.V<T> obtain() {
}
}

@Override
public void close() {
synchronized (lock) {
super.close();
}
}

@Override
protected Recycler.V<T> wrap(final Recycler.V<T> delegate) {
return new Recycler.V<T>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,64 @@

package org.elasticsearch.common.recycler;

import org.elasticsearch.common.lease.Releasables;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LockedRecyclerTests extends AbstractRecyclerTestCase {

@Override
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.locked(Recyclers.deque(RECYCLER_C, limit));
}

public void testConcurrentCloseAndObtain() throws Exception {
final int prepopulatedAmount = 1_000_000;
final Recycler<byte[]> recycler = newRecycler(prepopulatedAmount);
final List<Recycler.V<byte[]>> recyclers = new ArrayList<>();
// prepopulate recycler with 1 million entries to ensure we have entries to iterate over
for (int i = 0; i < prepopulatedAmount; i++) {
recyclers.add(recycler.obtain());
}
Releasables.close(recyclers);
final int numberOfProcessors = Runtime.getRuntime().availableProcessors();
final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3);
final int numberOfIterations = scaledRandomIntBetween(100_000, 1_000_000);
final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
List<Thread> threads = new ArrayList<>(numberOfThreads);
for (int i = 0; i < numberOfThreads - 1; i++) {
threads.add(new Thread(() -> {
latch.countDown();
try {
latch.await();
for (int iter = 0; iter < numberOfIterations; iter++) {
Recycler.V<byte[]> o = recycler.obtain();
final byte[] bytes = o.v();
assertNotNull(bytes);
o.close();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
}
threads.add(new Thread(() -> {
latch.countDown();
try {
latch.await();
Thread.sleep(randomLongBetween(1, 200));
recycler.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));

threads.forEach(Thread::start);
latch.countDown();
for (Thread t : threads) {
t.join();
}
}
}

0 comments on commit 2271211

Please sign in to comment.