Skip to content

Commit

Permalink
Merge pull request #5905 from chimp1984/improve_shutdown_process
Browse files Browse the repository at this point in the history
Improve shutdown process
  • Loading branch information
ripcurlx authored Dec 8, 2021
2 parents 226a81e + 3d40b3b commit ff38097
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 134 deletions.
4 changes: 3 additions & 1 deletion common/src/main/java/bisq/common/ClockWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public void start() {
}

public void shutDown() {
timer.stop();
if (timer != null) {
timer.stop();
}
timer = null;
counter = 0;
}
Expand Down
72 changes: 31 additions & 41 deletions core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import com.google.inject.Injector;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -233,6 +236,16 @@ public void gracefulShutDown(ResultHandler resultHandler) {
System.exit(EXIT_SUCCESS);
}

// We do not use the UserThread to avoid that the timeout would not get triggered in case the UserThread
// would get blocked by a shutdown routine.
new Timer().schedule(new TimerTask() {
@Override
public void run() {
log.warn("Graceful shutdown not completed in 10 sec. We trigger our timeout handler.");
flushAndExit(resultHandler, EXIT_SUCCESS);
}
}, 10000);

try {
injector.getInstance(ClockWatcher.class).shutDown();
injector.getInstance(OpenBsqSwapOfferService.class).shutDown();
Expand All @@ -250,58 +263,35 @@ public void gracefulShutDown(ResultHandler resultHandler) {
injector.getInstance(BtcWalletService.class).shutDown();
injector.getInstance(BsqWalletService.class).shutDown();

// We need to shutdown BitcoinJ before the P2PService as it uses Tor.
// We need to shut down BitcoinJ before the P2PService as it uses Tor.
WalletsSetup walletsSetup = injector.getInstance(WalletsSetup.class);
walletsSetup.shutDownComplete.addListener((ov, o, n) -> {
log.info("WalletsSetup shutdown completed");

injector.getInstance(P2PService.class).shutDown(() -> {
log.info("P2PService shutdown completed");
module.close(injector);
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown completed. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
});
} else {
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
}
flushAndExit(resultHandler, EXIT_SUCCESS);
});
});
walletsSetup.shutDown();

});

// Wait max 20 sec.
UserThread.runAfter(() -> {
log.warn("Graceful shut down not completed in 20 sec. We trigger our timeout handler.");
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown resulted in a timeout. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
});
} else {
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
}

}, 20);
} catch (Throwable t) {
log.error("App shutdown failed with exception {}", t.toString());
t.printStackTrace();
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown resulted in an error. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(EXIT_FAILURE), 1);
});
} else {
UserThread.runAfter(() -> System.exit(EXIT_FAILURE), 1);
}
log.error("App shutdown failed with an exception", t);
flushAndExit(resultHandler, EXIT_FAILURE);
}
}

private void flushAndExit(ResultHandler resultHandler, int status) {
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
log.info("PersistenceManager flushAllDataToDiskAtShutdown started");
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown completed. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(status), 100, TimeUnit.MILLISECONDS);
});
} else {
UserThread.runAfter(() -> System.exit(status), 100, TimeUnit.MILLISECONDS);
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/btc/setup/WalletsSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,9 @@ public void failed(@NotNull Service.State from, @NotNull Throwable failure) {
}

public void shutDown() {
log.info("walletsSetup shutDown started");
if (walletConfig != null) {
try {
log.info("walletConfig shutDown started");
walletConfig.stopAsync();
walletConfig.awaitTerminated(1, TimeUnit.SECONDS);
log.info("walletConfig shutDown completed");
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/main/java/bisq/network/p2p/P2PService.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public void onAllServicesInitialized() {
}

public void shutDown(Runnable shutDownCompleteHandler) {
log.info("P2PService shutdown started");
shutDownResultHandlers.add(shutDownCompleteHandler);

// We need to make sure queued up messages are flushed out before we continue shut down other network
Expand All @@ -198,6 +199,7 @@ public void shutDown(Runnable shutDownCompleteHandler) {
}

private void doShutDown() {
log.info("P2PService doShutDown started");
if (p2PDataStorage != null) {
p2PDataStorage.shutDown();
}
Expand All @@ -223,9 +225,7 @@ private void doShutDown() {
}

if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.forEach(Runnable::run);
});
networkNode.shutDown(() -> shutDownResultHandlers.forEach(Runnable::run));
} else {
shutDownResultHandlers.forEach(Runnable::run);
}
Expand Down
15 changes: 10 additions & 5 deletions p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,9 @@ public void shutDown(CloseConnectionReason closeConnectionReason) {
}

public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
log.debug("shutDown: nodeAddressOpt={}, closeConnectionReason={}",
this.peersNodeAddressOptional.orElse(null), closeConnectionReason);
log.info("Connection shutdown started");
log.debug("shutDown: peersNodeAddressOptional={}, closeConnectionReason={}",
peersNodeAddressOptional, closeConnectionReason);

connectionState.shutDown();

Expand Down Expand Up @@ -503,7 +504,8 @@ public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runn
}

private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
// Use UserThread.execute as its not clear if that is called from a non-UserThread
log.info("Connection doShutDown started");
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
try {
socket.close();
Expand All @@ -525,10 +527,10 @@ private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable R
}

//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 100, TimeUnit.MILLISECONDS);

log.debug("Connection shutdown complete {}", this);
// Use UserThread.execute as its not clear if that is called from a non-UserThread
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
Expand Down Expand Up @@ -725,6 +727,9 @@ public void run() {
}

if (proto == null) {
if (stopped) {
return;
}
if (protoInputStream.read() == -1) {
log.warn("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
} else {
Expand Down
3 changes: 2 additions & 1 deletion p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public Set<NodeAddress> getNodeAddressesOfConfirmedConnections() {


public void shutDown(Runnable shutDownCompleteHandler) {
log.info("NetworkNode shutdown started");
if (!shutDownInProgress) {
shutDownInProgress = true;
if (server != null) {
Expand All @@ -354,7 +355,7 @@ public void shutDown(Runnable shutDownCompleteHandler) {
log.info("Shutdown completed due timeout");
shutDownCompleteHandler.run();
}
}, 3);
}, 1500, TimeUnit.MILLISECONDS);

allConnections.forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN,
() -> {
Expand Down
3 changes: 2 additions & 1 deletion p2p/src/main/java/bisq/network/p2p/network/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ public void run() {
}

public void shutDown() {
log.info("Server shutdown started");
if (!stopped) {
stopped = true;

connections.stream().forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN));
connections.forEach(connection -> connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN));

try {
if (!serverSocket.isClosed())
Expand Down
116 changes: 35 additions & 81 deletions p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.monadic.MonadicBinding;

import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;

import java.security.SecureRandom;

import java.net.Socket;
Expand All @@ -59,28 +53,22 @@

import static com.google.common.base.Preconditions.checkArgument;

// Run in UserThread
public class TorNetworkNode extends NetworkNode {

private static final Logger log = LoggerFactory.getLogger(TorNetworkNode.class);

private static final int MAX_RESTART_ATTEMPTS = 5;
private static final long SHUT_DOWN_TIMEOUT = 5;

private static final long SHUT_DOWN_TIMEOUT = 2;

private HiddenServiceSocket hiddenServiceSocket;
private Timer shutDownTimeoutTimer;
private int restartCounter;
@SuppressWarnings("FieldCanBeLocal")
private MonadicBinding<Boolean> allShutDown;
private Tor tor;

private TorMode torMode;

private boolean streamIsolation;

private Socks5Proxy socksProxy;
private ListenableFuture<Void> torStartupFuture;
private boolean shutDownInProgress;


///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
Expand Down Expand Up @@ -117,7 +105,7 @@ public void start(@Nullable SetupListener setupListener) {
protected Socket createSocket(NodeAddress peerNodeAddress) throws IOException {
checkArgument(peerNodeAddress.getHostName().endsWith(".onion"), "PeerAddress is not an onion address");
// If streamId is null stream isolation gets deactivated.
// Hidden services use stream isolation by default so we pass null.
// Hidden services use stream isolation by default, so we pass null.
return new TorSocket(peerNodeAddress.getHostName(), peerNodeAddress.getPort(), null);
}

Expand Down Expand Up @@ -150,77 +138,43 @@ public Socks5Proxy getSocksProxy() {
}

public void shutDown(@Nullable Runnable shutDownCompleteHandler) {
if (allShutDown != null) {
log.warn("We got called shutDown again and ignore it.");
log.info("TorNetworkNode shutdown started");
if (shutDownInProgress) {
log.warn("We got shutDown already called");
return;
}
// this one is executed synchronously
BooleanProperty networkNodeShutDown = networkNodeShutDown();
// this one is committed as a thread to the executor
BooleanProperty torNetworkNodeShutDown = torNetworkNodeShutDown();
BooleanProperty shutDownTimerTriggered = shutDownTimerTriggered();
// Need to store allShutDown to not get garbage collected
allShutDown = EasyBind.combine(torNetworkNodeShutDown, networkNodeShutDown, shutDownTimerTriggered,
(a, b, c) -> (a && b) || c);
allShutDown.subscribe((observable, oldValue, newValue) -> {
if (newValue) {
shutDownTimeoutTimer.stop();
long ts = System.currentTimeMillis();
try {
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
log.debug("Shutdown executorService done after {} ms.", System.currentTimeMillis() - ts);
} catch (Throwable t) {
log.error("Shutdown executorService failed with exception: {}", t.getMessage());
t.printStackTrace();
} finally {
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
}
}
});
}

private BooleanProperty torNetworkNodeShutDown() {
BooleanProperty done = new SimpleBooleanProperty();
try {
tor = Tor.getDefault();
if (tor != null) {
log.info("Tor has been created already so we can shut it down.");
tor.shutdown();
tor = null;
log.info("Tor shut down completed");
} else {
log.info("Tor has not been created yet. We cancel the torStartupFuture.");
if (torStartupFuture != null) {
torStartupFuture.cancel(true);
}
log.info("torStartupFuture cancelled");
}
} catch (Throwable e) {
log.error("Shutdown torNetworkNode failed with exception: {}", e.getMessage());
e.printStackTrace();

} finally {
// We need to delay as otherwise our listener would not get called if shutdown completes in synchronous manner
UserThread.execute(() -> done.set(true));
}
return done;
}

private BooleanProperty networkNodeShutDown() {
BooleanProperty done = new SimpleBooleanProperty();
// We need to delay as otherwise our listener would not get called if shutdown completes in synchronous manner
UserThread.execute(() -> super.shutDown(() -> done.set(true)));
return done;
}
shutDownInProgress = true;

private BooleanProperty shutDownTimerTriggered() {
BooleanProperty done = new SimpleBooleanProperty();
shutDownTimeoutTimer = UserThread.runAfter(() -> {
log.error("A timeout occurred at shutDown");
done.set(true);
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
}, SHUT_DOWN_TIMEOUT);
return done;

// Shutdown networkNode first
super.shutDown(() -> {
try {
tor = Tor.getDefault();
if (tor != null) {
tor.shutdown();
tor = null;
log.info("Tor shutdown completed");
} else {
log.info("Tor has not been created yet. We cancel the torStartupFuture.");
if (torStartupFuture != null) {
torStartupFuture.cancel(true);
}
log.info("torStartupFuture cancelled");
}
MoreExecutors.shutdownAndAwaitTermination(executorService, 100, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
log.error("Shutdown torNetworkNode failed with exception", e);
} finally {
shutDownTimeoutTimer.stop();
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
}
});
}


Expand Down
Loading

0 comments on commit ff38097

Please sign in to comment.