Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve shutdown process #5905

Merged
merged 4 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion common/src/main/java/bisq/common/ClockWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public void start() {
}

public void shutDown() {
timer.stop();
if (timer != null) {
timer.stop();
}
timer = null;
counter = 0;
}
Expand Down
72 changes: 31 additions & 41 deletions core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import com.google.inject.Injector;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -233,6 +236,16 @@ public void gracefulShutDown(ResultHandler resultHandler) {
System.exit(EXIT_SUCCESS);
}

// We do not use the UserThread to avoid that the timeout would not get triggered in case the UserThread
// would get blocked by a shutdown routine.
new Timer().schedule(new TimerTask() {
@Override
public void run() {
log.warn("Graceful shutdown not completed in 10 sec. We trigger our timeout handler.");
flushAndExit(resultHandler, EXIT_SUCCESS);
}
}, 10000);

try {
injector.getInstance(ClockWatcher.class).shutDown();
injector.getInstance(OpenBsqSwapOfferService.class).shutDown();
Expand All @@ -250,58 +263,35 @@ public void gracefulShutDown(ResultHandler resultHandler) {
injector.getInstance(BtcWalletService.class).shutDown();
injector.getInstance(BsqWalletService.class).shutDown();

// We need to shutdown BitcoinJ before the P2PService as it uses Tor.
// We need to shut down BitcoinJ before the P2PService as it uses Tor.
WalletsSetup walletsSetup = injector.getInstance(WalletsSetup.class);
walletsSetup.shutDownComplete.addListener((ov, o, n) -> {
log.info("WalletsSetup shutdown completed");

injector.getInstance(P2PService.class).shutDown(() -> {
log.info("P2PService shutdown completed");
module.close(injector);
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown completed. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
});
} else {
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
}
flushAndExit(resultHandler, EXIT_SUCCESS);
});
});
walletsSetup.shutDown();

});

// Wait max 20 sec.
UserThread.runAfter(() -> {
log.warn("Graceful shut down not completed in 20 sec. We trigger our timeout handler.");
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown resulted in a timeout. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
});
} else {
UserThread.runAfter(() -> System.exit(EXIT_SUCCESS), 1);
}

}, 20);
} catch (Throwable t) {
log.error("App shutdown failed with exception {}", t.toString());
t.printStackTrace();
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown resulted in an error. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(EXIT_FAILURE), 1);
});
} else {
UserThread.runAfter(() -> System.exit(EXIT_FAILURE), 1);
}
log.error("App shutdown failed with an exception", t);
flushAndExit(resultHandler, EXIT_FAILURE);
}
}

private void flushAndExit(ResultHandler resultHandler, int status) {
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
log.info("PersistenceManager flushAllDataToDiskAtShutdown started");
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
log.info("Graceful shutdown completed. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(status), 100, TimeUnit.MILLISECONDS);
});
} else {
UserThread.runAfter(() -> System.exit(status), 100, TimeUnit.MILLISECONDS);
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/btc/setup/WalletsSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,9 @@ public void failed(@NotNull Service.State from, @NotNull Throwable failure) {
}

public void shutDown() {
log.info("walletsSetup shutDown started");
if (walletConfig != null) {
try {
log.info("walletConfig shutDown started");
walletConfig.stopAsync();
walletConfig.awaitTerminated(1, TimeUnit.SECONDS);
log.info("walletConfig shutDown completed");
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/main/java/bisq/network/p2p/P2PService.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public void onAllServicesInitialized() {
}

public void shutDown(Runnable shutDownCompleteHandler) {
log.info("P2PService shutdown started");
shutDownResultHandlers.add(shutDownCompleteHandler);

// We need to make sure queued up messages are flushed out before we continue shut down other network
Expand All @@ -198,6 +199,7 @@ public void shutDown(Runnable shutDownCompleteHandler) {
}

private void doShutDown() {
log.info("P2PService doShutDown started");
if (p2PDataStorage != null) {
p2PDataStorage.shutDown();
}
Expand All @@ -223,9 +225,7 @@ private void doShutDown() {
}

if (networkNode != null) {
networkNode.shutDown(() -> {
shutDownResultHandlers.forEach(Runnable::run);
});
networkNode.shutDown(() -> shutDownResultHandlers.forEach(Runnable::run));
} else {
shutDownResultHandlers.forEach(Runnable::run);
}
Expand Down
15 changes: 10 additions & 5 deletions p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,9 @@ public void shutDown(CloseConnectionReason closeConnectionReason) {
}

public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
log.debug("shutDown: nodeAddressOpt={}, closeConnectionReason={}",
this.peersNodeAddressOptional.orElse(null), closeConnectionReason);
log.info("Connection shutdown started");
log.debug("shutDown: peersNodeAddressOptional={}, closeConnectionReason={}",
peersNodeAddressOptional, closeConnectionReason);

connectionState.shutDown();

Expand Down Expand Up @@ -503,7 +504,8 @@ public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runn
}

private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable Runnable shutDownCompleteHandler) {
// Use UserThread.execute as its not clear if that is called from a non-UserThread
log.info("Connection doShutDown started");
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
UserThread.execute(() -> connectionListener.onDisconnect(closeConnectionReason, this));
try {
socket.close();
Expand All @@ -525,10 +527,10 @@ private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable R
}

//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 100, TimeUnit.MILLISECONDS);

log.debug("Connection shutdown complete {}", this);
// Use UserThread.execute as its not clear if that is called from a non-UserThread
// Use UserThread.execute as it's not clear if that is called from a non-UserThread
if (shutDownCompleteHandler != null)
UserThread.execute(shutDownCompleteHandler);
}
Expand Down Expand Up @@ -725,6 +727,9 @@ public void run() {
}

if (proto == null) {
if (stopped) {
return;
}
if (protoInputStream.read() == -1) {
log.warn("proto is null because protoInputStream.read()=-1 (EOF). That is expected if client got stopped without proper shutdown.");
} else {
Expand Down
3 changes: 2 additions & 1 deletion p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public Set<NodeAddress> getNodeAddressesOfConfirmedConnections() {


public void shutDown(Runnable shutDownCompleteHandler) {
log.info("NetworkNode shutdown started");
if (!shutDownInProgress) {
shutDownInProgress = true;
if (server != null) {
Expand All @@ -354,7 +355,7 @@ public void shutDown(Runnable shutDownCompleteHandler) {
log.info("Shutdown completed due timeout");
shutDownCompleteHandler.run();
}
}, 3);
}, 1500, TimeUnit.MILLISECONDS);

allConnections.forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN,
() -> {
Expand Down
3 changes: 2 additions & 1 deletion p2p/src/main/java/bisq/network/p2p/network/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ public void run() {
}

public void shutDown() {
log.info("Server shutdown started");
if (!stopped) {
stopped = true;

connections.stream().forEach(c -> c.shutDown(CloseConnectionReason.APP_SHUT_DOWN));
connections.forEach(connection -> connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN));

try {
if (!serverSocket.isClosed())
Expand Down
116 changes: 35 additions & 81 deletions p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import org.fxmisc.easybind.EasyBind;
import org.fxmisc.easybind.monadic.MonadicBinding;

import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;

import java.security.SecureRandom;

import java.net.Socket;
Expand All @@ -59,28 +53,22 @@

import static com.google.common.base.Preconditions.checkArgument;

// Run in UserThread
public class TorNetworkNode extends NetworkNode {

private static final Logger log = LoggerFactory.getLogger(TorNetworkNode.class);

private static final int MAX_RESTART_ATTEMPTS = 5;
private static final long SHUT_DOWN_TIMEOUT = 5;

private static final long SHUT_DOWN_TIMEOUT = 2;

private HiddenServiceSocket hiddenServiceSocket;
private Timer shutDownTimeoutTimer;
private int restartCounter;
@SuppressWarnings("FieldCanBeLocal")
private MonadicBinding<Boolean> allShutDown;
private Tor tor;

private TorMode torMode;

private boolean streamIsolation;

private Socks5Proxy socksProxy;
private ListenableFuture<Void> torStartupFuture;
private boolean shutDownInProgress;


///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
Expand Down Expand Up @@ -117,7 +105,7 @@ public void start(@Nullable SetupListener setupListener) {
protected Socket createSocket(NodeAddress peerNodeAddress) throws IOException {
checkArgument(peerNodeAddress.getHostName().endsWith(".onion"), "PeerAddress is not an onion address");
// If streamId is null stream isolation gets deactivated.
// Hidden services use stream isolation by default so we pass null.
// Hidden services use stream isolation by default, so we pass null.
return new TorSocket(peerNodeAddress.getHostName(), peerNodeAddress.getPort(), null);
}

Expand Down Expand Up @@ -150,77 +138,43 @@ public Socks5Proxy getSocksProxy() {
}

public void shutDown(@Nullable Runnable shutDownCompleteHandler) {
if (allShutDown != null) {
log.warn("We got called shutDown again and ignore it.");
log.info("TorNetworkNode shutdown started");
if (shutDownInProgress) {
log.warn("We got shutDown already called");
return;
}
// this one is executed synchronously
BooleanProperty networkNodeShutDown = networkNodeShutDown();
// this one is committed as a thread to the executor
BooleanProperty torNetworkNodeShutDown = torNetworkNodeShutDown();
BooleanProperty shutDownTimerTriggered = shutDownTimerTriggered();
// Need to store allShutDown to not get garbage collected
allShutDown = EasyBind.combine(torNetworkNodeShutDown, networkNodeShutDown, shutDownTimerTriggered,
(a, b, c) -> (a && b) || c);
allShutDown.subscribe((observable, oldValue, newValue) -> {
if (newValue) {
shutDownTimeoutTimer.stop();
long ts = System.currentTimeMillis();
try {
MoreExecutors.shutdownAndAwaitTermination(executorService, 500, TimeUnit.MILLISECONDS);
log.debug("Shutdown executorService done after {} ms.", System.currentTimeMillis() - ts);
} catch (Throwable t) {
log.error("Shutdown executorService failed with exception: {}", t.getMessage());
t.printStackTrace();
} finally {
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
}
}
});
}

private BooleanProperty torNetworkNodeShutDown() {
BooleanProperty done = new SimpleBooleanProperty();
try {
tor = Tor.getDefault();
if (tor != null) {
log.info("Tor has been created already so we can shut it down.");
tor.shutdown();
tor = null;
log.info("Tor shut down completed");
} else {
log.info("Tor has not been created yet. We cancel the torStartupFuture.");
if (torStartupFuture != null) {
torStartupFuture.cancel(true);
}
log.info("torStartupFuture cancelled");
}
} catch (Throwable e) {
log.error("Shutdown torNetworkNode failed with exception: {}", e.getMessage());
e.printStackTrace();

} finally {
// We need to delay as otherwise our listener would not get called if shutdown completes in synchronous manner
UserThread.execute(() -> done.set(true));
}
return done;
}

private BooleanProperty networkNodeShutDown() {
BooleanProperty done = new SimpleBooleanProperty();
// We need to delay as otherwise our listener would not get called if shutdown completes in synchronous manner
UserThread.execute(() -> super.shutDown(() -> done.set(true)));
return done;
}
shutDownInProgress = true;

private BooleanProperty shutDownTimerTriggered() {
BooleanProperty done = new SimpleBooleanProperty();
shutDownTimeoutTimer = UserThread.runAfter(() -> {
log.error("A timeout occurred at shutDown");
done.set(true);
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
}, SHUT_DOWN_TIMEOUT);
return done;

// Shutdown networkNode first
super.shutDown(() -> {
try {
tor = Tor.getDefault();
if (tor != null) {
tor.shutdown();
tor = null;
log.info("Tor shutdown completed");
} else {
log.info("Tor has not been created yet. We cancel the torStartupFuture.");
if (torStartupFuture != null) {
torStartupFuture.cancel(true);
}
log.info("torStartupFuture cancelled");
}
MoreExecutors.shutdownAndAwaitTermination(executorService, 100, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
log.error("Shutdown torNetworkNode failed with exception", e);
} finally {
shutDownTimeoutTimer.stop();
if (shutDownCompleteHandler != null)
shutDownCompleteHandler.run();
}
});
}


Expand Down
Loading