Skip to content

Commit

Permalink
Clean up Node#close. (elastic#39317)
Browse files Browse the repository at this point in the history
`Node#close` is pretty hard to rely on today:
 - it might swallow exceptions
 - it waits for 10 seconds for threads to terminate but doesn't signal anything
   if threads are still not terminated after 10 seconds

This commit propagates `IOException`s and splits `Node#close` into
`Node#close` and `Node#awaitClose` so that the decision about what to do if a
node takes too long to close can be made on top of `Node#close`.

It also adds synchronization to lifecycle transitions to make them atomic. I
don't think the lack of atomicity is a source of problems today, but the
synchronization makes things easier to reason about.
  • Loading branch information
jpountz authored and Gurkan Kaymak committed May 27, 2019
1 parent c9afd54 commit cc6c964
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 85 deletions.
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Internal startup code.
Expand Down Expand Up @@ -183,8 +184,15 @@ public void run() {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. " +
"Any outstanding requests or tasks might get killed.");
}
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Thread.currentThread().interrupt();
}
}
});
Expand Down Expand Up @@ -267,6 +275,12 @@ private void start() throws NodeValidationException {
static void stop() throws IOException {
try {
IOUtils.close(INSTANCE.node, INSTANCE.spawner);
if (INSTANCE.node != null && INSTANCE.node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. Any outstanding requests or tasks might get killed.");
}
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Thread.currentThread().interrupt();
} finally {
INSTANCE.keepAliveLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@

package org.elasticsearch.common.component;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public abstract class AbstractLifecycleComponent implements LifecycleComponent {
private static final Logger logger = LogManager.getLogger(AbstractLifecycleComponent.class);

protected final Lifecycle lifecycle = new Lifecycle();

Expand All @@ -52,59 +49,64 @@ public void removeLifecycleListener(LifecycleListener listener) {

@Override
public void start() {
if (!lifecycle.canMoveToStarted()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStart();
}
doStart();
lifecycle.moveToStarted();
for (LifecycleListener listener : listeners) {
listener.afterStart();
synchronized (lifecycle) {
if (!lifecycle.canMoveToStarted()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStart();
}
doStart();
lifecycle.moveToStarted();
for (LifecycleListener listener : listeners) {
listener.afterStart();
}
}
}

protected abstract void doStart();

@Override
public void stop() {
if (!lifecycle.canMoveToStopped()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStop();
}
lifecycle.moveToStopped();
doStop();
for (LifecycleListener listener : listeners) {
listener.afterStop();
synchronized (lifecycle) {
if (!lifecycle.canMoveToStopped()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStop();
}
lifecycle.moveToStopped();
doStop();
for (LifecycleListener listener : listeners) {
listener.afterStop();
}
}
}

protected abstract void doStop();

@Override
public void close() {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.canMoveToClosed()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeClose();
}
lifecycle.moveToClosed();
try {
doClose();
} catch (IOException e) {
// TODO: we need to separate out closing (ie shutting down) services, vs releasing runtime transient
// structures. Shutting down services should use IOUtils.close
logger.warn("failed to close " + getClass().getName(), e);
}
for (LifecycleListener listener : listeners) {
listener.afterClose();
synchronized (lifecycle) {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.canMoveToClosed()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeClose();
}
lifecycle.moveToClosed();
try {
doClose();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
for (LifecycleListener listener : listeners) {
listener.afterClose();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,22 @@
* }
* </pre>
* <p>
* NOTE: The Lifecycle class is thread-safe. It is also possible to prevent concurrent state transitions
* by locking on the Lifecycle object itself. This is typically useful when chaining multiple transitions.
* <p>
* Note, closed is only allowed to be called when stopped, so make sure to stop the component first.
* Here is how the logic can be applied:
* Here is how the logic can be applied. A lock of the {@code lifecycleState} object is taken so that
* another thread cannot move the state from {@code STOPPED} to {@code STARTED} before it has moved to
* {@code CLOSED}.
* <pre>
* public void close() {
* if (lifecycleState.started()) {
* stop();
* }
* if (!lifecycleState.moveToClosed()) {
* return;
* synchronized (lifecycleState) {
* if (lifecycleState.started()) {
* stop();
* }
* if (!lifecycleState.moveToClosed()) {
* return;
* }
* }
* // perform close logic here
* }
Expand Down Expand Up @@ -116,7 +123,7 @@ public boolean canMoveToStarted() throws IllegalStateException {
}


public boolean moveToStarted() throws IllegalStateException {
public synchronized boolean moveToStarted() throws IllegalStateException {
State localState = this.state;
if (localState == State.INITIALIZED || localState == State.STOPPED) {
state = State.STARTED;
Expand Down Expand Up @@ -145,7 +152,7 @@ public boolean canMoveToStopped() throws IllegalStateException {
throw new IllegalStateException("Can't move to stopped with unknown state");
}

public boolean moveToStopped() throws IllegalStateException {
public synchronized boolean moveToStopped() throws IllegalStateException {
State localState = state;
if (localState == State.STARTED) {
state = State.STOPPED;
Expand All @@ -171,7 +178,7 @@ public boolean canMoveToClosed() throws IllegalStateException {
return true;
}

public boolean moveToClosed() throws IllegalStateException {
public synchronized boolean moveToClosed() throws IllegalStateException {
State localState = state;
if (localState == State.CLOSED) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,18 @@ public void close() {
public StoredContext stashContext() {
final ThreadContextStruct context = threadLocal.get();
threadLocal.set(null);
return () -> threadLocal.set(context);
return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
// uncaught exception
try {
threadLocal.set(context);
} catch (IllegalStateException e) {
if (isClosed() == false) {
throw e;
}
}
};
}

/**
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader.CacheHelper;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
Expand Down Expand Up @@ -200,6 +201,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);

@Override
protected void doStart() {
Expand Down Expand Up @@ -273,6 +275,8 @@ protected void closeInternal() {
indicesQueryCache);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
closeLatch.countDown();
}
}
};
Expand Down Expand Up @@ -311,6 +315,18 @@ protected void doClose() throws IOException {
indicesRefCount.decRef();
}

/**
* Wait for this {@link IndicesService} to be effectively closed. When this returns {@code true}, all shards and shard stores
* are closed and all shard {@link CacheHelper#addClosedListener(org.apache.lucene.index.IndexReader.ClosedListener) closed
* listeners} have run. However some {@link IndexEventListener#onStoreClosed(ShardId) shard closed listeners} might not have
* run.
* <p>
* This waits on the internal close latch, which is counted down in the {@code finally} block of {@code closeInternal()}.
* @param timeout the maximum time to wait
* @param timeUnit the unit of the {@code timeout} argument
* @return true if all shards closed within the given timeout, false otherwise
* @throws InterruptedException if the current thread got interrupted while waiting for shards to close
*/
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
return closeLatch.await(timeout, timeUnit);
}

/**
* Returns the node stats indices stats. The {@code includePrevious} flag controls
* if old shards stats will be aggregated as well (only for relevant stats, such as
Expand Down
51 changes: 34 additions & 17 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,13 @@ private Node stop() {
// In this case the process will be terminated even if the first call to close() has not finished yet.
@Override
public synchronized void close() throws IOException {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.moveToClosed()) {
return;
synchronized (lifecycle) {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.moveToClosed()) {
return;
}
}

logger.info("closing ...");
Expand Down Expand Up @@ -835,21 +837,12 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(ScriptService.class));

toClose.add(() -> stopWatch.stop().start("thread_pool"));
// TODO this should really use ThreadPool.terminate()
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
toClose.add(() -> {
try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
});

toClose.add(() -> stopWatch.stop().start("thread_pool_force_shutdown"));
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow());
// Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
// See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
// awaitClose if the node doesn't finish closing within the specified time.
toClose.add(() -> stopWatch.stop());


toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(injector.getInstance(PageCacheRecycler.class));

Expand All @@ -860,6 +853,30 @@ public synchronized void close() throws IOException {
logger.info("closed");
}

/**
 * Wait for this node to be effectively closed. {@link #close()} must have been called first.
 *
 * @param timeout  the maximum time to wait for the thread pool to terminate
 * @param timeUnit the unit of the {@code timeout} argument
 * @return {@code true} if the thread pool terminated within the given timeout, {@code false} otherwise
 * @throws IllegalStateException if the node has not been closed yet, or if the thread pool terminated
 *                               but the node service reports that it is still not closed
 * @throws InterruptedException  if the current thread got interrupted while waiting
 */
// synchronized to prevent running concurrently with close()
public synchronized boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
    // Refuse to shut down the thread pool or interrupt threads of a node that has not been closed yet.
    if (!lifecycle.closed()) {
        throw new IllegalStateException("Call close() first");
    }

    final ThreadPool pool = injector.getInstance(ThreadPool.class);
    if (ThreadPool.terminate(pool, timeout, timeUnit) == false) {
        return false;
    }

    // Every thread has terminated. Search, recovery and all other shard-level operations run in the
    // thread pool, so indices are expected to be effectively closed already; hence the zero timeout.
    final boolean indicesClosed = nodeService.awaitClose(0, TimeUnit.MILLISECONDS);
    if (!indicesClosed) {
        throw new IllegalStateException("Some shards are still open after the threadpool terminated. " +
            "Something is leaking index readers or store references.");
    }
    return true;
}

/**
* Returns {@code true} if the node is closed.
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/elasticsearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class NodeService implements Closeable {
private final Settings settings;
Expand Down Expand Up @@ -135,4 +136,12 @@ public void close() throws IOException {
IOUtils.close(indicesService);
}

/**
* Wait for the node to be effectively closed. This delegates to the {@link IndicesService} held by this
* {@link NodeService} and returns its result unchanged.
* @param timeout the maximum time to wait
* @param timeUnit the unit of the {@code timeout} argument
* @return the value returned by {@link IndicesService#awaitClose(long, TimeUnit)}
* @throws InterruptedException if the current thread got interrupted while waiting
* @see IndicesService#awaitClose(long, TimeUnit)
*/
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
return indicesService.awaitClose(timeout, timeUnit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ private InetSocketAddress bindToPort(final String name, final InetAddress hostAd
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
closeLock.writeLock().lock();
try {
// No need for locking here since Lifecycle objects can't move from STARTED to INITIALIZED
if (lifecycle.initialized() == false && lifecycle.started() == false) {
throw new IllegalStateException("transport has been stopped");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ public void onFailure(Exception e) {

@Override
public void close() {
lifecycle.moveToStopped();
lifecycle.moveToClosed();
synchronized (lifecycle) {
lifecycle.moveToStopped();
lifecycle.moveToClosed();
}
}

private class ScheduledPing extends AbstractLifecycleRunnable {
Expand Down
Loading

0 comments on commit cc6c964

Please sign in to comment.