Skip to content

Commit

Permalink
Merge pull request #5935 from jmacxx/dao_state_store_and_statistics
Browse files Browse the repository at this point in the history
Improve DAO state store persistence and statistics logging [chimp1984]
  • Loading branch information
ripcurlx authored Jan 3, 2022
2 parents 8d4a785 + 30706f0 commit dde48f7
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public interface Listener {
@Nullable
private Runnable createSnapshotHandler;
// Lookup map
private Map<Integer, DaoStateBlock> daoStateBlockByHeight = new HashMap<>();
private final Map<Integer, DaoStateBlock> daoStateBlockByHeight = new HashMap<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public void setWarnMessageHandler(@SuppressWarnings("NullableProblems") Consumer

public void shutDown() {
exportJsonFilesService.shutDown();
daoStateSnapshotService.shutDown();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ public void addListeners() {
public void start() {
}

public void shutDown() {
daoStateStorageService.shutDown();
}

///////////////////////////////////////////////////////////////////////////////////////////
// DaoStateListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@
import javax.inject.Inject;
import javax.inject.Named;

import com.google.common.util.concurrent.MoreExecutors;

import java.io.File;
import java.io.IOException;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -53,6 +60,8 @@ public class DaoStateStorageService extends StoreService<DaoStateStore> {
private final BsqBlocksStorageService bsqBlocksStorageService;
private final File storageDir;
private final LinkedList<Block> blocks = new LinkedList<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private Optional<Future<?>> future = Optional.empty();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -94,22 +103,36 @@ public void requestPersistence(protobuf.DaoState daoStateAsProto,
return;
}

new Thread(() -> {
Thread.currentThread().setName("Write-blocks-and-DaoState");
bsqBlocksStorageService.persistBlocks(blocks);
if (future.isPresent() && !future.get().isDone()) {
UserThread.runAfter(() -> requestPersistence(daoStateAsProto, blocks, daoStateHashChain, completeHandler), 2);
return;
}

store.setDaoStateAsProto(daoStateAsProto);
store.setDaoStateHashChain(daoStateHashChain);
long ts = System.currentTimeMillis();
persistenceManager.persistNow(() -> {
// After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in
// memory there until the next persist call.
log.info("Persist daoState took {} ms", System.currentTimeMillis() - ts);
store.releaseMemory();
GcUtil.maybeReleaseMemory();
UserThread.execute(completeHandler);
});
}).start();
future = Optional.of(executorService.submit(() -> {
try {
Thread.currentThread().setName("Write-blocks-and-DaoState");
bsqBlocksStorageService.persistBlocks(blocks);
store.setDaoStateAsProto(daoStateAsProto);
store.setDaoStateHashChain(daoStateHashChain);
long ts = System.currentTimeMillis();
persistenceManager.persistNow(() -> {
// After we have written to disk we remove the daoStateAsProto in the store to avoid that it stays in
// memory there until the next persist call.
log.info("Persist daoState took {} ms", System.currentTimeMillis() - ts);
store.releaseMemory();
GcUtil.maybeReleaseMemory();
UserThread.execute(completeHandler);
});
} catch (Exception e) {
log.error("Exception at persisting BSQ blocks and DaoState", e);
}
}));
}

public void shutDown() {
executorService.shutdown();
// noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/main/java/bisq/network/p2p/network/Statistic.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class Statistic {
totalReceivedBytesPerSec.set(((double) totalReceivedBytes.get()) / passed);
}, 1);

// We log statistics every 5 minutes
// We log statistics every 60 minutes
UserThread.runPeriodically(() -> {
String ls = System.lineSeparator();
log.info("Accumulated network statistics:" + ls +
Expand All @@ -81,14 +81,14 @@ public class Statistic {
"Number of sent messages per sec: {};" + ls +
"Bytes received: {}" + ls +
"Number of received messages/Received messages: {} / {};" + ls +
"Number of received messages per sec: {};" + ls,
"Number of received messages per sec: {}" + ls,
Utilities.readableFileSize(totalSentBytes.get()),
numTotalSentMessages.get(), totalSentMessages,
numTotalSentMessagesPerSec.get(),
Utilities.readableFileSize(totalReceivedBytes.get()),
numTotalReceivedMessages.get(), totalReceivedMessages,
numTotalReceivedMessagesPerSec.get());
}, TimeUnit.MINUTES.toSeconds(5));
}, TimeUnit.MINUTES.toSeconds(60));
}

public static LongProperty totalSentBytesProperty() {
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void onAwakeFromStandby(long missedMs) {
};
clockWatcher.addListener(clockWatcherListener);

printStatisticsTimer = UserThread.runPeriodically(this::printStatistics, TimeUnit.MINUTES.toSeconds(5));
printStatisticsTimer = UserThread.runPeriodically(this::printStatistics, TimeUnit.MINUTES.toSeconds(60));
}

public void shutDown() {
Expand Down

0 comments on commit dde48f7

Please sign in to comment.