diff --git a/core/src/main/java/bisq/core/alert/AlertManager.java b/core/src/main/java/bisq/core/alert/AlertManager.java index 05a3872a7ff..0fba35a283f 100644 --- a/core/src/main/java/bisq/core/alert/AlertManager.java +++ b/core/src/main/java/bisq/core/alert/AlertManager.java @@ -44,6 +44,8 @@ import java.math.BigInteger; +import java.util.Collection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,22 +81,26 @@ public AlertManager(P2PService p2PService, if (!ignoreDevMsg) { p2PService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof Alert) { - Alert alert = (Alert) protectedStoragePayload; - if (verifySignature(alert)) - alertMessageProperty.set(alert); - } + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof Alert) { + Alert alert = (Alert) protectedStoragePayload; + if (verifySignature(alert)) + alertMessageProperty.set(alert); + } + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof Alert) { - if (verifySignature((Alert) protectedStoragePayload)) - alertMessageProperty.set(null); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof Alert) { + if (verifySignature((Alert) protectedStoragePayload)) + alertMessageProperty.set(null); + } + }); } }); } diff --git a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java index 9be6942b6c9..49aa219281c 100644 --- a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java +++ b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java @@ -20,16 +20,11 @@ import bisq.core.btc.wallet.BsqWalletService; import bisq.core.dao.DaoSetupService; import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload; -import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload; import bisq.core.dao.state.DaoStateListener; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.model.blockchain.Block; import bisq.core.dao.state.model.governance.Proposal; -import bisq.network.p2p.storage.HashMapChangedListener; -import bisq.network.p2p.storage.P2PDataStorage; -import bisq.network.p2p.storage.payload.ProtectedStorageEntry; - import bisq.common.UserThread; import org.bitcoinj.core.TransactionConfidence; @@ -55,8 +50,7 @@ * our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated. */ @Slf4j -public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener, - MyProposalListService.Listener, DaoSetupService { +public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener, DaoSetupService { private final ProposalService proposalService; private final DaoStateService daoStateService; private final MyProposalListService myProposalListService; @@ -66,7 +60,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange @Getter private final FilteredList activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals); private final ListChangeListener proposalListChangeListener; - private boolean tempProposalsChanged; /////////////////////////////////////////////////////////////////////////////////////////// @@ -76,7 +69,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange @Inject public ProposalListPresentation(ProposalService proposalService, DaoStateService daoStateService, - P2PDataStorage p2PDataStorage, MyProposalListService myProposalListService, BsqWalletService bsqWalletService, ProposalValidatorProvider validatorProvider) { @@ -87,7 +79,6 @@ public ProposalListPresentation(ProposalService proposalService, this.validatorProvider = validatorProvider; daoStateService.addDaoStateListener(this); - p2PDataStorage.addHashMapChangedListener(this); myProposalListService.addListener(this); proposalListChangeListener = c -> updateLists(); @@ -124,44 +115,6 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) { updateLists(); } - - /////////////////////////////////////////////////////////////////////////////////////////// - // HashMapChangedListener - /////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public void onAdded(ProtectedStorageEntry entry) { - if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) { - tempProposalsChanged = true; - } - } - - @Override - public void onRemoved(ProtectedStorageEntry entry) { - if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) { - tempProposalsChanged = true; - } - } - - @Override - public void onBatchRemoveExpiredDataStarted() { - // We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each - // remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call - // the updateLists method. - proposalService.getTempProposals().removeListener(proposalListChangeListener); - } - - @Override - public void onBatchRemoveExpiredDataCompleted() { - proposalService.getTempProposals().addListener(proposalListChangeListener); - // We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms. - if (tempProposalsChanged) { - updateLists(); - tempProposalsChanged = false; - } - } - - /////////////////////////////////////////////////////////////////////////////////////////// // MyProposalListService.Listener /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java index 6da0d387516..7953d403b66 100644 --- a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java +++ b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java @@ -50,6 +50,8 @@ import javafx.collections.FXCollections; import javafx.collections.ObservableList; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -133,13 +135,15 @@ public void start() { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onAdded(ProtectedStorageEntry entry) { - onProtectedDataAdded(entry, true); + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + onProtectedDataAdded(protectedStorageEntry, true); + }); } @Override - public void onRemoved(ProtectedStorageEntry entry) { - onProtectedDataRemoved(entry); + public void onRemoved(Collection protectedStorageEntries) { + onProtectedDataRemoved(protectedStorageEntries); } @@ -266,30 +270,39 @@ private void onProtectedDataAdded(ProtectedStorageEntry entry, boolean fromBroad } } - private void onProtectedDataRemoved(ProtectedStorageEntry entry) { - ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof TempProposalPayload) { - Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal(); - // We allow removal only if we are in the proposal phase. - boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL); - boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight()); - Optional tx = daoStateService.getTx(proposal.getTxId()); - boolean unconfirmedOrNonBsqTx = !tx.isPresent(); - // if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed. - if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) { - if (tempProposals.contains(proposal)) { - tempProposals.remove(proposal); - log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " + - "from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " + - "txInPastCycle={}, unconfirmedOrNonBsqTx={}", - proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx); + private void onProtectedDataRemoved(Collection protectedStorageEntries) { + + // The listeners of tmpProposals can do large amounts of work that cause performance issues. Apply all of the + // updates at once using retainAll which will cause all listeners to be updated only once. + ArrayList tempProposalsWithUpdates = new ArrayList<>(tempProposals); + + protectedStorageEntries.forEach(protectedStorageEntry -> { + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof TempProposalPayload) { + Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal(); + // We allow removal only if we are in the proposal phase. + boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL); + boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight()); + Optional tx = daoStateService.getTx(proposal.getTxId()); + boolean unconfirmedOrNonBsqTx = !tx.isPresent(); + // if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed. + if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) { + if (tempProposalsWithUpdates.contains(proposal)) { + tempProposalsWithUpdates.remove(proposal); + log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " + + "from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " + + "txInPastCycle={}, unconfirmedOrNonBsqTx={}", + proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx); + } + } else { + log.warn("We received a remove request outside the PROPOSAL phase. " + + "Proposal creation date={}, proposal.txId={}, current blockHeight={}", + proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight()); } - } else { - log.warn("We received a remove request outside the PROPOSAL phase. " + - "Proposal creation date={}, proposal.txId={}, current blockHeight={}", - proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight()); } - } + }); + + tempProposals.retainAll(tempProposalsWithUpdates); } private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean fromBroadcastMessage) { diff --git a/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java b/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java index 81ef3c05908..169ba028a6b 100644 --- a/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java +++ b/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java @@ -56,7 +56,7 @@ public class TempProposalStore implements PersistableEnvelope { /////////////////////////////////////////////////////////////////////////////////////////// private TempProposalStore(List list) { - list.forEach(entry -> map.put(P2PDataStorage.getCompactHashAsByteArray(entry.getProtectedStoragePayload()), entry)); + list.forEach(entry -> map.put(P2PDataStorage.get32ByteHashAsByteArray(entry.getProtectedStoragePayload()), entry)); } public Message toProtoMessage() { diff --git a/core/src/main/java/bisq/core/filter/FilterManager.java b/core/src/main/java/bisq/core/filter/FilterManager.java index 86687d92c37..c32c27b07b7 100644 --- a/core/src/main/java/bisq/core/filter/FilterManager.java +++ b/core/src/main/java/bisq/core/filter/FilterManager.java @@ -52,6 +52,7 @@ import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; @@ -133,23 +134,27 @@ public void onAllServicesInitialized() { p2PService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - if (data.getProtectedStoragePayload() instanceof Filter) { - Filter filter = (Filter) data.getProtectedStoragePayload(); - boolean wasValid = addFilter(filter); - if (!wasValid) { - UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1); + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) { + Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload(); + boolean wasValid = addFilter(filter); + if (!wasValid) { + UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(protectedStorageEntry), 1); + } } - } + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - if (data.getProtectedStoragePayload() instanceof Filter) { - Filter filter = (Filter) data.getProtectedStoragePayload(); - if (verifySignature(filter)) - resetFilters(); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) { + Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload(); + if (verifySignature(filter)) + resetFilters(); + } + }); } }); } diff --git a/core/src/main/java/bisq/core/offer/OfferBookService.java b/core/src/main/java/bisq/core/offer/OfferBookService.java index 5ebe152453c..e9b36e89219 100644 --- a/core/src/main/java/bisq/core/offer/OfferBookService.java +++ b/core/src/main/java/bisq/core/offer/OfferBookService.java @@ -40,6 +40,7 @@ import java.io.File; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -87,26 +88,30 @@ public OfferBookService(P2PService p2PService, p2PService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - offerBookChangedListeners.stream().forEach(listener -> { - if (data.getProtectedStoragePayload() instanceof OfferPayload) { - OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload(); - Offer offer = new Offer(offerPayload); - offer.setPriceFeedService(priceFeedService); - listener.onAdded(offer); - } + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + offerBookChangedListeners.stream().forEach(listener -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) { + OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload(); + Offer offer = new Offer(offerPayload); + offer.setPriceFeedService(priceFeedService); + listener.onAdded(offer); + } + }); }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - offerBookChangedListeners.stream().forEach(listener -> { - if (data.getProtectedStoragePayload() instanceof OfferPayload) { - OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload(); - Offer offer = new Offer(offerPayload); - offer.setPriceFeedService(priceFeedService); - listener.onRemoved(offer); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + offerBookChangedListeners.stream().forEach(listener -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) { + OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload(); + Offer offer = new Offer(offerPayload); + offer.setPriceFeedService(priceFeedService); + listener.onRemoved(offer); + } + }); }); } }); diff --git a/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java b/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java index 16cc1b38e87..ac6480bb1b2 100644 --- a/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java +++ b/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java @@ -46,6 +46,7 @@ import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -131,18 +132,22 @@ public DisputeAgentManager(KeyRing keyRing, public void onAllServicesInitialized() { disputeAgentService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - if (isExpectedInstance(data)) { - updateMap(); - } + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (isExpectedInstance(protectedStorageEntry)) { + updateMap(); + } + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - if (isExpectedInstance(data)) { - updateMap(); - removeAcceptedDisputeAgentFromUser(data); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (isExpectedInstance(protectedStorageEntry)) { + updateMap(); + removeAcceptedDisputeAgentFromUser(protectedStorageEntry); + } + }); } }); diff --git a/core/src/test/java/bisq/core/dao/governance/proposal/ProposalServiceP2PDataStorageListenerTest.java b/core/src/test/java/bisq/core/dao/governance/proposal/ProposalServiceP2PDataStorageListenerTest.java new file mode 100644 index 00000000000..7945a241b28 --- /dev/null +++ b/core/src/test/java/bisq/core/dao/governance/proposal/ProposalServiceP2PDataStorageListenerTest.java @@ -0,0 +1,127 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.governance.proposal; + +import bisq.core.dao.governance.period.PeriodService; +import bisq.core.dao.governance.proposal.storage.appendonly.ProposalStorageService; +import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload; +import bisq.core.dao.governance.proposal.storage.temp.TempProposalStorageService; +import bisq.core.dao.state.DaoStateService; +import bisq.core.dao.state.model.governance.DaoPhase; +import bisq.core.dao.state.model.governance.Proposal; + +import bisq.network.p2p.P2PService; +import bisq.network.p2p.storage.payload.ProtectedStorageEntry; +import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; +import bisq.network.p2p.storage.persistence.ProtectedDataStoreService; + +import javafx.collections.ListChangeListener; + +import java.util.Arrays; +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + + +/** + * Tests of the P2PDataStorage::onRemoved callback behavior to ensure that the proper number of signal events occur. + */ +public class ProposalServiceP2PDataStorageListenerTest { + private ProposalService proposalService; + + @Mock + private PeriodService periodService; + + @Mock + private DaoStateService daoStateService; + + @Mock + private ListChangeListener tempProposalListener; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + this.proposalService = new ProposalService( + mock(P2PService.class), + this.periodService, + mock(ProposalStorageService.class), + mock(TempProposalStorageService.class), + mock(AppendOnlyDataStoreService.class), + mock(ProtectedDataStoreService.class), + this.daoStateService, + mock(ProposalValidatorProvider.class), + true); + + // Create a state so that all added/removed Proposals will actually update the tempProposals list. + when(this.periodService.isInPhase(anyInt(), any(DaoPhase.Phase.class))).thenReturn(true); + when(this.daoStateService.isParseBlockChainComplete()).thenReturn(false); + } + + private static ProtectedStorageEntry buildProtectedStorageEntry() { + ProtectedStorageEntry protectedStorageEntry = mock(ProtectedStorageEntry.class); + TempProposalPayload tempProposalPayload = mock(TempProposalPayload.class); + Proposal tempProposal = mock(Proposal.class); + when(protectedStorageEntry.getProtectedStoragePayload()).thenReturn(tempProposalPayload); + when(tempProposalPayload.getProposal()).thenReturn(tempProposal); + + return protectedStorageEntry; + } + + // TESTCASE: If an onRemoved callback is called which does not remove anything the tempProposals listeners + // are not signaled. + @Test + public void onRemoved_noSignalIfNoChange() { + this.proposalService.onRemoved(Collections.singletonList(mock(ProtectedStorageEntry.class))); + + verify(this.tempProposalListener, never()).onChanged(any()); + } + + // TESTCASE: If an onRemoved callback is called with 1 element AND it creates a remove of 1 element, the tempProposal + // listeners are signaled once. + @Test + public void onRemoved_signalOnceOnOneChange() { + ProtectedStorageEntry one = buildProtectedStorageEntry(); + this.proposalService.onAdded(Collections.singletonList(one)); + this.proposalService.getTempProposals().addListener(this.tempProposalListener); + + this.proposalService.onRemoved(Collections.singletonList(one)); + + verify(this.tempProposalListener).onChanged(any()); + } + + // TESTCASE: If an onRemoved callback is called with 2 elements AND it creates a remove of 2 elements, the + // tempProposal listeners are signaled once. + @Test + public void onRemoved_signalOnceOnMultipleChanges() { + ProtectedStorageEntry one = buildProtectedStorageEntry(); + ProtectedStorageEntry two = buildProtectedStorageEntry(); + this.proposalService.onAdded(Arrays.asList(one, two)); + this.proposalService.getTempProposals().addListener(this.tempProposalListener); + + this.proposalService.onRemoved(Arrays.asList(one, two)); + + verify(this.tempProposalListener).onChanged(any()); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/P2PModule.java b/p2p/src/main/java/bisq/network/p2p/P2PModule.java index 6356515f18b..d1f3aeb22c5 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PModule.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PModule.java @@ -105,5 +105,6 @@ protected void configure() { bindConstant().annotatedWith(named(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC)).to(environment.getRequiredProperty(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC)); bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER)); bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP)); + bindConstant().annotatedWith(named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE")).to(1000); } } diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index d90fdeb2565..733cc385cce 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -75,6 +75,7 @@ import java.security.PublicKey; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -432,15 +433,15 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onAdded(ProtectedStorageEntry protectedStorageEntry) { - if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) - processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry); + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) + processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry); + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - } - + public void onRemoved(Collection protectedStorageEntries) { } /////////////////////////////////////////////////////////////////////////////////////////// // DirectMessages diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java index f0c972780e3..a116d171b7d 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java @@ -132,7 +132,7 @@ void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) { .map(e -> e.bytes) .collect(Collectors.toSet()); - Set excludedKeysFromPersistedEntryMap = dataStorage.getProtectedDataStoreMap().keySet() + Set excludedKeysFromPersistedEntryMap = dataStorage.getMap().keySet() .stream() .map(e -> e.bytes) .collect(Collectors.toSet()); diff --git a/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java b/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java index a3ac1c20258..ce483889703 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java @@ -19,17 +19,11 @@ import bisq.network.p2p.storage.payload.ProtectedStorageEntry; +import java.util.Collection; + public interface HashMapChangedListener { - void onAdded(ProtectedStorageEntry data); + void onAdded(Collection protectedStorageEntries); @SuppressWarnings("UnusedParameters") - void onRemoved(ProtectedStorageEntry data); - - // We process all expired entries after a delay (60 s) after onBootstrapComplete. - // We notify listeners of start and completion so they can optimize to only update after batch processing is done. - default void onBatchRemoveExpiredDataStarted() { - } - - default void onBatchRemoveExpiredDataCompleted() { - } + void onRemoved(Collection protectedStorageEntries); } diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 04313763d17..fc5a2b1d197 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -63,9 +63,13 @@ import com.google.protobuf.ByteString; +import com.google.inject.name.Named; + import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; + import java.security.KeyPair; import java.security.PublicKey; @@ -73,6 +77,8 @@ import java.time.Clock; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; @@ -115,13 +121,17 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers private Timer removeExpiredEntriesTimer; private final Storage sequenceNumberMapStorage; - private final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap(); + + @VisibleForTesting + final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap(); private final Set appendOnlyDataStoreListeners = new CopyOnWriteArraySet<>(); private final Set protectedDataStoreListeners = new CopyOnWriteArraySet<>(); private final Clock clock; - protected int maxSequenceNumberMapSizeBeforePurge; + /// The maximum number of items that must exist in the SequenceNumberMap before it is scheduled for a purge + /// which removes entries after PURGE_AGE_DAYS. + private final int maxSequenceNumberMapSizeBeforePurge; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -134,12 +144,14 @@ public P2PDataStorage(NetworkNode networkNode, ProtectedDataStoreService protectedDataStoreService, ResourceDataStoreService resourceDataStoreService, Storage sequenceNumberMapStorage, - Clock clock) { + Clock clock, + @Named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE") int maxSequenceNumberBeforePurge) { this.broadcaster = broadcaster; this.appendOnlyDataStoreService = appendOnlyDataStoreService; this.protectedDataStoreService = protectedDataStoreService; this.resourceDataStoreService = resourceDataStoreService; this.clock = clock; + this.maxSequenceNumberMapSizeBeforePurge = maxSequenceNumberBeforePurge; networkNode.addMessageListener(this); @@ -147,7 +159,6 @@ public P2PDataStorage(NetworkNode networkNode, this.sequenceNumberMapStorage = sequenceNumberMapStorage; sequenceNumberMapStorage.setNumMaxBackupFiles(5); - this.maxSequenceNumberMapSizeBeforePurge = 1000; } @Override @@ -191,17 +202,13 @@ void removeExpiredEntries() { .filter(entry -> entry.getValue().isExpired(this.clock)) .collect(Collectors.toCollection(ArrayList::new)); - // Batch processing can cause performance issues, so we give listeners a chance to deal with it by notifying - // about start and end of iteration. - hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataStarted); - toRemoveList.forEach(mapEntry -> { - ProtectedStorageEntry protectedStorageEntry = mapEntry.getValue(); - ByteArray payloadHash = mapEntry.getKey(); - - log.debug("We found an expired data entry. We remove the protectedData:\n\t" + Utilities.toTruncatedString(protectedStorageEntry)); - removeFromMapAndDataStore(protectedStorageEntry, payloadHash); + // Batch processing can cause performance issues, so do all of the removes first, then update the listeners + // to let them know about the removes. + toRemoveList.forEach(toRemoveItem -> { + log.debug("We found an expired data entry. We remove the protectedData:\n\t" + + Utilities.toTruncatedString(toRemoveItem.getValue())); }); - hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataCompleted); + removeFromMapAndDataStore(toRemoveList); if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap())); @@ -215,11 +222,6 @@ public Map getAppendOnlyDataStoreMap() { return appendOnlyDataStoreService.getMap(); } - public Map getProtectedDataStoreMap() { - return protectedDataStoreService.getMap(); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -404,7 +406,7 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn // This is an updated entry. Record it and signal listeners. map.put(hashOfPayload, protectedStorageEntry); - hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry)); + hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry))); // Record the updated sequence number and persist it. Higher delay so we can batch more items. sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); @@ -416,8 +418,7 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn // Persist ProtectedStorageEntrys carrying PersistablePayload payloads and signal listeners on changes if (protectedStoragePayload instanceof PersistablePayload) { - ByteArray compactHash = P2PDataStorage.getCompactHashAsByteArray(protectedStoragePayload); - ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(compactHash, protectedStorageEntry); + ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(hashOfPayload, protectedStorageEntry); if (previous == null) protectedDataStoreListeners.forEach(e -> e.onAdded(protectedStorageEntry)); } @@ -637,19 +638,35 @@ public void removeProtectedDataStoreListener(ProtectedDataStoreListener listener /////////////////////////////////////////////////////////////////////////////////////////// private void removeFromMapAndDataStore(ProtectedStorageEntry protectedStorageEntry, ByteArray hashOfPayload) { - map.remove(hashOfPayload); - hashMapChangedListeners.forEach(e -> e.onRemoved(protectedStorageEntry)); + removeFromMapAndDataStore(Collections.singletonList(Maps.immutableEntry(hashOfPayload, protectedStorageEntry))); + } - ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof PersistablePayload) { - ByteArray compactHash = getCompactHashAsByteArray(protectedStoragePayload); - ProtectedStorageEntry previous = protectedDataStoreService.remove(compactHash, protectedStorageEntry); - if (previous != null) { - protectedDataStoreListeners.forEach(e -> e.onRemoved(protectedStorageEntry)); - } else { - log.info("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist."); + private void removeFromMapAndDataStore( + Collection> entriesToRemoveWithPayloadHash) { + + if (entriesToRemoveWithPayloadHash.isEmpty()) + return; + + ArrayList entriesForSignal = new ArrayList<>(entriesToRemoveWithPayloadHash.size()); + entriesToRemoveWithPayloadHash.forEach(entryToRemoveWithPayloadHash -> { + ByteArray hashOfPayload = entryToRemoveWithPayloadHash.getKey(); + ProtectedStorageEntry protectedStorageEntry = entryToRemoveWithPayloadHash.getValue(); + + map.remove(hashOfPayload); + entriesForSignal.add(protectedStorageEntry); + + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof PersistablePayload) { + ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry); + if (previous != null) { + protectedDataStoreListeners.forEach(e -> e.onRemoved(protectedStorageEntry)); + } else { + log.info("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist."); + } } - } + }); + + hashMapChangedListeners.forEach(e -> e.onRemoved(entriesForSignal)); } private boolean hasSequenceNrIncreased(int newSequenceNumber, ByteArray hashOfData) { @@ -690,14 +707,6 @@ public static ByteArray get32ByteHashAsByteArray(NetworkPayload data) { return new ByteArray(P2PDataStorage.get32ByteHash(data)); } - public static ByteArray getCompactHashAsByteArray(ProtectedStoragePayload protectedStoragePayload) { - return new ByteArray(getCompactHash(protectedStoragePayload)); - } - - private static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayload) { - return Hash.getSha256Ripemd160hash(protectedStoragePayload.toProtoMessage().toByteArray()); - } - // Get a new map with entries older than PURGE_AGE_DAYS purged from the given map. private Map getPurgedSequenceNumberMap(Map persisted) { Map purged = new HashMap<>(); diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java index c8a3d9c6134..9213bcb0fbe 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java @@ -55,6 +55,7 @@ * 2 & 3 Client API [addPersistableNetworkPayload(reBroadcast=(true && false))] * 4. onMessage() [onMessage(AddPersistableNetworkPayloadMessage)] */ +@SuppressWarnings("unused") public class P2PDataStoragePersistableNetworkPayloadTest { @RunWith(Parameterized.class) diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java index 2aa8bc03603..dda63aa1dfa 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import org.junit.Assert; @@ -65,6 +66,7 @@ * 1. Client API [addProtectedStorageEntry(), refreshTTL(), remove()] * 2. onMessage() [AddDataMessage, RefreshOfferMessage, RemoveDataMessage] */ +@SuppressWarnings("unused") public class P2PDataStorageProtectedStorageEntryTest { @RunWith(Parameterized.class) abstract public static class ProtectedStorageEntryTestBase { @@ -570,6 +572,21 @@ protected Class getEntryClass() { return ProtectedStorageEntry.class; } + + // Tests that just apply to PersistablePayload objects + + // TESTCASE: Ensure the HashMap is the same before and after a restart + @Test + public void addProtectedStorageEntry_afterReadFromResourcesWithDuplicate_3629RegressionTest() { + ProtectedStorageEntry protectedStorageEntry = this.getProtectedStorageEntryForAdd(1); + doProtectedStorageAddAndVerify(protectedStorageEntry, true, true); + + Map beforeRestart = this.testState.mockedStorage.getMap(); + + this.testState.simulateRestart(); + + Assert.assertEquals(beforeRestart, this.testState.mockedStorage.getMap()); + } } /** diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java index 7814004cd2e..1a30e2cc789 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java @@ -32,6 +32,7 @@ import java.security.KeyPair; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -149,17 +150,21 @@ public void removeExpiredEntries_ExpiresExpiredPersistableExpirableEntries() thr public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchAlgorithmException { final int initialClockIncrement = 5; + ArrayList expectedRemoves = new ArrayList<>(); + // Add 4 entries to our sequence number map that will be purged KeyPair purgedOwnerKeys = TestUtils.generateKeyPair(); ProtectedStoragePayload purgedProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(purgedOwnerKeys.getPublic(), 0); ProtectedStorageEntry purgedProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(purgedProtectedStoragePayload, purgedOwnerKeys); + expectedRemoves.add(purgedProtectedStorageEntry); Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, true)); - for (int i = 0; i < 4; ++i) { + for (int i = 0; i < MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE - 1; ++i) { KeyPair ownerKeys = TestUtils.generateKeyPair(); ProtectedStoragePayload protectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(ownerKeys.getPublic(), 0); ProtectedStorageEntry tmpEntry = testState.mockedStorage.getProtectedStorageEntry(protectedStoragePayload, ownerKeys); + expectedRemoves.add(tmpEntry); Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(tmpEntry, TestState.getTestNodeAddress(), null, true)); } @@ -171,6 +176,7 @@ public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchA KeyPair keepOwnerKeys = TestUtils.generateKeyPair(); ProtectedStoragePayload keepProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(keepOwnerKeys.getPublic(), 0); ProtectedStorageEntry keepProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(keepProtectedStoragePayload, keepOwnerKeys); + expectedRemoves.add(keepProtectedStorageEntry); Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, true)); @@ -178,17 +184,17 @@ public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchA // Advance time past it so they will be valid purge targets this.testState.clockFake.increment(TimeUnit.DAYS.toMillis(P2PDataStorage.PURGE_AGE_DAYS + 1 - initialClockIncrement)); - // The first entry (11 days old) should be purged + // The first 4 entries (11 days old) should be purged from the SequenceNumberMap SavedTestState beforeState = this.testState.saveTestState(purgedProtectedStorageEntry); this.testState.mockedStorage.removeExpiredEntries(); - this.testState.verifyProtectedStorageRemove(beforeState, purgedProtectedStorageEntry, true, false, false, false); + this.testState.verifyProtectedStorageRemove(beforeState, expectedRemoves, true, false, false, false); // Which means that an addition of a purged entry should succeed. beforeState = this.testState.saveTestState(purgedProtectedStorageEntry); Assert.assertTrue(this.testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, false)); this.testState.verifyProtectedStorageAdd(beforeState, purgedProtectedStorageEntry, true, false); - // The second entry (5 days old) should still exist which means trying to add it again should fail. + // The last entry (5 days old) should still exist in the SequenceNumberMap which means trying to add it again should fail. beforeState = this.testState.saveTestState(keepProtectedStorageEntry); Assert.assertFalse(this.testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, false)); this.testState.verifyProtectedStorageAdd(beforeState, keepProtectedStorageEntry, false, false); diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java index 7a54138a928..6b1ce7db935 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java @@ -39,10 +39,7 @@ import org.junit.Before; import org.junit.Test; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static bisq.network.p2p.storage.TestState.*; @@ -173,7 +170,7 @@ public void connectionClosedReduceTTLAndExpireItemsFromPeerPersistable() class ExpirablePersistentProtectedStoragePayloadStub extends ExpirableProtectedStoragePayloadStub implements PersistablePayload { - public ExpirablePersistentProtectedStoragePayloadStub(PublicKey ownerPubKey) { + private ExpirablePersistentProtectedStoragePayloadStub(PublicKey ownerPubKey) { super(ownerPubKey, 0); } } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/TestState.java b/p2p/src/test/java/bisq/network/p2p/storage/TestState.java index b13ec2a97a4..37fe4e4b081 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/TestState.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/TestState.java @@ -35,7 +35,6 @@ import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener; -import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; import bisq.network.p2p.storage.persistence.ProtectedDataStoreListener; import bisq.network.p2p.storage.persistence.ProtectedDataStoreService; import bisq.network.p2p.storage.persistence.ResourceDataStoreService; @@ -47,8 +46,10 @@ import java.security.PublicKey; -import java.time.Clock; - +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -63,51 +64,88 @@ * Used in the P2PDataStorage*Test(s) in order to leverage common test set up and validation. */ public class TestState { - final P2PDataStorage mockedStorage; + static final int MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE = 5; + + P2PDataStorage mockedStorage; final Broadcaster mockBroadcaster; final AppendOnlyDataStoreListener appendOnlyDataStoreListener; private final ProtectedDataStoreListener protectedDataStoreListener; - final HashMapChangedListener hashMapChangedListener; + private final HashMapChangedListener hashMapChangedListener; private final Storage mockSeqNrStorage; + private final ProtectedDataStoreService protectedDataStoreService; final ClockFake clockFake; - /** - * Subclass of P2PDataStorage that allows for easier testing, but keeps all functionality - */ - static class P2PDataStorageForTest extends P2PDataStorage { - - P2PDataStorageForTest(NetworkNode networkNode, - Broadcaster broadcaster, - AppendOnlyDataStoreService appendOnlyDataStoreService, - ProtectedDataStoreService protectedDataStoreService, - ResourceDataStoreService resourceDataStoreService, - Storage sequenceNumberMapStorage, - Clock clock) { - super(networkNode, broadcaster, appendOnlyDataStoreService, protectedDataStoreService, resourceDataStoreService, sequenceNumberMapStorage, clock); - - this.maxSequenceNumberMapSizeBeforePurge = 5; - } - } - TestState() { this.mockBroadcaster = mock(Broadcaster.class); this.mockSeqNrStorage = mock(Storage.class); this.clockFake = new ClockFake(); + this.protectedDataStoreService = new ProtectedDataStoreServiceFake(); - this.mockedStorage = new P2PDataStorageForTest(mock(NetworkNode.class), + this.mockedStorage = new P2PDataStorage(mock(NetworkNode.class), this.mockBroadcaster, new AppendOnlyDataStoreServiceFake(), - new ProtectedDataStoreServiceFake(), mock(ResourceDataStoreService.class), - this.mockSeqNrStorage, this.clockFake); + this.protectedDataStoreService, mock(ResourceDataStoreService.class), + this.mockSeqNrStorage, this.clockFake, MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE); this.appendOnlyDataStoreListener = mock(AppendOnlyDataStoreListener.class); this.protectedDataStoreListener = mock(ProtectedDataStoreListener.class); this.hashMapChangedListener = mock(HashMapChangedListener.class); - this.mockedStorage.addHashMapChangedListener(this.hashMapChangedListener); - this.mockedStorage.addAppendOnlyDataStoreListener(this.appendOnlyDataStoreListener); - this.mockedStorage.addProtectedDataStoreListener(this.protectedDataStoreListener); + this.mockedStorage = createP2PDataStorageForTest( + this.mockBroadcaster, + this.protectedDataStoreService, + this.mockSeqNrStorage, + this.clockFake, + this.hashMapChangedListener, + this.appendOnlyDataStoreListener, + this.protectedDataStoreListener); + } + + + /** + * Re-initializes the in-memory data structures from the storage objects to simulate a node restarting. Important + * to note that the current TestState uses Test Doubles instead of actual disk storage so this is just "simulating" + * not running the entire storage code paths. + */ + void simulateRestart() { + when(this.mockSeqNrStorage.initAndGetPersisted(any(SequenceNumberMap.class), anyLong())) + .thenReturn(this.mockedStorage.sequenceNumberMap); + + this.mockedStorage = createP2PDataStorageForTest( + this.mockBroadcaster, + this.protectedDataStoreService, + this.mockSeqNrStorage, + this.clockFake, + this.hashMapChangedListener, + this.appendOnlyDataStoreListener, + this.protectedDataStoreListener); + } + + private static P2PDataStorage createP2PDataStorageForTest( + Broadcaster broadcaster, + ProtectedDataStoreService protectedDataStoreService, + Storage sequenceNrMapStorage, + ClockFake clock, + HashMapChangedListener hashMapChangedListener, + AppendOnlyDataStoreListener appendOnlyDataStoreListener, + ProtectedDataStoreListener protectedDataStoreListener) { + + P2PDataStorage p2PDataStorage = new P2PDataStorage(mock(NetworkNode.class), + broadcaster, + new AppendOnlyDataStoreServiceFake(), + protectedDataStoreService, mock(ResourceDataStoreService.class), + sequenceNrMapStorage, clock, MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE); + + // Currently TestState only supports reading ProtectedStorageEntries off disk. + p2PDataStorage.readFromResources("unused"); + p2PDataStorage.readPersisted(); + + p2PDataStorage.addHashMapChangedListener(hashMapChangedListener); + p2PDataStorage.addAppendOnlyDataStoreListener(appendOnlyDataStoreListener); + p2PDataStorage.addProtectedDataStoreListener(protectedDataStoreListener); + + return p2PDataStorage; } private void resetState() { @@ -177,7 +215,6 @@ void verifyProtectedStorageAdd(SavedTestState beforeState, boolean expectedStateChange, boolean expectedIsDataOwner) { P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); if (expectedStateChange) { Assert.assertEquals(protectedStorageEntry, this.mockedStorage.getMap().get(hashMapHash)); @@ -187,14 +224,14 @@ void verifyProtectedStorageAdd(SavedTestState beforeState, // TODO: Should the behavior be identical between this and the HashMap listeners? // TODO: Do we want ot overwrite stale values in order to persist updated sequence numbers and timestamps? if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload && beforeState.protectedStorageEntryBeforeOpDataStoreMap == null) { - Assert.assertEquals(protectedStorageEntry, this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); + Assert.assertEquals(protectedStorageEntry, this.protectedDataStoreService.getMap().get(hashMapHash)); verify(this.protectedDataStoreListener).onAdded(protectedStorageEntry); } else { - Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); + Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.protectedDataStoreService.getMap().get(hashMapHash)); verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry); } - verify(this.hashMapChangedListener).onAdded(protectedStorageEntry); + verify(this.hashMapChangedListener).onAdded(Collections.singletonList(protectedStorageEntry)); final ArgumentCaptor captor = ArgumentCaptor.forClass(BroadcastMessage.class); verify(this.mockBroadcaster).broadcast(captor.capture(), any(NodeAddress.class), @@ -207,12 +244,12 @@ void verifyProtectedStorageAdd(SavedTestState beforeState, this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber()); } else { Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash)); - Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); + Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.protectedDataStoreService.getMap().get(hashMapHash)); verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean()); // Internal state didn't change... nothing should be notified - verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry); + verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry)); verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry); verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); } @@ -224,38 +261,67 @@ void verifyProtectedStorageRemove(SavedTestState beforeState, boolean expectedBroadcastOnStateChange, boolean expectedSeqNrWriteOnStateChange, boolean expectedIsDataOwner) { - P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); + verifyProtectedStorageRemove(beforeState, Collections.singletonList(protectedStorageEntry), + expectedStateChange, expectedBroadcastOnStateChange, + expectedSeqNrWriteOnStateChange, expectedIsDataOwner); + } + + void verifyProtectedStorageRemove(SavedTestState beforeState, + Collection protectedStorageEntries, + boolean expectedStateChange, + boolean expectedBroadcastOnStateChange, + boolean expectedSeqNrWriteOnStateChange, + boolean expectedIsDataOwner) { + + // The default matcher expects orders to stay the same. So, create a custom matcher function since + // we don't care about the order. if (expectedStateChange) { - Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash)); + final ArgumentCaptor> argument = ArgumentCaptor.forClass(Collection.class); + verify(this.hashMapChangedListener).onRemoved(argument.capture()); - if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) { - Assert.assertNull(this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); + Set actual = new HashSet<>(argument.getValue()); + Set expected = new HashSet<>(protectedStorageEntries); - verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry); - } + // Ensure we didn't remove duplicates + Assert.assertEquals(protectedStorageEntries.size(), expected.size()); + Assert.assertEquals(argument.getValue().size(), actual.size()); + Assert.assertEquals(expected, actual); + } else { + verify(this.hashMapChangedListener, never()).onRemoved(any()); + } - verify(this.hashMapChangedListener).onRemoved(protectedStorageEntry); + protectedStorageEntries.forEach(protectedStorageEntry -> { + P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - if (expectedSeqNrWriteOnStateChange) - this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber()); + if (expectedStateChange) { + Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash)); - if (expectedBroadcastOnStateChange) { - if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) - verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner)); - else - verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner)); - } + if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) { + Assert.assertNull(this.protectedDataStoreService.getMap().get(hashMapHash)); - } else { - Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash)); + verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry); + } - verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean()); - verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry); - verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry); - verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); - } + if (expectedSeqNrWriteOnStateChange) + this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber()); + + if (expectedBroadcastOnStateChange) { + if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) + verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner)); + else + verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner)); + } + + } else { + Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash)); + + verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean()); + verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry)); + verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry); + verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); + } + }); } void verifyRefreshTTL(SavedTestState beforeState, @@ -352,11 +418,10 @@ private SavedTestState(TestState testState, PersistableNetworkPayload persistabl private SavedTestState(TestState testState, ProtectedStorageEntry protectedStorageEntry) { this(testState); - P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - this.protectedStorageEntryBeforeOpDataStoreMap = testState.mockedStorage.getProtectedDataStoreMap().get(storageHash); - P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); this.protectedStorageEntryBeforeOp = testState.mockedStorage.getMap().get(hashMapHash); + this.protectedStorageEntryBeforeOpDataStoreMap = testState.protectedDataStoreService.getMap().get(hashMapHash); + this.creationTimestampBeforeUpdate = (this.protectedStorageEntryBeforeOp != null) ? this.protectedStorageEntryBeforeOp.getCreationTimeStamp() : 0; } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java b/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java index d6be958138f..5d9a9d3784f 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java @@ -45,7 +45,7 @@ public class ProtectedStoragePayloadStub implements ProtectedStoragePayload { @Getter private PublicKey ownerPubKey; - protected Message messageMock; + protected final Message messageMock; public ProtectedStoragePayloadStub(PublicKey ownerPubKey) { this.ownerPubKey = ownerPubKey;