Skip to content

Commit

Permalink
Merge pull request #3154 from chimp1984/Fix-small-P2P-network-issues
Browse files Browse the repository at this point in the history
Fix small p2p network issues
  • Loading branch information
freimair authored Aug 29, 2019
2 parents edc1e69 + 55092ed commit 4e7cb04
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 93 deletions.
10 changes: 9 additions & 1 deletion common/src/main/java/bisq/common/app/Capabilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class Capabilities {
*/
public static final Capabilities app = new Capabilities();

// Defines which most recent capability any node need to support.
// This helps to clean network from very old inactive but still running nodes.
private static final Capability mandatoryCapability = Capability.DAO_STATE;

protected final Set<Capability> capabilities = new HashSet<>();

public Capabilities(Capability... capabilities) {
Expand Down Expand Up @@ -71,7 +75,7 @@ public void addAll(Capability... capabilities) {
}

public void addAll(Capabilities capabilities) {
if(capabilities != null)
if (capabilities != null)
this.capabilities.addAll(capabilities.capabilities);
}

Expand Down Expand Up @@ -111,6 +115,10 @@ public static Capabilities fromIntList(List<Integer> capabilities) {
.collect(Collectors.toSet()));
}

public static boolean hasMandatoryCapability(Capabilities capabilities) {
return capabilities.capabilities.stream().anyMatch(c -> c == mandatoryCapability);
}

@Override
public String toString() {
return Arrays.toString(Capabilities.toIntList(this).toArray());
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
@Slf4j
public class CoreNetworkCapabilities {
public static void setSupportedCapabilities(BisqEnvironment bisqEnvironment) {
Capabilities.app.addAll(Capability.TRADE_STATISTICS, Capability.TRADE_STATISTICS_2, Capability.ACCOUNT_AGE_WITNESS, Capability.ACK_MSG);
Capabilities.app.addAll(Capability.BUNDLE_OF_ENVELOPES, Capability.SIGNED_ACCOUNT_AGE_WITNESS);
Capabilities.app.addAll(
Capability.TRADE_STATISTICS,
Capability.TRADE_STATISTICS_2,
Capability.ACCOUNT_AGE_WITNESS,
Capability.ACK_MSG,
Capability.BUNDLE_OF_ENVELOPES
);

if (BisqEnvironment.isDaoActivated(bisqEnvironment)) {
Capabilities.app.addAll(Capability.PROPOSAL, Capability.BLIND_VOTE, Capability.DAO_STATE);
Capabilities.app.addAll(
Capability.PROPOSAL,
Capability.BLIND_VOTE,
Capability.DAO_STATE
);

maybeApplyDaoFullMode(bisqEnvironment);
}
Expand Down
52 changes: 35 additions & 17 deletions p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public enum PeerType {
private static ConnectionConfig connectionConfig;

// Leaving some constants package-private for tests to know limits.
static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
private static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
private static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(120);

Expand Down Expand Up @@ -172,7 +172,7 @@ public static int getPermittedMessageSize() {
private RuleViolation ruleViolation;
private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap<>();

private Capabilities capabilities = new Capabilities();
private final Capabilities capabilities = new Capabilities();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -233,9 +233,9 @@ public Capabilities getCapabilities() {
return capabilities;
}

Object lock = new Object();
Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
private final Object lock = new Object();
private final Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();

// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
Expand All @@ -250,7 +250,7 @@ public void sendMessage(NetworkEnvelope networkEnvelope) {
log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000));

if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) {
// pings and offer refresh msg we dont want to log in production
// pings and offer refresh msg we don't want to log in production
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
Expand Down Expand Up @@ -298,10 +298,13 @@ public void sendMessage(NetworkEnvelope networkEnvelope) {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes current = queueOfBundles.poll();
if(current.getEnvelopes().size() == 1)
protoOutputStream.writeEnvelope(current.getEnvelopes().get(0));
else
protoOutputStream.writeEnvelope(current);
if (current != null) {
if (current.getEnvelopes().size() == 1) {
protoOutputStream.writeEnvelope(current.getEnvelopes().get(0));
} else {
protoOutputStream.writeEnvelope(current);
}
}
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -386,10 +389,10 @@ private boolean violatesThrottleLimit() {
messageTimeStamps.add(now);

// clean list
while(messageTimeStamps.size() > msgThrottlePer10Sec)
while (messageTimeStamps.size() > msgThrottlePer10Sec)
messageTimeStamps.remove(0);

return violatesThrottleLimit(now,1, msgThrottlePerSec) || violatesThrottleLimit(now,10, msgThrottlePer10Sec);
return violatesThrottleLimit(now, 1, msgThrottlePerSec) || violatesThrottleLimit(now, 10, msgThrottlePer10Sec);
}

private boolean violatesThrottleLimit(long now, int seconds, int messageCountLimit) {
Expand All @@ -399,7 +402,7 @@ private boolean violatesThrottleLimit(long now, int seconds, int messageCountLim
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - messageCountLimit);

// if duration < seconds sec we received too much network_messages
if(now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
if (now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
log.error("violatesThrottleLimit {}/{} second(s)", messageCountLimit, seconds);

return true;
Expand Down Expand Up @@ -436,7 +439,7 @@ public void setPeerType(PeerType peerType) {
this.peerType = peerType;
}

public void setPeersNodeAddress(NodeAddress peerNodeAddress) {
private void setPeersNodeAddress(NodeAddress peerNodeAddress) {
checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress);

Expand Down Expand Up @@ -494,6 +497,7 @@ public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runn

stopped = true;

//noinspection UnstableApiUsage
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
Expand Down Expand Up @@ -534,6 +538,7 @@ private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable R
e.printStackTrace();
}

//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);

log.debug("Connection shutdown complete " + this.toString());
Expand Down Expand Up @@ -705,7 +710,7 @@ public void run() {
Thread.sleep(20);
}

// Reading the protobuffer message from the inputstream
// Reading the protobuffer message from the inputStream
protobuf.NetworkEnvelope proto = protobuf.NetworkEnvelope.parseDelimitedFrom(protoInputStream);

if (proto == null) {
Expand Down Expand Up @@ -793,7 +798,20 @@ && reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {
if (networkEnvelope instanceof SupportedCapabilitiesMessage) {
Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (supportedCapabilities != null) {
capabilities.set(supportedCapabilities);
if (!capabilities.equals(supportedCapabilities)) {
if (!Capabilities.hasMandatoryCapability(capabilities)) {
shutDown(CloseConnectionReason.RULE_VIOLATION);
return;
}

capabilities.set(supportedCapabilities);
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
}
});
}
}
}

Expand Down
13 changes: 8 additions & 5 deletions p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public abstract class NetworkNode implements MessageListener {
// when the events happen.
abstract public void start(@Nullable SetupListener setupListener);

public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelope) {
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress,
NetworkEnvelope networkEnvelope) {
log.debug("sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelope));
checkNotNull(peersNodeAddress, "peerAddress must not be null");

Expand All @@ -112,9 +113,9 @@ public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddr

final SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress);
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress());

if(peersNodeAddress.equals(getNodeAddress())){
if (peersNodeAddress.equals(getNodeAddress())) {
throw new ConnectException("We do not send a message to ourselves");
}

Expand Down Expand Up @@ -162,7 +163,8 @@ public void onConnection(Connection connection) {
}

@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
public void onDisconnect(CloseConnectionReason closeConnectionReason,
Connection connection) {
log.trace("onDisconnect connectionListener\n\tconnection={}" + connection);
//noinspection SuspiciousMethodCalls
outBoundConnections.remove(connection);
Expand Down Expand Up @@ -264,7 +266,8 @@ public Socks5Proxy getSocksProxy() {
public SettableFuture<Connection> sendMessage(Connection connection, NetworkEnvelope networkEnvelope) {
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid());
String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id);
connection.sendMessage(networkEnvelope);
return connection;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ public Date getDate() {

@Override
public void onChanged(Capabilities supportedCapabilities) {
if (!supportedCapabilities.isEmpty())
if (!supportedCapabilities.isEmpty()) {
capabilities.set(supportedCapabilities);
}
}


Expand Down
Loading

0 comments on commit 4e7cb04

Please sign in to comment.