Skip to content

Commit

Permalink
try layered mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Karim TAAM <[email protected]>
  • Loading branch information
matkt committed Feb 24, 2023
1 parent 56a0a1f commit 2b94a9a
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,25 @@ public boolean containWorlStateStorage(final Hash blockHash) {
public Optional<BonsaiSnapshotWorldStateKeyValueStorage> getWorldStateStorage(
final Hash blockHash) {
if (cachedWorldStatesByHash.containsKey(blockHash)) {
return Optional.ofNullable(cachedWorldStatesByHash.get(blockHash).clone());
BonsaiSnapshotWorldStateKeyValueStorage worldStateKeyValueStorage = cachedWorldStatesByHash.get(blockHash);
BonsaiSnapshotWorldStateKeyValueStorage clone = cachedWorldStatesByHash.get(blockHash).clone();
System.out.println("getWorldStateStorage "+blockHash+" "+worldStateKeyValueStorage.getWorldStateBlockHash()+" "+clone.getWorldStateBlockHash());
return Optional.ofNullable(clone);
}
return Optional.empty();
}

@Override
public Optional<BonsaiSnapshotWorldStateKeyValueStorage> getNearestWorldStateStorage(final long blockNumber) {
Optional<BonsaiSnapshotWorldStateKeyValueStorage> nearest = Optional.empty();
for (BonsaiSnapshotWorldStateKeyValueStorage found : cachedWorldStatesByHash.values()) {
if(nearest.isEmpty() || Math.abs(blockNumber-nearest.get().getBlockNumber())>Math.abs(blockNumber-found.getBlockNumber())){
nearest = Optional.of(found);
}
}
return nearest;
}

@Override
public long getMaxLayersToLoad() {
return maxLayersToLoad;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,5 @@ private StoredMerklePatriciaTrie<Bytes, Bytes> createTrie(
}

@Override
public void close() {
try {
this.worldStateStorage.close();
} catch (Exception e) {
// no op
}
}
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ public Optional<MutableWorldState> getMutable(
.getWorldStateStorage(blockHeader.getHash())
.or(
() ->
worldStateStorage
.getWorldStateBlockHash()
.flatMap(trieLogManager::getWorldStateStorage))
trieLogManager.getNearestWorldStateStorage(blockHeader.getNumber()))
.orElse(
new BonsaiSnapshotWorldStateKeyValueStorage(
chainHeadBlockNumber, worldStateStorage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
Expand All @@ -44,7 +44,7 @@ public CachedSnapshotWorldstateManager(
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final long maxLayersToLoad) {
this(blockchain, worldStateStorage, maxLayersToLoad, new HashMap<>());
this(blockchain, worldStateStorage, maxLayersToLoad, new ConcurrentHashMap<>());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ void addCachedLayer(

Optional<BonsaiSnapshotWorldStateKeyValueStorage> getWorldStateStorage(final Hash blockHash);

Optional<BonsaiSnapshotWorldStateKeyValueStorage> getNearestWorldStateStorage(final long blockNumber);

long getMaxLayersToLoad();

Optional<TrieLogLayer> getTrieLogLayer(final Hash blockHash);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;

import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;

import java.util.Optional;

/** The Rocks db snapshot transaction. */
public interface KeyValueSnapshotTransaction extends KeyValueStorageTransaction, AutoCloseable {

/**
* Get data against given key.
*
* @param key the key
* @return the optional data
*/
Optional<byte[]> get(final byte[] key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;

import static java.util.stream.Collectors.toUnmodifiableSet;

import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;

import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;

/** The RocksDb columnar key value snapshot. */
public class RocksDBColumnarKeyValueLayeredSnapshot implements SnappedKeyValueStorage {

private final RocksDBMetrics metrics;
final RocksDBLayeredSnapshotTransaction snapTx;

public RocksDBColumnarKeyValueLayeredSnapshot(
final RocksDBMetrics metrics, final RocksDBLayeredSnapshotTransaction snapTx) {
this.metrics = metrics;
this.snapTx = snapTx.copy();
}

public RocksDBColumnarKeyValueLayeredSnapshot(
final RocksDBMetrics metrics, final RocksDBSnapshotTransaction snapTx) {
this.metrics = metrics;
this.snapTx = snapTx.copy();
}

@Override
public Optional<byte[]> get(final byte[] key) throws StorageException {
return snapTx.get(key);
}

@Override
public Stream<Pair<byte[], byte[]>> stream() {
throw new UnsupportedOperationException("cannot stream");
}

@Override
public Stream<byte[]> streamKeys() {
throw new UnsupportedOperationException("cannot stream");
}

@Override
public boolean tryDelete(final byte[] key) throws StorageException {
snapTx.remove(key);
return true;
}

@Override
public Set<byte[]> getAllKeysThat(final Predicate<byte[]> returnCondition) {
return streamKeys().filter(returnCondition).collect(toUnmodifiableSet());
}

@Override
public Set<byte[]> getAllValuesFromKeysThat(final Predicate<byte[]> returnCondition) {
return stream()
.filter(pair -> returnCondition.test(pair.getKey()))
.map(Pair::getValue)
.collect(toUnmodifiableSet());
}

@Override
public KeyValueStorageTransaction startTransaction() throws StorageException {
// The use of a transaction on a transaction based key value store is dubious
// at best. return our snapshot transaction instead.
return snapTx;
}

@Override
public void clear() {
throw new UnsupportedOperationException(
"RocksDBColumnarKeyValueSnapshot does not support clear");
}

@Override
public boolean containsKey(final byte[] key) throws StorageException {
return snapTx.get(key).isPresent();
}

@Override
public void close() throws IOException {
snapTx.close();
}

@Override
public KeyValueStorageTransaction getSnapshotTransaction() {
return snapTx;
}

@Override
public SnappedKeyValueStorage cloneFromSnapshot() {
return new RocksDBColumnarKeyValueLayeredSnapshot(metrics, snapTx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage {
/** The Db. */
final OptimisticTransactionDB db;

private final RocksDBMetrics metrics;
/** The Snap tx. */
final RocksDBSnapshotTransaction snapTx;

Expand All @@ -51,15 +53,10 @@ public class RocksDBColumnarKeyValueSnapshot implements SnappedKeyValueStorage {
final RocksDbSegmentIdentifier segment,
final RocksDBMetrics metrics) {
this.db = db;
this.metrics = metrics;
this.snapTx = new RocksDBSnapshotTransaction(db, segment.get(), metrics);
}

private RocksDBColumnarKeyValueSnapshot(
final OptimisticTransactionDB db, final RocksDBSnapshotTransaction snapTx) {
this.db = db;
this.snapTx = snapTx;
}

@Override
public Optional<byte[]> get(final byte[] key) throws StorageException {
return snapTx.get(key);
Expand Down Expand Up @@ -124,6 +121,6 @@ public KeyValueStorageTransaction getSnapshotTransaction() {

@Override
public SnappedKeyValueStorage cloneFromSnapshot() {
return new RocksDBColumnarKeyValueSnapshot(db, snapTx.copy());
return new RocksDBColumnarKeyValueLayeredSnapshot(metrics, snapTx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb.segmented;

import org.apache.tuweni.bytes.Bytes;
import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction;
import org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** The Rocks db snapshot transaction. */
public class RocksDBLayeredSnapshotTransaction implements KeyValueSnapshotTransaction, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBLayeredSnapshotTransaction.class);

private final AtomicBoolean isClosed = new AtomicBoolean(false);

private final RocksDBMetrics metrics;

private final KeyValueSnapshotTransaction parentSnapshot;
private final Map<Bytes,Optional<byte[]>> inMemoryKeys;


RocksDBLayeredSnapshotTransaction(
final RocksDBMetrics metrics,
final KeyValueSnapshotTransaction parentSnapshot) {
this.metrics = metrics;
this.parentSnapshot = parentSnapshot;
this.inMemoryKeys = new ConcurrentHashMap<>();
}

/**
* Get data against given key.
*
* @param key the key
* @return the optional data
*/
@Override
public Optional<byte[]> get(final byte[] key) {
if (isClosed.get()) {
LOG.debug("Attempted to access closed snapshot");
return Optional.empty();
}

try (final OperationTimer.TimingContext ignored = metrics.getReadLatency().startTimer()) {
return inMemoryKeys.getOrDefault(Bytes.of(key),parentSnapshot.get(key));
}
}

@Override
public void put(final byte[] key, final byte[] value) {
if (isClosed.get()) {
LOG.debug("Attempted to access closed snapshot");
return;
}
try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) {
inMemoryKeys.put(Bytes.of(key), Optional.ofNullable(value));
}
}

@Override
public void remove(final byte[] key) {
if (isClosed.get()) {
LOG.debug("Attempted to access closed snapshot");
return;
}
try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) {
inMemoryKeys.put(Bytes.of(key), Optional.empty());
}
}


public RocksDBLayeredSnapshotTransaction copy() {
if (isClosed.get()) {
throw new StorageException("Snapshot already closed");
}
return new RocksDBLayeredSnapshotTransaction(
metrics, this);
}

@Override
public void commit() throws StorageException {
// no op
}

@Override
public void rollback() {
inMemoryKeys.clear();
metrics.getRollbackCount().inc();
}


@Override
public void close() {
inMemoryKeys.clear();
isClosed.set(true);
}

}
Loading

0 comments on commit 2b94a9a

Please sign in to comment.