diff --git a/common/src/main/java/bisq/common/storage/FileManager.java b/common/src/main/java/bisq/common/storage/FileManager.java index c66d08803d7..ddf44301ff0 100644 --- a/common/src/main/java/bisq/common/storage/FileManager.java +++ b/common/src/main/java/bisq/common/storage/FileManager.java @@ -36,7 +36,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; @@ -46,10 +46,9 @@ public class FileManager { private final File dir; private final File storageFile; private final ScheduledThreadPoolExecutor executor; - private final AtomicBoolean savePending; private final long delay; private final Callable saveFileTask; - private T persistable; + private final AtomicReference nextWrite; private final PersistenceProtoResolver persistenceProtoResolver; private final ReentrantLock writeLock = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW).newReentrantLock("writeLock"); @@ -61,26 +60,28 @@ public FileManager(File dir, File storageFile, long delay, PersistenceProtoResol this.dir = dir; this.storageFile = storageFile; this.persistenceProtoResolver = persistenceProtoResolver; + this.nextWrite = new AtomicReference<>(null); executor = Utilities.getScheduledThreadPoolExecutor("FileManager", 1, 10, 5); // File must only be accessed from the auto-save executor from now on, to avoid simultaneous access. - savePending = new AtomicBoolean(); this.delay = delay; saveFileTask = () -> { try { Thread.currentThread().setName("Save-file-task-" + new Random().nextInt(10000)); - // Runs in an auto save thread. - // TODO: this looks like it could cause corrupt data as the savePending is unset before the actual - // save. By moving to after the save there might be some persist operations that are not performed - // and data would be lost. Probably all persist operations should happen sequencially rather than - // skip one when there is already one scheduled - if (!savePending.getAndSet(false)) { - // Some other scheduled request already beat us to it. + + // Atomically take the next object to write and set the value to null so concurrent saveFileTask + // won't duplicate work. + T persistable = this.nextWrite.getAndSet(null); + + // If null, a concurrent saveFileTask already grabbed the data. Don't duplicate work. + if (persistable == null) return null; - } - saveNowInternal(persistable); + + long now = System.currentTimeMillis(); + saveToFile(persistable, dir, storageFile); + log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now); } catch (Throwable e) { log.error("Error during saveFileTask", e); } @@ -96,26 +97,20 @@ public FileManager(File dir, File storageFile, long delay, PersistenceProtoResol // API /////////////////////////////////////////////////////////////////////////////////////////// - /** - * Actually write the wallet file to disk, using an atomic rename when possible. Runs on the current thread. - */ - public void saveNow(T persistable) { - saveNowInternal(persistable); - } - /** * Queues up a save in the background. Useful for not very important wallet changes. */ - public void saveLater(T persistable) { + void saveLater(T persistable) { saveLater(persistable, delay); } public void saveLater(T persistable, long delayInMilli) { - this.persistable = persistable; - - if (savePending.getAndSet(true)) - return; // Already pending. + // Atomically set the value of the next write. This allows batching of multiple writes of the same data + // structure if there are multiple calls to saveLater within a given `delayInMillis`. + this.nextWrite.set(persistable); + // Always schedule a write. It is possible that a previous saveLater was called with a larger `delayInMilli` + // and we want the lower delay to execute. The saveFileTask handles concurrent operations. executor.schedule(saveFileTask, delayInMilli, TimeUnit.MILLISECONDS); } @@ -134,7 +129,7 @@ public synchronized T read(File file) { } } - public synchronized void removeFile(String fileName) { + synchronized void removeFile(String fileName) { File file = new File(dir, fileName); boolean result = file.delete(); if (!result) @@ -155,7 +150,7 @@ public synchronized void removeFile(String fileName) { /** * Shut down auto-saving. */ - void shutDown() { + private void shutDown() { executor.shutdown(); try { executor.awaitTermination(5, TimeUnit.SECONDS); @@ -175,11 +170,11 @@ public static void removeAndBackupFile(File dbDir, File storageFile, String file FileUtil.renameFile(storageFile, corruptedFile); } - public synchronized void removeAndBackupFile(String fileName) throws IOException { + synchronized void removeAndBackupFile(String fileName) throws IOException { removeAndBackupFile(dir, storageFile, fileName, "backup_of_corrupted_data"); } - public synchronized void backupFile(String fileName, int numMaxBackupFiles) { + synchronized void backupFile(String fileName, int numMaxBackupFiles) { FileUtil.rollingBackup(dir, fileName, numMaxBackupFiles); } @@ -187,12 +182,6 @@ public synchronized void backupFile(String fileName, int numMaxBackupFiles) { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private void saveNowInternal(T persistable) { - long now = System.currentTimeMillis(); - saveToFile(persistable, dir, storageFile); - log.debug("Save {} completed in {} msec", storageFile, System.currentTimeMillis() - now); - } - private synchronized void saveToFile(T persistable, File dir, File storageFile) { File tempFile = null; FileOutputStream fileOutputStream = null; diff --git a/core/src/main/java/bisq/core/alert/AlertManager.java b/core/src/main/java/bisq/core/alert/AlertManager.java index 05a3872a7ff..0fba35a283f 100644 --- a/core/src/main/java/bisq/core/alert/AlertManager.java +++ b/core/src/main/java/bisq/core/alert/AlertManager.java @@ -44,6 +44,8 @@ import java.math.BigInteger; +import java.util.Collection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,22 +81,26 @@ public AlertManager(P2PService p2PService, if (!ignoreDevMsg) { p2PService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof Alert) { - Alert alert = (Alert) protectedStoragePayload; - if (verifySignature(alert)) - alertMessageProperty.set(alert); - } + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof Alert) { + Alert alert = (Alert) protectedStoragePayload; + if (verifySignature(alert)) + alertMessageProperty.set(alert); + } + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - final ProtectedStoragePayload protectedStoragePayload = data.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof Alert) { - if (verifySignature((Alert) protectedStoragePayload)) - alertMessageProperty.set(null); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof Alert) { + if (verifySignature((Alert) protectedStoragePayload)) + alertMessageProperty.set(null); + } + }); } }); } diff --git a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java index 9be6942b6c9..49aa219281c 100644 --- a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java +++ b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalListPresentation.java @@ -20,16 +20,11 @@ import bisq.core.btc.wallet.BsqWalletService; import bisq.core.dao.DaoSetupService; import bisq.core.dao.governance.proposal.storage.appendonly.ProposalPayload; -import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload; import bisq.core.dao.state.DaoStateListener; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.model.blockchain.Block; import bisq.core.dao.state.model.governance.Proposal; -import bisq.network.p2p.storage.HashMapChangedListener; -import bisq.network.p2p.storage.P2PDataStorage; -import bisq.network.p2p.storage.payload.ProtectedStorageEntry; - import bisq.common.UserThread; import org.bitcoinj.core.TransactionConfidence; @@ -55,8 +50,7 @@ * our own proposal that is not critical). Foreign proposals are only shown if confirmed and fully validated. */ @Slf4j -public class ProposalListPresentation implements DaoStateListener, HashMapChangedListener, - MyProposalListService.Listener, DaoSetupService { +public class ProposalListPresentation implements DaoStateListener, MyProposalListService.Listener, DaoSetupService { private final ProposalService proposalService; private final DaoStateService daoStateService; private final MyProposalListService myProposalListService; @@ -66,7 +60,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange @Getter private final FilteredList activeOrMyUnconfirmedProposals = new FilteredList<>(allProposals); private final ListChangeListener proposalListChangeListener; - private boolean tempProposalsChanged; /////////////////////////////////////////////////////////////////////////////////////////// @@ -76,7 +69,6 @@ public class ProposalListPresentation implements DaoStateListener, HashMapChange @Inject public ProposalListPresentation(ProposalService proposalService, DaoStateService daoStateService, - P2PDataStorage p2PDataStorage, MyProposalListService myProposalListService, BsqWalletService bsqWalletService, ProposalValidatorProvider validatorProvider) { @@ -87,7 +79,6 @@ public ProposalListPresentation(ProposalService proposalService, this.validatorProvider = validatorProvider; daoStateService.addDaoStateListener(this); - p2PDataStorage.addHashMapChangedListener(this); myProposalListService.addListener(this); proposalListChangeListener = c -> updateLists(); @@ -124,44 +115,6 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) { updateLists(); } - - /////////////////////////////////////////////////////////////////////////////////////////// - // HashMapChangedListener - /////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public void onAdded(ProtectedStorageEntry entry) { - if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) { - tempProposalsChanged = true; - } - } - - @Override - public void onRemoved(ProtectedStorageEntry entry) { - if (entry.getProtectedStoragePayload() instanceof TempProposalPayload) { - tempProposalsChanged = true; - } - } - - @Override - public void onBatchRemoveExpiredDataStarted() { - // We temporary remove the listener when batch processing starts to avoid that we rebuild our lists at each - // remove call. After batch processing at onBatchRemoveExpiredDataCompleted we add again our listener and call - // the updateLists method. - proposalService.getTempProposals().removeListener(proposalListChangeListener); - } - - @Override - public void onBatchRemoveExpiredDataCompleted() { - proposalService.getTempProposals().addListener(proposalListChangeListener); - // We only call updateLists if tempProposals have changed. updateLists() is an expensive call and takes 200 ms. - if (tempProposalsChanged) { - updateLists(); - tempProposalsChanged = false; - } - } - - /////////////////////////////////////////////////////////////////////////////////////////// // MyProposalListService.Listener /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java index 6da0d387516..7953d403b66 100644 --- a/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java +++ b/core/src/main/java/bisq/core/dao/governance/proposal/ProposalService.java @@ -50,6 +50,8 @@ import javafx.collections.FXCollections; import javafx.collections.ObservableList; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -133,13 +135,15 @@ public void start() { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onAdded(ProtectedStorageEntry entry) { - onProtectedDataAdded(entry, true); + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + onProtectedDataAdded(protectedStorageEntry, true); + }); } @Override - public void onRemoved(ProtectedStorageEntry entry) { - onProtectedDataRemoved(entry); + public void onRemoved(Collection protectedStorageEntries) { + onProtectedDataRemoved(protectedStorageEntries); } @@ -266,30 +270,39 @@ private void onProtectedDataAdded(ProtectedStorageEntry entry, boolean fromBroad } } - private void onProtectedDataRemoved(ProtectedStorageEntry entry) { - ProtectedStoragePayload protectedStoragePayload = entry.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof TempProposalPayload) { - Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal(); - // We allow removal only if we are in the proposal phase. - boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL); - boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight()); - Optional tx = daoStateService.getTx(proposal.getTxId()); - boolean unconfirmedOrNonBsqTx = !tx.isPresent(); - // if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed. - if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) { - if (tempProposals.contains(proposal)) { - tempProposals.remove(proposal); - log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " + - "from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " + - "txInPastCycle={}, unconfirmedOrNonBsqTx={}", - proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx); + private void onProtectedDataRemoved(Collection protectedStorageEntries) { + + // The listeners of tmpProposals can do large amounts of work that cause performance issues. Apply all of the + // updates at once using retainAll which will cause all listeners to be updated only once. + ArrayList tempProposalsWithUpdates = new ArrayList<>(tempProposals); + + protectedStorageEntries.forEach(protectedStorageEntry -> { + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof TempProposalPayload) { + Proposal proposal = ((TempProposalPayload) protectedStoragePayload).getProposal(); + // We allow removal only if we are in the proposal phase. + boolean inPhase = periodService.isInPhase(daoStateService.getChainHeight(), DaoPhase.Phase.PROPOSAL); + boolean txInPastCycle = periodService.isTxInPastCycle(proposal.getTxId(), daoStateService.getChainHeight()); + Optional tx = daoStateService.getTx(proposal.getTxId()); + boolean unconfirmedOrNonBsqTx = !tx.isPresent(); + // if the tx is unconfirmed we need to be in the PROPOSAL phase, otherwise the tx must be confirmed. + if (inPhase || txInPastCycle || unconfirmedOrNonBsqTx) { + if (tempProposalsWithUpdates.contains(proposal)) { + tempProposalsWithUpdates.remove(proposal); + log.debug("We received a remove request for a TempProposalPayload and have removed the proposal " + + "from our list. proposal creation date={}, proposalTxId={}, inPhase={}, " + + "txInPastCycle={}, unconfirmedOrNonBsqTx={}", + proposal.getCreationDateAsDate(), proposal.getTxId(), inPhase, txInPastCycle, unconfirmedOrNonBsqTx); + } + } else { + log.warn("We received a remove request outside the PROPOSAL phase. " + + "Proposal creation date={}, proposal.txId={}, current blockHeight={}", + proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight()); } - } else { - log.warn("We received a remove request outside the PROPOSAL phase. " + - "Proposal creation date={}, proposal.txId={}, current blockHeight={}", - proposal.getCreationDateAsDate(), proposal.getTxId(), daoStateService.getChainHeight()); } - } + }); + + tempProposals.retainAll(tempProposalsWithUpdates); } private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload, boolean fromBroadcastMessage) { diff --git a/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java b/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java index 81ef3c05908..169ba028a6b 100644 --- a/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java +++ b/core/src/main/java/bisq/core/dao/governance/proposal/storage/temp/TempProposalStore.java @@ -56,7 +56,7 @@ public class TempProposalStore implements PersistableEnvelope { /////////////////////////////////////////////////////////////////////////////////////////// private TempProposalStore(List list) { - list.forEach(entry -> map.put(P2PDataStorage.getCompactHashAsByteArray(entry.getProtectedStoragePayload()), entry)); + list.forEach(entry -> map.put(P2PDataStorage.get32ByteHashAsByteArray(entry.getProtectedStoragePayload()), entry)); } public Message toProtoMessage() { diff --git a/core/src/main/java/bisq/core/filter/FilterManager.java b/core/src/main/java/bisq/core/filter/FilterManager.java index 86687d92c37..c32c27b07b7 100644 --- a/core/src/main/java/bisq/core/filter/FilterManager.java +++ b/core/src/main/java/bisq/core/filter/FilterManager.java @@ -52,6 +52,7 @@ import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; @@ -133,23 +134,27 @@ public void onAllServicesInitialized() { p2PService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - if (data.getProtectedStoragePayload() instanceof Filter) { - Filter filter = (Filter) data.getProtectedStoragePayload(); - boolean wasValid = addFilter(filter); - if (!wasValid) { - UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1); + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) { + Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload(); + boolean wasValid = addFilter(filter); + if (!wasValid) { + UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(protectedStorageEntry), 1); + } } - } + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - if (data.getProtectedStoragePayload() instanceof Filter) { - Filter filter = (Filter) data.getProtectedStoragePayload(); - if (verifySignature(filter)) - resetFilters(); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof Filter) { + Filter filter = (Filter) protectedStorageEntry.getProtectedStoragePayload(); + if (verifySignature(filter)) + resetFilters(); + } + }); } }); } diff --git a/core/src/main/java/bisq/core/offer/OfferBookService.java b/core/src/main/java/bisq/core/offer/OfferBookService.java index 5ebe152453c..e9b36e89219 100644 --- a/core/src/main/java/bisq/core/offer/OfferBookService.java +++ b/core/src/main/java/bisq/core/offer/OfferBookService.java @@ -40,6 +40,7 @@ import java.io.File; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -87,26 +88,30 @@ public OfferBookService(P2PService p2PService, p2PService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - offerBookChangedListeners.stream().forEach(listener -> { - if (data.getProtectedStoragePayload() instanceof OfferPayload) { - OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload(); - Offer offer = new Offer(offerPayload); - offer.setPriceFeedService(priceFeedService); - listener.onAdded(offer); - } + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + offerBookChangedListeners.stream().forEach(listener -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) { + OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload(); + Offer offer = new Offer(offerPayload); + offer.setPriceFeedService(priceFeedService); + listener.onAdded(offer); + } + }); }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - offerBookChangedListeners.stream().forEach(listener -> { - if (data.getProtectedStoragePayload() instanceof OfferPayload) { - OfferPayload offerPayload = (OfferPayload) data.getProtectedStoragePayload(); - Offer offer = new Offer(offerPayload); - offer.setPriceFeedService(priceFeedService); - listener.onRemoved(offer); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + offerBookChangedListeners.stream().forEach(listener -> { + if (protectedStorageEntry.getProtectedStoragePayload() instanceof OfferPayload) { + OfferPayload offerPayload = (OfferPayload) protectedStorageEntry.getProtectedStoragePayload(); + Offer offer = new Offer(offerPayload); + offer.setPriceFeedService(priceFeedService); + listener.onRemoved(offer); + } + }); }); } }); diff --git a/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java b/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java index 16cc1b38e87..ac6480bb1b2 100644 --- a/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java +++ b/core/src/main/java/bisq/core/support/dispute/agent/DisputeAgentManager.java @@ -46,6 +46,7 @@ import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -131,18 +132,22 @@ public DisputeAgentManager(KeyRing keyRing, public void onAllServicesInitialized() { disputeAgentService.addHashSetChangedListener(new HashMapChangedListener() { @Override - public void onAdded(ProtectedStorageEntry data) { - if (isExpectedInstance(data)) { - updateMap(); - } + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (isExpectedInstance(protectedStorageEntry)) { + updateMap(); + } + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - if (isExpectedInstance(data)) { - updateMap(); - removeAcceptedDisputeAgentFromUser(data); - } + public void onRemoved(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (isExpectedInstance(protectedStorageEntry)) { + updateMap(); + removeAcceptedDisputeAgentFromUser(protectedStorageEntry); + } + }); } }); diff --git a/core/src/test/java/bisq/core/dao/governance/proposal/ProposalServiceP2PDataStorageListenerTest.java b/core/src/test/java/bisq/core/dao/governance/proposal/ProposalServiceP2PDataStorageListenerTest.java new file mode 100644 index 00000000000..7945a241b28 --- /dev/null +++ b/core/src/test/java/bisq/core/dao/governance/proposal/ProposalServiceP2PDataStorageListenerTest.java @@ -0,0 +1,127 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.core.dao.governance.proposal; + +import bisq.core.dao.governance.period.PeriodService; +import bisq.core.dao.governance.proposal.storage.appendonly.ProposalStorageService; +import bisq.core.dao.governance.proposal.storage.temp.TempProposalPayload; +import bisq.core.dao.governance.proposal.storage.temp.TempProposalStorageService; +import bisq.core.dao.state.DaoStateService; +import bisq.core.dao.state.model.governance.DaoPhase; +import bisq.core.dao.state.model.governance.Proposal; + +import bisq.network.p2p.P2PService; +import bisq.network.p2p.storage.payload.ProtectedStorageEntry; +import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; +import bisq.network.p2p.storage.persistence.ProtectedDataStoreService; + +import javafx.collections.ListChangeListener; + +import java.util.Arrays; +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + + +/** + * Tests of the P2PDataStorage::onRemoved callback behavior to ensure that the proper number of signal events occur. + */ +public class ProposalServiceP2PDataStorageListenerTest { + private ProposalService proposalService; + + @Mock + private PeriodService periodService; + + @Mock + private DaoStateService daoStateService; + + @Mock + private ListChangeListener tempProposalListener; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + this.proposalService = new ProposalService( + mock(P2PService.class), + this.periodService, + mock(ProposalStorageService.class), + mock(TempProposalStorageService.class), + mock(AppendOnlyDataStoreService.class), + mock(ProtectedDataStoreService.class), + this.daoStateService, + mock(ProposalValidatorProvider.class), + true); + + // Create a state so that all added/removed Proposals will actually update the tempProposals list. + when(this.periodService.isInPhase(anyInt(), any(DaoPhase.Phase.class))).thenReturn(true); + when(this.daoStateService.isParseBlockChainComplete()).thenReturn(false); + } + + private static ProtectedStorageEntry buildProtectedStorageEntry() { + ProtectedStorageEntry protectedStorageEntry = mock(ProtectedStorageEntry.class); + TempProposalPayload tempProposalPayload = mock(TempProposalPayload.class); + Proposal tempProposal = mock(Proposal.class); + when(protectedStorageEntry.getProtectedStoragePayload()).thenReturn(tempProposalPayload); + when(tempProposalPayload.getProposal()).thenReturn(tempProposal); + + return protectedStorageEntry; + } + + // TESTCASE: If an onRemoved callback is called which does not remove anything the tempProposals listeners + // are not signaled. + @Test + public void onRemoved_noSignalIfNoChange() { + this.proposalService.onRemoved(Collections.singletonList(mock(ProtectedStorageEntry.class))); + + verify(this.tempProposalListener, never()).onChanged(any()); + } + + // TESTCASE: If an onRemoved callback is called with 1 element AND it creates a remove of 1 element, the tempProposal + // listeners are signaled once. + @Test + public void onRemoved_signalOnceOnOneChange() { + ProtectedStorageEntry one = buildProtectedStorageEntry(); + this.proposalService.onAdded(Collections.singletonList(one)); + this.proposalService.getTempProposals().addListener(this.tempProposalListener); + + this.proposalService.onRemoved(Collections.singletonList(one)); + + verify(this.tempProposalListener).onChanged(any()); + } + + // TESTCASE: If an onRemoved callback is called with 2 elements AND it creates a remove of 2 elements, the + // tempProposal listeners are signaled once. + @Test + public void onRemoved_signalOnceOnMultipleChanges() { + ProtectedStorageEntry one = buildProtectedStorageEntry(); + ProtectedStorageEntry two = buildProtectedStorageEntry(); + this.proposalService.onAdded(Arrays.asList(one, two)); + this.proposalService.getTempProposals().addListener(this.tempProposalListener); + + this.proposalService.onRemoved(Arrays.asList(one, two)); + + verify(this.tempProposalListener).onChanged(any()); + } +} diff --git a/p2p/src/main/java/bisq/network/p2p/P2PModule.java b/p2p/src/main/java/bisq/network/p2p/P2PModule.java index 6356515f18b..d1f3aeb22c5 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PModule.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PModule.java @@ -105,5 +105,6 @@ protected void configure() { bindConstant().annotatedWith(named(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC)).to(environment.getRequiredProperty(NetworkOptionKeys.MSG_THROTTLE_PER_10_SEC)); bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_TRIGGER)); bindConstant().annotatedWith(named(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP)).to(environment.getRequiredProperty(NetworkOptionKeys.SEND_MSG_THROTTLE_SLEEP)); + bindConstant().annotatedWith(named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE")).to(1000); } } diff --git a/p2p/src/main/java/bisq/network/p2p/P2PService.java b/p2p/src/main/java/bisq/network/p2p/P2PService.java index d90fdeb2565..733cc385cce 100644 --- a/p2p/src/main/java/bisq/network/p2p/P2PService.java +++ b/p2p/src/main/java/bisq/network/p2p/P2PService.java @@ -75,6 +75,7 @@ import java.security.PublicKey; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -432,15 +433,15 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { /////////////////////////////////////////////////////////////////////////////////////////// @Override - public void onAdded(ProtectedStorageEntry protectedStorageEntry) { - if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) - processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry); + public void onAdded(Collection protectedStorageEntries) { + protectedStorageEntries.forEach(protectedStorageEntry -> { + if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) + processMailboxEntry((ProtectedMailboxStorageEntry) protectedStorageEntry); + }); } @Override - public void onRemoved(ProtectedStorageEntry data) { - } - + public void onRemoved(Collection protectedStorageEntries) { } /////////////////////////////////////////////////////////////////////////////////////////// // DirectMessages diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java index f0c972780e3..a116d171b7d 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java @@ -132,7 +132,7 @@ void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) { .map(e -> e.bytes) .collect(Collectors.toSet()); - Set excludedKeysFromPersistedEntryMap = dataStorage.getProtectedDataStoreMap().keySet() + Set excludedKeysFromPersistedEntryMap = dataStorage.getMap().keySet() .stream() .map(e -> e.bytes) .collect(Collectors.toSet()); diff --git a/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java b/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java index a3ac1c20258..ce483889703 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/HashMapChangedListener.java @@ -19,17 +19,11 @@ import bisq.network.p2p.storage.payload.ProtectedStorageEntry; +import java.util.Collection; + public interface HashMapChangedListener { - void onAdded(ProtectedStorageEntry data); + void onAdded(Collection protectedStorageEntries); @SuppressWarnings("UnusedParameters") - void onRemoved(ProtectedStorageEntry data); - - // We process all expired entries after a delay (60 s) after onBootstrapComplete. - // We notify listeners of start and completion so they can optimize to only update after batch processing is done. - default void onBatchRemoveExpiredDataStarted() { - } - - default void onBatchRemoveExpiredDataCompleted() { - } + void onRemoved(Collection protectedStorageEntries); } diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 04313763d17..c0a22c691b2 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -42,7 +42,6 @@ import bisq.network.p2p.storage.payload.RequiresOwnerIsOnlinePayload; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; -import bisq.network.p2p.storage.persistence.ProtectedDataStoreListener; import bisq.network.p2p.storage.persistence.ProtectedDataStoreService; import bisq.network.p2p.storage.persistence.ResourceDataStoreService; import bisq.network.p2p.storage.persistence.SequenceNumberMap; @@ -63,9 +62,13 @@ import com.google.protobuf.ByteString; +import com.google.inject.name.Named; + import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; + import java.security.KeyPair; import java.security.PublicKey; @@ -73,6 +76,8 @@ import java.time.Clock; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; @@ -115,13 +120,16 @@ public class P2PDataStorage implements MessageListener, ConnectionListener, Pers private Timer removeExpiredEntriesTimer; private final Storage sequenceNumberMapStorage; - private final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap(); + + @VisibleForTesting + final SequenceNumberMap sequenceNumberMap = new SequenceNumberMap(); private final Set appendOnlyDataStoreListeners = new CopyOnWriteArraySet<>(); - private final Set protectedDataStoreListeners = new CopyOnWriteArraySet<>(); private final Clock clock; - protected int maxSequenceNumberMapSizeBeforePurge; + /// The maximum number of items that must exist in the SequenceNumberMap before it is scheduled for a purge + /// which removes entries after PURGE_AGE_DAYS. + private final int maxSequenceNumberMapSizeBeforePurge; /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -134,12 +142,14 @@ public P2PDataStorage(NetworkNode networkNode, ProtectedDataStoreService protectedDataStoreService, ResourceDataStoreService resourceDataStoreService, Storage sequenceNumberMapStorage, - Clock clock) { + Clock clock, + @Named("MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE") int maxSequenceNumberBeforePurge) { this.broadcaster = broadcaster; this.appendOnlyDataStoreService = appendOnlyDataStoreService; this.protectedDataStoreService = protectedDataStoreService; this.resourceDataStoreService = resourceDataStoreService; this.clock = clock; + this.maxSequenceNumberMapSizeBeforePurge = maxSequenceNumberBeforePurge; networkNode.addMessageListener(this); @@ -147,7 +157,6 @@ public P2PDataStorage(NetworkNode networkNode, this.sequenceNumberMapStorage = sequenceNumberMapStorage; sequenceNumberMapStorage.setNumMaxBackupFiles(5); - this.maxSequenceNumberMapSizeBeforePurge = 1000; } @Override @@ -191,17 +200,13 @@ void removeExpiredEntries() { .filter(entry -> entry.getValue().isExpired(this.clock)) .collect(Collectors.toCollection(ArrayList::new)); - // Batch processing can cause performance issues, so we give listeners a chance to deal with it by notifying - // about start and end of iteration. - hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataStarted); - toRemoveList.forEach(mapEntry -> { - ProtectedStorageEntry protectedStorageEntry = mapEntry.getValue(); - ByteArray payloadHash = mapEntry.getKey(); - - log.debug("We found an expired data entry. We remove the protectedData:\n\t" + Utilities.toTruncatedString(protectedStorageEntry)); - removeFromMapAndDataStore(protectedStorageEntry, payloadHash); + // Batch processing can cause performance issues, so do all of the removes first, then update the listeners + // to let them know about the removes. + toRemoveList.forEach(toRemoveItem -> { + log.debug("We found an expired data entry. We remove the protectedData:\n\t" + + Utilities.toTruncatedString(toRemoveItem.getValue())); }); - hashMapChangedListeners.forEach(HashMapChangedListener::onBatchRemoveExpiredDataCompleted); + removeFromMapAndDataStore(toRemoveList); if (sequenceNumberMap.size() > this.maxSequenceNumberMapSizeBeforePurge) sequenceNumberMap.setMap(getPurgedSequenceNumberMap(sequenceNumberMap.getMap())); @@ -215,11 +220,6 @@ public Map getAppendOnlyDataStoreMap() { return appendOnlyDataStoreService.getMap(); } - public Map getProtectedDataStoreMap() { - return protectedDataStoreService.getMap(); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -388,23 +388,33 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn return false; } - // If we have seen a more recent operation for this payload, we ignore the current one - if(!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) + ProtectedStorageEntry storedEntry = map.get(hashOfPayload); + + // If we have seen a more recent operation for this payload and we have a payload locally, ignore it + if (storedEntry != null && + !hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) { + return false; + } + + // We want to allow add operations for equal sequence numbers if we don't have the payload locally. This is + // the case for non-persistent Payloads that need to be reconstructed from peer and seed nodes each startup. + MapValue sequenceNumberMapValue = sequenceNumberMap.get(hashOfPayload); + if (sequenceNumberMapValue != null && + protectedStorageEntry.getSequenceNumber() < sequenceNumberMapValue.sequenceNr) { return false; + } // Verify the ProtectedStorageEntry is well formed and valid for the add operation if (!protectedStorageEntry.isValidForAddOperation()) return false; - ProtectedStorageEntry storedEntry = map.get(hashOfPayload); - // If we have already seen an Entry with the same hash, verify the metadata is equal if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) return false; // This is an updated entry. Record it and signal listeners. map.put(hashOfPayload, protectedStorageEntry); - hashMapChangedListeners.forEach(e -> e.onAdded(protectedStorageEntry)); + hashMapChangedListeners.forEach(e -> e.onAdded(Collections.singletonList(protectedStorageEntry))); // Record the updated sequence number and persist it. Higher delay so we can batch more items. sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); @@ -414,13 +424,9 @@ public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEn if (allowBroadcast) broadcastProtectedStorageEntry(protectedStorageEntry, sender, listener, isDataOwner); - // Persist ProtectedStorageEntrys carrying PersistablePayload payloads and signal listeners on changes - if (protectedStoragePayload instanceof PersistablePayload) { - ByteArray compactHash = P2PDataStorage.getCompactHashAsByteArray(protectedStoragePayload); - ProtectedStorageEntry previous = protectedDataStoreService.putIfAbsent(compactHash, protectedStorageEntry); - if (previous == null) - protectedDataStoreListeners.forEach(e -> e.onAdded(protectedStorageEntry)); - } + // Persist ProtectedStorageEntrys carrying PersistablePayload payloads + if (protectedStoragePayload instanceof PersistablePayload) + protectedDataStoreService.put(hashOfPayload, protectedStorageEntry); return true; } @@ -481,13 +487,6 @@ public boolean remove(ProtectedStorageEntry protectedStorageEntry, ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - // If we don't know about the target of this remove, ignore it - ProtectedStorageEntry storedEntry = map.get(hashOfPayload); - if (storedEntry == null) { - log.debug("Remove data ignored as we don't have an entry for that data."); - return false; - } - // If we have seen a more recent operation for this payload, ignore this one if (!hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) return false; @@ -497,19 +496,26 @@ public boolean remove(ProtectedStorageEntry protectedStorageEntry, return false; // If we have already seen an Entry with the same hash, verify the metadata is the same - if (!protectedStorageEntry.matchesRelevantPubKey(storedEntry)) + ProtectedStorageEntry storedEntry = map.get(hashOfPayload); + if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) return false; - // Valid remove entry, do the remove and signal listeners - removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload); - printData("after remove"); - // Record the latest sequence number and persist it sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); sequenceNumberMapStorage.queueUpForSave(SequenceNumberMap.clone(sequenceNumberMap), 300); maybeAddToRemoveAddOncePayloads(protectedStoragePayload, hashOfPayload); + if (storedEntry != null) { + // Valid remove entry, do the remove and signal listeners + removeFromMapAndDataStore(protectedStorageEntry, hashOfPayload); + } /* else { + // This means the RemoveData or RemoveMailboxData was seen prior to the AddData. We have already updated + // the SequenceNumberMap appropriately so the stale Add will not pass validation, but we still want to + // broadcast the remove to peers so they can update their state appropriately + } */ + printData("after remove"); + if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) { broadcast(new RemoveMailboxDataMessage((ProtectedMailboxStorageEntry) protectedStorageEntry), sender, null, isDataOwner); } else { @@ -621,35 +627,37 @@ public void removeAppendOnlyDataStoreListener(AppendOnlyDataStoreListener listen appendOnlyDataStoreListeners.remove(listener); } - @SuppressWarnings("unused") - public void addProtectedDataStoreListener(ProtectedDataStoreListener listener) { - protectedDataStoreListeners.add(listener); - } - - @SuppressWarnings("unused") - public void removeProtectedDataStoreListener(ProtectedDataStoreListener listener) { - protectedDataStoreListeners.remove(listener); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // Private /////////////////////////////////////////////////////////////////////////////////////////// private void removeFromMapAndDataStore(ProtectedStorageEntry protectedStorageEntry, ByteArray hashOfPayload) { - map.remove(hashOfPayload); - hashMapChangedListeners.forEach(e -> e.onRemoved(protectedStorageEntry)); + removeFromMapAndDataStore(Collections.singletonList(Maps.immutableEntry(hashOfPayload, protectedStorageEntry))); + } - ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); - if (protectedStoragePayload instanceof PersistablePayload) { - ByteArray compactHash = getCompactHashAsByteArray(protectedStoragePayload); - ProtectedStorageEntry previous = protectedDataStoreService.remove(compactHash, protectedStorageEntry); - if (previous != null) { - protectedDataStoreListeners.forEach(e -> e.onRemoved(protectedStorageEntry)); - } else { - log.info("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist."); + private void removeFromMapAndDataStore( + Collection> entriesToRemoveWithPayloadHash) { + + if (entriesToRemoveWithPayloadHash.isEmpty()) + return; + + ArrayList entriesForSignal = new ArrayList<>(entriesToRemoveWithPayloadHash.size()); + entriesToRemoveWithPayloadHash.forEach(entryToRemoveWithPayloadHash -> { + ByteArray hashOfPayload = entryToRemoveWithPayloadHash.getKey(); + ProtectedStorageEntry protectedStorageEntry = entryToRemoveWithPayloadHash.getValue(); + + map.remove(hashOfPayload); + entriesForSignal.add(protectedStorageEntry); + + ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); + if (protectedStoragePayload instanceof PersistablePayload) { + ProtectedStorageEntry previous = protectedDataStoreService.remove(hashOfPayload, protectedStorageEntry); + if (previous == null) + log.error("We cannot remove the protectedStorageEntry from the persistedEntryMap as it does not exist."); } - } + }); + + hashMapChangedListeners.forEach(e -> e.onRemoved(entriesForSignal)); } private boolean hasSequenceNrIncreased(int newSequenceNumber, ByteArray hashOfData) { @@ -690,14 +698,6 @@ public static ByteArray get32ByteHashAsByteArray(NetworkPayload data) { return new ByteArray(P2PDataStorage.get32ByteHash(data)); } - public static ByteArray getCompactHashAsByteArray(ProtectedStoragePayload protectedStoragePayload) { - return new ByteArray(getCompactHash(protectedStoragePayload)); - } - - private static byte[] getCompactHash(ProtectedStoragePayload protectedStoragePayload) { - return Hash.getSha256Ripemd160hash(protectedStoragePayload.toProtoMessage().toByteArray()); - } - // Get a new map with entries older than PURGE_AGE_DAYS purged from the given map. private Map getPurgedSequenceNumberMap(Map persisted) { Map purged = new HashMap<>(); diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/MapStoreService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/MapStoreService.java index 5449a3f3a97..67088799c4a 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/persistence/MapStoreService.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/MapStoreService.java @@ -56,6 +56,11 @@ public MapStoreService(File storageDir, Storage storage) { public abstract boolean canHandle(R payload); + void put(P2PDataStorage.ByteArray hash, R payload) { + getMap().put(hash, payload); + persist(); + } + R putIfAbsent(P2PDataStorage.ByteArray hash, R payload) { R previous = getMap().putIfAbsent(hash, payload); persist(); diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/ProtectedDataStoreListener.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/ProtectedDataStoreListener.java deleted file mode 100644 index 6eb1a12cbf2..00000000000 --- a/p2p/src/main/java/bisq/network/p2p/storage/persistence/ProtectedDataStoreListener.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.network.p2p.storage.persistence; - -import bisq.network.p2p.storage.payload.ProtectedStorageEntry; - -public interface ProtectedDataStoreListener { - void onAdded(ProtectedStorageEntry protectedStorageEntry); - - void onRemoved(ProtectedStorageEntry protectedStorageEntry); -} diff --git a/p2p/src/main/java/bisq/network/p2p/storage/persistence/ProtectedDataStoreService.java b/p2p/src/main/java/bisq/network/p2p/storage/persistence/ProtectedDataStoreService.java index 9c75e567368..c671ad28658 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/persistence/ProtectedDataStoreService.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/persistence/ProtectedDataStoreService.java @@ -65,24 +65,10 @@ public void put(P2PDataStorage.ByteArray hash, ProtectedStorageEntry entry) { services.stream() .filter(service -> service.canHandle(entry)) .forEach(service -> { - service.putIfAbsent(hash, entry); + service.put(hash, entry); }); } - public ProtectedStorageEntry putIfAbsent(P2PDataStorage.ByteArray hash, ProtectedStorageEntry entry) { - Map map = getMap(); - if (!map.containsKey(hash)) { - put(hash, entry); - return null; - } else { - return map.get(hash); - } - } - - public boolean containsKey(P2PDataStorage.ByteArray hash) { - return getMap().containsKey(hash); - } - public ProtectedStorageEntry remove(P2PDataStorage.ByteArray hash, ProtectedStorageEntry protectedStorageEntry) { final ProtectedStorageEntry[] result = new ProtectedStorageEntry[1]; services.stream() diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageClientAPITest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageClientAPITest.java index 867cbea229a..4e0cb6c895f 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageClientAPITest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageClientAPITest.java @@ -186,9 +186,9 @@ public void getMailboxDataWithSignedSeqNr_RemoveNoExist() throws NoSuchAlgorithm this.testState.mockedStorage.getMailboxDataWithSignedSeqNr(mailboxStoragePayload, receiverKeys, receiverKeys.getPublic()); SavedTestState beforeState = this.testState.saveTestState(protectedMailboxStorageEntry); - Assert.assertFalse(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true)); + Assert.assertTrue(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true)); - this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, false, true, true, true); + this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, false, false, false, true, true); } // TESTCASE: Adding, then removing a mailbox message from the getMailboxDataWithSignedSeqNr API @@ -210,7 +210,7 @@ public void getMailboxDataWithSignedSeqNr_AddThenRemove() throws NoSuchAlgorithm SavedTestState beforeState = this.testState.saveTestState(protectedMailboxStorageEntry); Assert.assertTrue(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true)); - this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true,true); + this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true, true,true); } // TESTCASE: Removing a mailbox message that was added from the onMessage handler @@ -237,6 +237,6 @@ public void getMailboxDataWithSignedSeqNr_ValidRemoveAddFromMessage() throws NoS SavedTestState beforeState = this.testState.saveTestState(protectedMailboxStorageEntry); Assert.assertTrue(this.testState.mockedStorage.remove(protectedMailboxStorageEntry, TestState.getTestNodeAddress(), true)); - this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true,true); + this.testState.verifyProtectedStorageRemove(beforeState, protectedMailboxStorageEntry, true, true, true, true,true); } } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java index c8a3d9c6134..9213bcb0fbe 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoragePersistableNetworkPayloadTest.java @@ -55,6 +55,7 @@ * 2 & 3 Client API [addPersistableNetworkPayload(reBroadcast=(true && false))] * 4. onMessage() [onMessage(AddPersistableNetworkPayloadMessage)] */ +@SuppressWarnings("unused") public class P2PDataStoragePersistableNetworkPayloadTest { @RunWith(Parameterized.class) diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java index 2aa8bc03603..9f5bd539234 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageProtectedStorageEntryTest.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import org.junit.Assert; @@ -65,6 +66,7 @@ * 1. Client API [addProtectedStorageEntry(), refreshTTL(), remove()] * 2. onMessage() [AddDataMessage, RefreshOfferMessage, RemoveDataMessage] */ +@SuppressWarnings("unused") public class P2PDataStorageProtectedStorageEntryTest { @RunWith(Parameterized.class) abstract public static class ProtectedStorageEntryTestBase { @@ -200,7 +202,10 @@ void doProtectedStorageAddAndVerify(ProtectedStorageEntry protectedStorageEntry, void doProtectedStorageRemoveAndVerify(ProtectedStorageEntry entry, boolean expectedReturnValue, - boolean expectInternalStateChange) { + boolean expectedHashMapAndDataStoreUpdated, + boolean expectedListenersSignaled, + boolean expectedBroadcast, + boolean expectedSeqNrWrite) { SavedTestState beforeState = this.testState.saveTestState(entry); @@ -209,7 +214,7 @@ void doProtectedStorageRemoveAndVerify(ProtectedStorageEntry entry, if (!this.useMessageHandler) Assert.assertEquals(expectedReturnValue, addResult); - this.testState.verifyProtectedStorageRemove(beforeState, entry, expectInternalStateChange, true, true, this.expectIsDataOwner()); + this.testState.verifyProtectedStorageRemove(beforeState, entry, expectedHashMapAndDataStoreUpdated, expectedListenersSignaled, expectedBroadcast, expectedSeqNrWrite, this.expectIsDataOwner()); } /// Valid Add Tests (isValidForAdd() and matchesRelevantPubKey() return true) @@ -262,7 +267,7 @@ public void addProtectectedStorageEntry_afterRemoveSameSeqNr() { ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1); doProtectedStorageAddAndVerify(entryForAdd, true, true); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false); doProtectedStorageAddAndVerify(entryForAdd, false, false); } @@ -310,7 +315,7 @@ public void remove_seqNrEqAddSeqNr() { doProtectedStorageAddAndVerify(entryForAdd, true, true); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false); } // TESTCASE: Removing an item after successfully added (remove seq # > add seq #) @@ -320,15 +325,15 @@ public void remove_seqNrGtAddSeqNr() { ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2); doProtectedStorageAddAndVerify(entryForAdd, true, true); - doProtectedStorageRemoveAndVerify(entryForRemove, true, true); + doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true); } - // TESTCASE: Removing an item before it was added + // TESTCASE: Removing an item before it was added. This triggers a SequenceNumberMap write and broadcast @Test public void remove_notExists() { ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + doProtectedStorageRemoveAndVerify(entryForRemove, true, false, false, true, true); } // TESTCASE: Removing an item after successfully adding (remove seq # < add seq #) @@ -338,7 +343,7 @@ public void remove_seqNrLessAddSeqNr() { ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1); doProtectedStorageAddAndVerify(entryForAdd, true, true); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false); } // TESTCASE: Add after removed (same seq #) @@ -348,7 +353,7 @@ public void add_afterRemoveSameSeqNr() { doProtectedStorageAddAndVerify(entryForAdd, true, true); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2); - doProtectedStorageRemoveAndVerify(entryForRemove, true, true); + doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true); doProtectedStorageAddAndVerify(entryForAdd, false, false); } @@ -360,7 +365,7 @@ public void add_afterRemoveGreaterSeqNr() { doProtectedStorageAddAndVerify(entryForAdd, true, true); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2); - doProtectedStorageRemoveAndVerify(entryForRemove, true, true); + doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true); entryForAdd = this.getProtectedStorageEntryForAdd(3); doProtectedStorageAddAndVerify(entryForAdd, true, true); @@ -375,7 +380,7 @@ public void remove_EntryNotisValidForRemoveOperation() { doProtectedStorageAddAndVerify(entryForAdd, true, true); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2, false, true); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false); } // TESTCASE: Remove fails if Entry is valid for remove, but metadata doesn't match remove target @@ -385,7 +390,7 @@ public void remove_EntryNotmatchesRelevantPubKey() { doProtectedStorageAddAndVerify(entryForAdd, true, true); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2, true, false); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false); } // TESTCASE: Remove fails if Entry is not valid for remove and metadata doesn't match remove target @@ -395,7 +400,7 @@ public void remove_EntryNotisValidForRemoveOperationNotmatchesRelevantPubKey() { doProtectedStorageAddAndVerify(entryForAdd, true, true); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2, false, false); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + doProtectedStorageRemoveAndVerify(entryForRemove, false, false, false, false, false); } @@ -406,24 +411,42 @@ public void add_afterRemoveLessSeqNr() { doProtectedStorageAddAndVerify(entryForAdd, true, true); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(3); - doProtectedStorageRemoveAndVerify(entryForRemove, true, true); + doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true); entryForAdd = this.getProtectedStorageEntryForAdd(1); doProtectedStorageAddAndVerify(entryForAdd, false, false); } // TESTCASE: Received remove for nonexistent item that was later received - // XXXBUGXXX: There may be cases where removes are reordered with adds (remove during pending GetDataRequest?). - // The proper behavior may be to not add the late messages, but the current code will successfully add them - // even in the AddOncePayload (mailbox) case. @Test public void remove_lateAdd() { ProtectedStorageEntry entryForAdd = this.getProtectedStorageEntryForAdd(1); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2); - doProtectedStorageRemoveAndVerify(entryForRemove, false, false); + this.doRemove(entryForRemove); + + doProtectedStorageAddAndVerify(entryForAdd, false, false); + } + + // TESTCASE: Invalid remove doesn't block a valid add (isValidForRemove == false | matchesRelevantPubKey == false) + @Test + public void remove_entryNotIsValidForRemoveDoesntBlockAdd1() { + ProtectedStorageEntry entryForAdd = this.getProtectedStorageEntryForAdd(1); + ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1, false, false); + + this.doRemove(entryForRemove); + + doProtectedStorageAddAndVerify(entryForAdd, true, true); + } + + // TESTCASE: Invalid remove doesn't block a valid add (isValidForRemove == false | matchesRelevantPubKey == true) + @Test + public void remove_entryNotIsValidForRemoveDoesntBlockAdd2() { + ProtectedStorageEntry entryForAdd = this.getProtectedStorageEntryForAdd(1); + ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(1, false, true); + + this.doRemove(entryForRemove); - // should be (false, false) doProtectedStorageAddAndVerify(entryForAdd, true, true); } } @@ -539,7 +562,7 @@ public void refreshTTL_refreshAfterRemove() throws CryptoException { ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2); doProtectedStorageAddAndVerify(entryForAdd, true, true); - doProtectedStorageRemoveAndVerify(entryForRemove, true, true); + doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true); doRefreshTTLAndVerify(buildRefreshOfferMessage(entryForAdd, this.payloadOwnerKeys,3), false, false); } @@ -553,6 +576,34 @@ public void refreshTTL_refreshEntryOwnerOriginalOwnerMismatch() throws CryptoExc KeyPair notOwner = TestUtils.generateKeyPair(); doRefreshTTLAndVerify(buildRefreshOfferMessage(entry, notOwner, 2), false, false); } + + // TESTCASE: After restart, identical sequence numbers are accepted ONCE. We need a way to reconstruct + // in-memory ProtectedStorageEntrys from seed and peer nodes around startup time. + @Test + public void addProtectedStorageEntry_afterRestartCanAddDuplicateSeqNr() { + ProtectedStorageEntry toAdd1 = this.getProtectedStorageEntryForAdd(1); + doProtectedStorageAddAndVerify(toAdd1, true, true); + + this.testState.simulateRestart(); + + // Can add equal seqNr only once + doProtectedStorageAddAndVerify(toAdd1, true, true); + + // Can't add equal seqNr twice + doProtectedStorageAddAndVerify(toAdd1, false, false); + } + + // TESTCASE: After restart, old sequence numbers are not accepted + @Test + public void addProtectedStorageEntry_afterRestartCanNotAddLowerSeqNr() { + ProtectedStorageEntry toAdd1 = this.getProtectedStorageEntryForAdd(1); + ProtectedStorageEntry toAdd2 = this.getProtectedStorageEntryForAdd(2); + doProtectedStorageAddAndVerify(toAdd2, true, true); + + this.testState.simulateRestart(); + + doProtectedStorageAddAndVerify(toAdd1, false, false); + } } /** @@ -570,6 +621,33 @@ protected Class getEntryClass() { return ProtectedStorageEntry.class; } + + // Tests that just apply to PersistablePayload objects + + // TESTCASE: Ensure the HashMap is the same before and after a restart + @Test + public void addProtectedStorageEntry_afterReadFromResourcesWithDuplicate_3629RegressionTest() { + ProtectedStorageEntry protectedStorageEntry = this.getProtectedStorageEntryForAdd(1); + doProtectedStorageAddAndVerify(protectedStorageEntry, true, true); + + Map beforeRestart = this.testState.mockedStorage.getMap(); + + this.testState.simulateRestart(); + + Assert.assertEquals(beforeRestart, this.testState.mockedStorage.getMap()); + } + + // TESTCASE: After restart, identical sequence numbers are not accepted for persistent payloads + @Test + public void addProtectedStorageEntry_afterRestartCanNotAddDuplicateSeqNr() { + ProtectedStorageEntry toAdd1 = this.getProtectedStorageEntryForAdd(1); + doProtectedStorageAddAndVerify(toAdd1, true, true); + + this.testState.simulateRestart(); + + // Can add equal seqNr only once + doProtectedStorageAddAndVerify(toAdd1, false, false); + } } /** @@ -621,7 +699,7 @@ public void add_afterRemoveGreaterSeqNr() { doProtectedStorageAddAndVerify(entryForAdd, true, true); ProtectedStorageEntry entryForRemove = this.getProtectedStorageEntryForRemove(2); - doProtectedStorageRemoveAndVerify(entryForRemove, true, true); + doProtectedStorageRemoveAndVerify(entryForRemove, true, true, true, true, true); entryForAdd = this.getProtectedStorageEntryForAdd(3); doProtectedStorageAddAndVerify(entryForAdd, false, false); diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java index 7814004cd2e..454799ed515 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRemoveExpiredTest.java @@ -32,6 +32,7 @@ import java.security.KeyPair; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -66,7 +67,7 @@ public void removeExpiredEntries_SkipsNonExpirableEntries() throws NoSuchAlgorit SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry); this.testState.mockedStorage.removeExpiredEntries(); - this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false); + this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false, false); } // TESTCASE: Correctly skips all PersistableNetworkPayloads since they are not expirable @@ -92,7 +93,7 @@ public void removeExpiredEntries_SkipNonExpiredExpirableEntries() throws CryptoE SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry); this.testState.mockedStorage.removeExpiredEntries(); - this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false); + this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false, false); } // TESTCASE: Correctly expires non-persistable entries that are expired @@ -109,7 +110,7 @@ public void removeExpiredEntries_ExpiresExpiredExpirableEntries() throws CryptoE SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry); this.testState.mockedStorage.removeExpiredEntries(); - this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, false, false, false); + this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, true, false, false, false); } // TESTCASE: Correctly skips persistable entries that are not expired @@ -123,7 +124,7 @@ public void removeExpiredEntries_SkipNonExpiredPersistableExpirableEntries() thr SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry); this.testState.mockedStorage.removeExpiredEntries(); - this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false); + this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, false, false, false, false, false); } // TESTCASE: Correctly expires persistable entries that are expired @@ -140,7 +141,7 @@ public void removeExpiredEntries_ExpiresExpiredPersistableExpirableEntries() thr SavedTestState beforeState = this.testState.saveTestState(protectedStorageEntry); this.testState.mockedStorage.removeExpiredEntries(); - this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, false, false, false); + this.testState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, true, true, false, false, false); } // TESTCASE: Ensure we try to purge old entries sequence number map when size exceeds the maximum size @@ -149,17 +150,21 @@ public void removeExpiredEntries_ExpiresExpiredPersistableExpirableEntries() thr public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchAlgorithmException { final int initialClockIncrement = 5; + ArrayList expectedRemoves = new ArrayList<>(); + // Add 4 entries to our sequence number map that will be purged KeyPair purgedOwnerKeys = TestUtils.generateKeyPair(); ProtectedStoragePayload purgedProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(purgedOwnerKeys.getPublic(), 0); ProtectedStorageEntry purgedProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(purgedProtectedStoragePayload, purgedOwnerKeys); + expectedRemoves.add(purgedProtectedStorageEntry); Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, true)); - for (int i = 0; i < 4; ++i) { + for (int i = 0; i < MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE - 1; ++i) { KeyPair ownerKeys = TestUtils.generateKeyPair(); ProtectedStoragePayload protectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(ownerKeys.getPublic(), 0); ProtectedStorageEntry tmpEntry = testState.mockedStorage.getProtectedStorageEntry(protectedStoragePayload, ownerKeys); + expectedRemoves.add(tmpEntry); Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(tmpEntry, TestState.getTestNodeAddress(), null, true)); } @@ -171,6 +176,7 @@ public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchA KeyPair keepOwnerKeys = TestUtils.generateKeyPair(); ProtectedStoragePayload keepProtectedStoragePayload = new PersistableExpirableProtectedStoragePayloadStub(keepOwnerKeys.getPublic(), 0); ProtectedStorageEntry keepProtectedStorageEntry = testState.mockedStorage.getProtectedStorageEntry(keepProtectedStoragePayload, keepOwnerKeys); + expectedRemoves.add(keepProtectedStorageEntry); Assert.assertTrue(testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, true)); @@ -178,19 +184,9 @@ public void removeExpiredEntries_PurgeSeqNrMap() throws CryptoException, NoSuchA // Advance time past it so they will be valid purge targets this.testState.clockFake.increment(TimeUnit.DAYS.toMillis(P2PDataStorage.PURGE_AGE_DAYS + 1 - initialClockIncrement)); - // The first entry (11 days old) should be purged + // The first 4 entries (11 days old) should be purged from the SequenceNumberMap SavedTestState beforeState = this.testState.saveTestState(purgedProtectedStorageEntry); this.testState.mockedStorage.removeExpiredEntries(); - this.testState.verifyProtectedStorageRemove(beforeState, purgedProtectedStorageEntry, true, false, false, false); - - // Which means that an addition of a purged entry should succeed. - beforeState = this.testState.saveTestState(purgedProtectedStorageEntry); - Assert.assertTrue(this.testState.mockedStorage.addProtectedStorageEntry(purgedProtectedStorageEntry, TestState.getTestNodeAddress(), null, false)); - this.testState.verifyProtectedStorageAdd(beforeState, purgedProtectedStorageEntry, true, false); - - // The second entry (5 days old) should still exist which means trying to add it again should fail. - beforeState = this.testState.saveTestState(keepProtectedStorageEntry); - Assert.assertFalse(this.testState.mockedStorage.addProtectedStorageEntry(keepProtectedStorageEntry, TestState.getTestNodeAddress(), null, false)); - this.testState.verifyProtectedStorageAdd(beforeState, keepProtectedStorageEntry, false, false); + this.testState.verifyProtectedStorageRemove(beforeState, expectedRemoves, true, true, false, false, false); } } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java index 7a54138a928..1f58b815188 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/P2PDataStoreDisconnectTest.java @@ -39,10 +39,7 @@ import org.junit.Before; import org.junit.Test; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static bisq.network.p2p.storage.TestState.*; @@ -72,7 +69,7 @@ private static void verifyStateAfterDisconnect(TestState currentState, ProtectedStorageEntry protectedStorageEntry = beforeState.protectedStorageEntryBeforeOp; currentState.verifyProtectedStorageRemove(beforeState, protectedStorageEntry, - wasRemoved, false, false, false); + wasRemoved, wasRemoved, false, false, false); if (wasTTLReduced) Assert.assertTrue(protectedStorageEntry.getCreationTimeStamp() < beforeState.creationTimestampBeforeUpdate); @@ -173,7 +170,7 @@ public void connectionClosedReduceTTLAndExpireItemsFromPeerPersistable() class ExpirablePersistentProtectedStoragePayloadStub extends ExpirableProtectedStoragePayloadStub implements PersistablePayload { - public ExpirablePersistentProtectedStoragePayloadStub(PublicKey ownerPubKey) { + private ExpirablePersistentProtectedStoragePayloadStub(PublicKey ownerPubKey) { super(ownerPubKey, 0); } } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/TestState.java b/p2p/src/test/java/bisq/network/p2p/storage/TestState.java index b13ec2a97a4..c4ceab5ca2a 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/TestState.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/TestState.java @@ -29,14 +29,12 @@ import bisq.network.p2p.storage.messages.RemoveMailboxDataMessage; import bisq.network.p2p.storage.mocks.AppendOnlyDataStoreServiceFake; import bisq.network.p2p.storage.mocks.ClockFake; -import bisq.network.p2p.storage.mocks.ProtectedDataStoreServiceFake; +import bisq.network.p2p.storage.mocks.MapStoreServiceFake; import bisq.network.p2p.storage.payload.MailboxStoragePayload; import bisq.network.p2p.storage.payload.PersistableNetworkPayload; import bisq.network.p2p.storage.payload.ProtectedMailboxStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreListener; -import bisq.network.p2p.storage.persistence.AppendOnlyDataStoreService; -import bisq.network.p2p.storage.persistence.ProtectedDataStoreListener; import bisq.network.p2p.storage.persistence.ProtectedDataStoreService; import bisq.network.p2p.storage.persistence.ResourceDataStoreService; import bisq.network.p2p.storage.persistence.SequenceNumberMap; @@ -47,8 +45,10 @@ import java.security.PublicKey; -import java.time.Clock; - +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -63,57 +63,88 @@ * Used in the P2PDataStorage*Test(s) in order to leverage common test set up and validation. */ public class TestState { - final P2PDataStorage mockedStorage; + static final int MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE = 5; + + P2PDataStorage mockedStorage; final Broadcaster mockBroadcaster; final AppendOnlyDataStoreListener appendOnlyDataStoreListener; - private final ProtectedDataStoreListener protectedDataStoreListener; - final HashMapChangedListener hashMapChangedListener; + private final HashMapChangedListener hashMapChangedListener; private final Storage mockSeqNrStorage; + private final ProtectedDataStoreService protectedDataStoreService; final ClockFake clockFake; - /** - * Subclass of P2PDataStorage that allows for easier testing, but keeps all functionality - */ - static class P2PDataStorageForTest extends P2PDataStorage { - - P2PDataStorageForTest(NetworkNode networkNode, - Broadcaster broadcaster, - AppendOnlyDataStoreService appendOnlyDataStoreService, - ProtectedDataStoreService protectedDataStoreService, - ResourceDataStoreService resourceDataStoreService, - Storage sequenceNumberMapStorage, - Clock clock) { - super(networkNode, broadcaster, appendOnlyDataStoreService, protectedDataStoreService, resourceDataStoreService, sequenceNumberMapStorage, clock); - - this.maxSequenceNumberMapSizeBeforePurge = 5; - } - } - TestState() { this.mockBroadcaster = mock(Broadcaster.class); this.mockSeqNrStorage = mock(Storage.class); this.clockFake = new ClockFake(); + this.protectedDataStoreService = new ProtectedDataStoreService(); - this.mockedStorage = new P2PDataStorageForTest(mock(NetworkNode.class), + this.mockedStorage = new P2PDataStorage(mock(NetworkNode.class), this.mockBroadcaster, new AppendOnlyDataStoreServiceFake(), - new ProtectedDataStoreServiceFake(), mock(ResourceDataStoreService.class), - this.mockSeqNrStorage, this.clockFake); + this.protectedDataStoreService, mock(ResourceDataStoreService.class), + this.mockSeqNrStorage, this.clockFake, MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE); this.appendOnlyDataStoreListener = mock(AppendOnlyDataStoreListener.class); - this.protectedDataStoreListener = mock(ProtectedDataStoreListener.class); this.hashMapChangedListener = mock(HashMapChangedListener.class); + this.protectedDataStoreService.addService(new MapStoreServiceFake()); + + this.mockedStorage = createP2PDataStorageForTest( + this.mockBroadcaster, + this.protectedDataStoreService, + this.mockSeqNrStorage, + this.clockFake, + this.hashMapChangedListener, + this.appendOnlyDataStoreListener); + } + + + /** + * Re-initializes the in-memory data structures from the storage objects to simulate a node restarting. Important + * to note that the current TestState uses Test Doubles instead of actual disk storage so this is just "simulating" + * not running the entire storage code paths. + */ + void simulateRestart() { + when(this.mockSeqNrStorage.initAndGetPersisted(any(SequenceNumberMap.class), anyLong())) + .thenReturn(this.mockedStorage.sequenceNumberMap); - this.mockedStorage.addHashMapChangedListener(this.hashMapChangedListener); - this.mockedStorage.addAppendOnlyDataStoreListener(this.appendOnlyDataStoreListener); - this.mockedStorage.addProtectedDataStoreListener(this.protectedDataStoreListener); + this.mockedStorage = createP2PDataStorageForTest( + this.mockBroadcaster, + this.protectedDataStoreService, + this.mockSeqNrStorage, + this.clockFake, + this.hashMapChangedListener, + this.appendOnlyDataStoreListener); + } + + private static P2PDataStorage createP2PDataStorageForTest( + Broadcaster broadcaster, + ProtectedDataStoreService protectedDataStoreService, + Storage sequenceNrMapStorage, + ClockFake clock, + HashMapChangedListener hashMapChangedListener, + AppendOnlyDataStoreListener appendOnlyDataStoreListener) { + + P2PDataStorage p2PDataStorage = new P2PDataStorage(mock(NetworkNode.class), + broadcaster, + new AppendOnlyDataStoreServiceFake(), + protectedDataStoreService, mock(ResourceDataStoreService.class), + sequenceNrMapStorage, clock, MAX_SEQUENCE_NUMBER_MAP_SIZE_BEFORE_PURGE); + + // Currently TestState only supports reading ProtectedStorageEntries off disk. + p2PDataStorage.readFromResources("unused"); + p2PDataStorage.readPersisted(); + + p2PDataStorage.addHashMapChangedListener(hashMapChangedListener); + p2PDataStorage.addAppendOnlyDataStoreListener(appendOnlyDataStoreListener); + + return p2PDataStorage; } private void resetState() { reset(this.mockBroadcaster); reset(this.appendOnlyDataStoreListener); - reset(this.protectedDataStoreListener); reset(this.hashMapChangedListener); reset(this.mockSeqNrStorage); } @@ -177,24 +208,14 @@ void verifyProtectedStorageAdd(SavedTestState beforeState, boolean expectedStateChange, boolean expectedIsDataOwner) { P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); if (expectedStateChange) { Assert.assertEquals(protectedStorageEntry, this.mockedStorage.getMap().get(hashMapHash)); - // PersistablePayload payloads need to be written to disk and listeners signaled... unless the hash already exists in the protectedDataStore. - // Note: this behavior is different from the HashMap listeners that are signaled on an increase in seq #, even if the hash already exists. - // TODO: Should the behavior be identical between this and the HashMap listeners? - // TODO: Do we want ot overwrite stale values in order to persist updated sequence numbers and timestamps? - if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload && beforeState.protectedStorageEntryBeforeOpDataStoreMap == null) { - Assert.assertEquals(protectedStorageEntry, this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); - verify(this.protectedDataStoreListener).onAdded(protectedStorageEntry); - } else { - Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); - verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry); - } + if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) + Assert.assertEquals(protectedStorageEntry, this.protectedDataStoreService.getMap().get(hashMapHash)); - verify(this.hashMapChangedListener).onAdded(protectedStorageEntry); + verify(this.hashMapChangedListener).onAdded(Collections.singletonList(protectedStorageEntry)); final ArgumentCaptor captor = ArgumentCaptor.forClass(BroadcastMessage.class); verify(this.mockBroadcaster).broadcast(captor.capture(), any(NodeAddress.class), @@ -207,55 +228,86 @@ void verifyProtectedStorageAdd(SavedTestState beforeState, this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber()); } else { Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash)); - Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); + Assert.assertEquals(beforeState.protectedStorageEntryBeforeOpDataStoreMap, this.protectedDataStoreService.getMap().get(hashMapHash)); verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean()); // Internal state didn't change... nothing should be notified - verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry); - verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry); + verify(this.hashMapChangedListener, never()).onAdded(Collections.singletonList(protectedStorageEntry)); verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); } } void verifyProtectedStorageRemove(SavedTestState beforeState, ProtectedStorageEntry protectedStorageEntry, - boolean expectedStateChange, - boolean expectedBroadcastOnStateChange, - boolean expectedSeqNrWriteOnStateChange, + boolean expectedHashMapAndDataStoreUpdated, + boolean expectedListenersSignaled, + boolean expectedBroadcast, + boolean expectedSeqNrWrite, boolean expectedIsDataOwner) { - P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - if (expectedStateChange) { - Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash)); + verifyProtectedStorageRemove(beforeState, Collections.singletonList(protectedStorageEntry), + expectedHashMapAndDataStoreUpdated, expectedListenersSignaled, expectedBroadcast, + expectedSeqNrWrite, expectedIsDataOwner); + } - if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) { - Assert.assertNull(this.mockedStorage.getProtectedDataStoreMap().get(storageHash)); + void verifyProtectedStorageRemove(SavedTestState beforeState, + Collection protectedStorageEntries, + boolean expectedHashMapAndDataStoreUpdated, + boolean expectedListenersSignaled, + boolean expectedBroadcast, + boolean expectedSeqNrWrite, + boolean expectedIsDataOwner) { - verify(this.protectedDataStoreListener).onRemoved(protectedStorageEntry); - } + // The default matcher expects orders to stay the same. So, create a custom matcher function since + // we don't care about the order. + if (expectedListenersSignaled) { + final ArgumentCaptor> argument = ArgumentCaptor.forClass(Collection.class); + verify(this.hashMapChangedListener).onRemoved(argument.capture()); - verify(this.hashMapChangedListener).onRemoved(protectedStorageEntry); + Set actual = new HashSet<>(argument.getValue()); + Set expected = new HashSet<>(protectedStorageEntries); + + // Ensure we didn't remove duplicates + Assert.assertEquals(protectedStorageEntries.size(), expected.size()); + Assert.assertEquals(argument.getValue().size(), actual.size()); + Assert.assertEquals(expected, actual); + } else { + verify(this.hashMapChangedListener, never()).onRemoved(any()); + } - if (expectedSeqNrWriteOnStateChange) - this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber()); + if (!expectedSeqNrWrite) + verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); + + if (!expectedBroadcast) + verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean()); + + + protectedStorageEntries.forEach(protectedStorageEntry -> { + P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - if (expectedBroadcastOnStateChange) { + if (expectedSeqNrWrite) + this.verifySequenceNumberMapWriteContains(P2PDataStorage.get32ByteHashAsByteArray( + protectedStorageEntry.getProtectedStoragePayload()), protectedStorageEntry.getSequenceNumber()); + + if (expectedBroadcast) { if (protectedStorageEntry instanceof ProtectedMailboxStorageEntry) verify(this.mockBroadcaster).broadcast(any(RemoveMailboxDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner)); else verify(this.mockBroadcaster).broadcast(any(RemoveDataMessage.class), any(NodeAddress.class), eq(null), eq(expectedIsDataOwner)); } - } else { - Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash)); - verify(this.mockBroadcaster, never()).broadcast(any(BroadcastMessage.class), any(NodeAddress.class), any(BroadcastHandler.Listener.class), anyBoolean()); - verify(this.hashMapChangedListener, never()).onAdded(protectedStorageEntry); - verify(this.protectedDataStoreListener, never()).onAdded(protectedStorageEntry); - verify(this.mockSeqNrStorage, never()).queueUpForSave(any(SequenceNumberMap.class), anyLong()); - } + if (expectedHashMapAndDataStoreUpdated) { + Assert.assertNull(this.mockedStorage.getMap().get(hashMapHash)); + + if (protectedStorageEntry.getProtectedStoragePayload() instanceof PersistablePayload) + Assert.assertNull(this.protectedDataStoreService.getMap().get(hashMapHash)); + + } else { + Assert.assertEquals(beforeState.protectedStorageEntryBeforeOp, this.mockedStorage.getMap().get(hashMapHash)); + } + }); } void verifyRefreshTTL(SavedTestState beforeState, @@ -352,11 +404,10 @@ private SavedTestState(TestState testState, PersistableNetworkPayload persistabl private SavedTestState(TestState testState, ProtectedStorageEntry protectedStorageEntry) { this(testState); - P2PDataStorage.ByteArray storageHash = P2PDataStorage.getCompactHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); - this.protectedStorageEntryBeforeOpDataStoreMap = testState.mockedStorage.getProtectedDataStoreMap().get(storageHash); - P2PDataStorage.ByteArray hashMapHash = P2PDataStorage.get32ByteHashAsByteArray(protectedStorageEntry.getProtectedStoragePayload()); this.protectedStorageEntryBeforeOp = testState.mockedStorage.getMap().get(hashMapHash); + this.protectedStorageEntryBeforeOpDataStoreMap = testState.protectedDataStoreService.getMap().get(hashMapHash); + this.creationTimestampBeforeUpdate = (this.protectedStorageEntryBeforeOp != null) ? this.protectedStorageEntryBeforeOp.getCreationTimeStamp() : 0; } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedDataStoreServiceFake.java b/p2p/src/test/java/bisq/network/p2p/storage/mocks/MapStoreServiceFake.java similarity index 55% rename from p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedDataStoreServiceFake.java rename to p2p/src/test/java/bisq/network/p2p/storage/mocks/MapStoreServiceFake.java index 717a3b806b4..9f282497d8a 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedDataStoreServiceFake.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/mocks/MapStoreServiceFake.java @@ -19,33 +19,52 @@ import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; -import bisq.network.p2p.storage.persistence.ProtectedDataStoreService; +import bisq.network.p2p.storage.persistence.MapStoreService; + +import bisq.common.proto.persistable.PersistableEnvelope; +import bisq.common.proto.persistable.PersistablePayload; +import bisq.common.storage.Storage; + +import java.io.File; import java.util.HashMap; import java.util.Map; +import lombok.Getter; + +import static org.mockito.Mockito.mock; + /** - * Implementation of an in-memory ProtectedDataStoreService that can be used in tests. Removes overhead + * Implementation of an in-memory MapStoreService that can be used in tests. Removes overhead * involving files, resources, and services for tests that don't need it. * * @see Reference */ -public class ProtectedDataStoreServiceFake extends ProtectedDataStoreService { +public class MapStoreServiceFake extends MapStoreService { + @Getter private final Map map; - public ProtectedDataStoreServiceFake() { - super(); - map = new HashMap<>(); + public MapStoreServiceFake() { + super(mock(File.class), mock(Storage.class)); + this.map = new HashMap<>(); } - public Map getMap() { - return map; + @Override + public String getFileName() { + return null; } - public void put(P2PDataStorage.ByteArray hashAsByteArray, ProtectedStorageEntry entry) { - map.put(hashAsByteArray, entry); + @Override + protected PersistableEnvelope createStore() { + return null; } - public ProtectedStorageEntry remove(P2PDataStorage.ByteArray hash, ProtectedStorageEntry protectedStorageEntry) { - return map.remove(hash); + + @Override + public boolean canHandle(PersistablePayload payload) { + return true; + } + + protected void readFromResources(String postFix) { + // do nothing. This Fake only supports in-memory storage. } } diff --git a/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java b/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java index d6be958138f..5d9a9d3784f 100644 --- a/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java +++ b/p2p/src/test/java/bisq/network/p2p/storage/mocks/ProtectedStoragePayloadStub.java @@ -45,7 +45,7 @@ public class ProtectedStoragePayloadStub implements ProtectedStoragePayload { @Getter private PublicKey ownerPubKey; - protected Message messageMock; + protected final Message messageMock; public ProtectedStoragePayloadStub(PublicKey ownerPubKey) { this.ownerPubKey = ownerPubKey;