From 513d59156450eb24d75310784e5bf1506d321ae0 Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Thu, 1 Feb 2024 21:00:27 +0800 Subject: [PATCH] [#778] core(feat): Support Tree lock (#1264) ### What changes were proposed in this pull request? Add the tree-like locks to enhance performance. ### Why are the changes needed? Add the tree-like locks to enhance performance. Fix: #778 ### Does this PR introduce _any_ user-facing change? N/A ### How was this patch tested? UT, `TestLockManager` --- api/build.gradle.kts | 1 + build.gradle.kts | 3 +- .../com/datastrato/gravitino/Configs.java | 29 + .../gravitino/lock/LockManager.java | 279 ++++++++ .../datastrato/gravitino/lock/LockType.java | 18 + .../datastrato/gravitino/lock/TreeLock.java | 125 ++++ .../gravitino/lock/TreeLockNode.java | 199 ++++++ .../gravitino/lock/TestLockManager.java | 646 ++++++++++++++++++ .../gravitino/lock/TestTreeLock.java | 28 + 9 files changed, 1327 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/lock/LockManager.java create mode 100644 core/src/main/java/com/datastrato/gravitino/lock/LockType.java create mode 100644 core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java create mode 100644 core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java create mode 100644 core/src/test/java/com/datastrato/gravitino/lock/TestLockManager.java create mode 100644 core/src/test/java/com/datastrato/gravitino/lock/TestTreeLock.java diff --git a/api/build.gradle.kts b/api/build.gradle.kts index 55acdc6e2f7..33c6d2fb3ea 100644 --- a/api/build.gradle.kts +++ b/api/build.gradle.kts @@ -11,6 +11,7 @@ plugins { dependencies { implementation(libs.guava) implementation(libs.slf4j.api) + implementation(libs.commons.lang3) testImplementation(libs.junit.jupiter.api) testImplementation(libs.junit.jupiter.params) diff --git a/build.gradle.kts b/build.gradle.kts index ea3cd0ab7e7..543a976f7bd 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -79,7 +79,8 @@ 
project.extra["extraJvmArgs"] = if (extra["jdkVersion"] in listOf("8", "11")) { "--add-opens", "java.base/sun.nio.cs=ALL-UNNAMED", "--add-opens", "java.base/sun.security.action=ALL-UNNAMED", "--add-opens", "java.base/sun.util.calendar=ALL-UNNAMED", - "--add-opens", "java.security.jgss/sun.security.krb5=ALL-UNNAMED" + "--add-opens", "java.security.jgss/sun.security.krb5=ALL-UNNAMED", + "--add-opens", "java.base/java.lang.reflect=ALL-UNNAMED" ) } diff --git a/core/src/main/java/com/datastrato/gravitino/Configs.java b/core/src/main/java/com/datastrato/gravitino/Configs.java index da032c16253..3b3d6246f69 100644 --- a/core/src/main/java/com/datastrato/gravitino/Configs.java +++ b/core/src/main/java/com/datastrato/gravitino/Configs.java @@ -25,6 +25,12 @@ public interface Configs { String DEFAULT_KV_ROCKSDB_BACKEND_PATH = String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb"); + long MAX_NODE_IN_MEMORY = 100000L; + + long MIN_NODE_IN_MEMORY = 1000L; + + long CLEAN_INTERVAL_IN_SECS = 60L; + ConfigEntry ENTITY_STORE = new ConfigBuilder(ENTITY_STORE_KEY) .doc("Which storage implementation to use") @@ -88,4 +94,27 @@ public interface Configs { .version("0.3.0") .longConf() .createWithDefault(DEFAULT_KV_DELETE_AFTER_TIME); + + // The followings are configurations for tree lock + + ConfigEntry TREE_LOCK_MAX_NODE_IN_MEMORY = + new ConfigBuilder("gravitino.lock.maxNodes") + .doc("The maximum number of tree lock nodes to keep in memory") + .version("0.4.0") + .longConf() + .createWithDefault(MAX_NODE_IN_MEMORY); + + ConfigEntry TREE_LOCK_MIN_NODE_IN_MEMORY = + new ConfigBuilder("gravitino.lock.minNodes") + .doc("The minimum number of tree lock nodes to keep in memory") + .version("0.4.0") + .longConf() + .createWithDefault(MIN_NODE_IN_MEMORY); + + ConfigEntry TREE_LOCK_CLEAN_INTERVAL = + new ConfigBuilder("gravitino.lock.cleanIntervalInSecs") + .doc("The interval in seconds to clean up the stale tree lock nodes") + .version("0.4.0") + .longConf() + 
.createWithDefault(CLEAN_INTERVAL_IN_SECS); } diff --git a/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java new file mode 100644 index 00000000000..e9f6724b17c --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/lock/LockManager.java @@ -0,0 +1,279 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.lock; + +import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.NameIdentifier; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.time.StopWatch; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LockManager is a lock manager that manages the tree locks. It will serve as a factory to create + * the tree lock and do the cleanup for the stale tree lock nodes. For more, please refer to {@link + * TreeLock} and {@link TreeLockNode}. + * + *

It has two main functions: 1. Create the tree lock. 2. Clean up the stale tree lock nodes + * shared by all tree lock instances. + */ +public class LockManager { + private static final Logger LOG = LoggerFactory.getLogger(LockManager.class); + + static final NameIdentifier ROOT = NameIdentifier.of("/"); + + @VisibleForTesting TreeLockNode treeLockRootNode; + final AtomicLong totalNodeCount = new AtomicLong(1); + + // The maximum number of tree lock nodes to keep in memory. If the total node count is greater + // than this value, we will do the cleanup. + long maxTreeNodeInMemory; + // If the total node count is less than this value, we will not do the cleanup. + @VisibleForTesting long minTreeNodeInMemory; + + // The interval in seconds to clean up the stale tree lock nodes. + @VisibleForTesting long cleanTreeNodeIntervalInSecs; + + private void initParameters(Config config) { + long maxNodesInMemory = config.get(TREE_LOCK_MAX_NODE_IN_MEMORY); + if (maxNodesInMemory <= 0) { + throw new IllegalArgumentException( + String.format( + "The maximum number of tree lock nodes '%d' should be greater than 0", + maxNodesInMemory)); + } + + long minNodesInMemory = config.get(TREE_LOCK_MIN_NODE_IN_MEMORY); + if (minNodesInMemory <= 0) { + throw new IllegalArgumentException( + String.format( + "The minimum number of tree lock nodes '%d' should be greater than 0", + minNodesInMemory)); + } + + if (maxNodesInMemory <= minNodesInMemory) { + throw new IllegalArgumentException( + String.format( + "The maximum number of tree lock nodes '%d' should be greater than the minimum number of tree lock nodes '%d'", + maxNodesInMemory, minNodesInMemory)); + } + this.maxTreeNodeInMemory = maxNodesInMemory; + this.minTreeNodeInMemory = minNodesInMemory; + + long cleanIntervalInSecs = config.get(TREE_LOCK_CLEAN_INTERVAL); + if (cleanIntervalInSecs <= 0) { + throw new IllegalArgumentException( + String.format( + "The interval in seconds to clean up the stale tree lock nodes '%d' should be greater 
than 0", + cleanIntervalInSecs)); + } + + this.cleanTreeNodeIntervalInSecs = cleanIntervalInSecs; + } + + private void startDeadLockChecker() { + ScheduledThreadPoolExecutor deadLockChecker = + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("tree-lock-dead-lock-checker-%d") + .build()); + + deadLockChecker.scheduleAtFixedRate( + () -> { + LOG.info("Start to check the dead lock..."); + checkDeadLock(treeLockRootNode); + LOG.info("Finish to check the dead lock..."); + }, + 0, + 60, + TimeUnit.SECONDS); + } + + /** + * Check the deadlock for the given root node. + * + * @param node The root node to check. + */ + void checkDeadLock(TreeLockNode node) { + // Check child first + node.getAllChildren().forEach(this::checkDeadLock); + + // Check self + node.getHoldingThreadTimestamp() + .forEach( + (thread, ts) -> { + // If the thread is holding the lock for more than 30 seconds, we will log it. + if (System.currentTimeMillis() - ts > 30000) { + LOG.warn( + "Dead lock detected for thread {} on node {}, threads that holding the node: {} ", + thread, + node, + node.getHoldingThreadTimestamp()); + } + }); + } + + private void startNodeCleaner() { + ScheduledThreadPoolExecutor lockCleaner = + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("tree-lock-cleaner-%d") + .build()); + + lockCleaner.scheduleAtFixedRate( + () -> { + long nodeCount = totalNodeCount.get(); + LOG.info("Total tree lock node count: {}", nodeCount); + // If the total node count is greater than the maxTreeNodeInMemory * 0.5, we will do the + // clear up in case of the memory explosion. 
+ if (nodeCount > maxTreeNodeInMemory * 0.5) { + StopWatch watch = StopWatch.createStarted(); + LOG.trace("Start to clean up the stale tree lock nodes..."); + treeLockRootNode + .getAllChildren() + .forEach(child -> evictStaleNodes(child, treeLockRootNode)); + LOG.info( + "Finish to clean up the stale tree lock nodes, cost: {}, after clean node count: {}", + watch.getTime(), + totalNodeCount.get()); + } + }, + cleanTreeNodeIntervalInSecs, + cleanTreeNodeIntervalInSecs, + TimeUnit.SECONDS); + } + + public LockManager(Config config) { + treeLockRootNode = new TreeLockNode(ROOT.name()); + + // Init the parameters. + initParameters(config); + + // Start tree lock cleaner. + startNodeCleaner(); + + // Start deadlock checker. + startDeadLockChecker(); + } + + /** + * Evict the stale nodes from the tree lock node. + * + * @param treeNode The tree lock node to evict. + * @param parent The parent of the tree lock node. + */ + @VisibleForTesting + void evictStaleNodes(TreeLockNode treeNode, TreeLockNode parent) { + // We will not evict the node tree if the total node count is less than the + // MIN_TREE_NODE_IN_MEMORY. + // Do not need to consider thread-safe issues. + if (totalNodeCount.get() < minTreeNodeInMemory) { + return; + } + + // Handle from leaf nodes first. + treeNode.getAllChildren().forEach(child -> evictStaleNodes(child, treeNode)); + + // Handle self node. + if (treeNode.getReference() == 0) { + synchronized (parent) { + // Once goes here, the parent node has been locked, so the reference of child (treeNode) + // could not be changed. + if (treeNode.getReference() == 0) { + parent.removeChild(treeNode.getName()); + long leftNodeCount = totalNodeCount.decrementAndGet(); + LOG.trace( + "Evict stale tree lock node '{}', current left nodes '{}'", + treeNode.getName(), + leftNodeCount); + } + } + } + } + + /** + * Create a tree lock with the given identifier. + * + * @param identifier The identifier of the tree lock. + * @return The created tree lock. 
+ */ + public TreeLock createTreeLock(NameIdentifier identifier) { + checkTreeNodeIsFull(); + + List treeLockNodes = Lists.newArrayList(); + try { + TreeLockNode lockNode = treeLockRootNode; + lockNode.addReference(); + treeLockNodes.add(lockNode); + + if (identifier == ROOT) { + // The lock tree root node + return new TreeLock(treeLockNodes, identifier); + } + + String[] levels = identifier.namespace().levels(); + levels = ArrayUtils.add(levels, identifier.name()); + + TreeLockNode child; + for (String level : levels) { + synchronized (lockNode) { + Pair pair = lockNode.getOrCreateChild(level); + child = pair.getKey(); + // If the child node is newly created, we should increase the total node counts. + if (pair.getValue()) { + totalNodeCount.incrementAndGet(); + } + } + treeLockNodes.add(child); + lockNode = child; + } + + return new TreeLock(treeLockNodes, identifier); + } catch (Exception e) { + LOG.error("Failed to create tree lock {}", identifier, e); + // Release reference if fails. + for (TreeLockNode node : treeLockNodes) { + node.decReference(); + } + + throw e; + } + } + + /** + * Check if the total node count is greater than the maxTreeNodeInMemory, if so, we should throw + * an exception. + */ + private void checkTreeNodeIsFull() { + // If the total node count is greater than the max node counts, in case of memory + // leak and explosion, we should throw an exception. 
+ long currentNodeCount = totalNodeCount.get(); + if (currentNodeCount > maxTreeNodeInMemory) { + throw new IllegalStateException( + "The total node count '" + + currentNodeCount + + "' has reached the max node count '" + + maxTreeNodeInMemory + + "', please increase the max node count or wait for a while to avoid the performance issue."); + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/lock/LockType.java b/core/src/main/java/com/datastrato/gravitino/lock/LockType.java new file mode 100644 index 00000000000..7a68cb54ca4 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/lock/LockType.java @@ -0,0 +1,18 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.lock; + +/** + * Type of the lock for tree lock. READ lock is a shared lock, while WRITE lock is an exclusive + * lock. + * + *

It's possible to acquire multiple READ locks at the same time, but only one WRITE lock can be + * acquired at a time. Please see {@link java.util.concurrent.locks.ReadWriteLock} for more details. + */ +public enum LockType { + READ, + WRITE +} diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java new file mode 100644 index 00000000000..1f8238b6d6e --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/lock/TreeLock.java @@ -0,0 +1,125 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.lock; + +import com.datastrato.gravitino.NameIdentifier; +import java.util.List; +import java.util.Stack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TreeLock is a lock that manages the lock process of the resource path. It will lock the whole + * path from root to the resource path. + * + *

Assuming we need to load the table `metalake.catalog.db1.table1`, the lock manager will lock + * the following + * + *

+ *   /                                    readLock
+ *   /metalake                            readLock
+ *   /metalake/catalog                    readLock
+ *   /metalake/catalog/db1                readLock
+ *   /metalake/catalog/db1/table1         readLock
+ * 
+ * + * If we need to alter a table `metalake.catalog.db1.table1` (without changing the name of it), the + * lock manager will lock the following: + * + *
+ *   /                                    readLock
+ *   /metalake                            readLock
+ *   /metalake/catalog                    readLock
+ *   /metalake/catalog/db1                readLock
+ *   /metalake/catalog/db1/table1         writeLock
+ * 
+ * + * When we need to rename a table or drop a table `metalake.catalog.db1.table1`, the lock manager + * will lock the following: + * + *
+ *   /                                    readLock
+ *   /metalake                            readLock
+ *   /metalake/catalog                    readLock
+ *   /metalake/catalog/db1                writeLock
+ * 
+ * + * If the lock manager fails to lock the resource path, it will release all the locks that have been + * acquired, in the reverse order of their acquisition. + * + *

The core of {@link TreeLock} is {@link TreeLockNode}. A TreeLock will hold several tree lock + * nodes, all treeLock nodes shared by all tree lock instances will be stored in the {@link + * LockManager} and can be reused later. + */ +public class TreeLock { + public static final Logger LOG = LoggerFactory.getLogger(TreeLock.class); + + // The name identifier of the resource path. + private final NameIdentifier identifier; + // TreeLockNode to be locked + private final List lockNodes; + + // TreeLockNode that has been locked. + private final Stack heldLocks = new Stack<>(); + private LockType lockType; + + TreeLock(List lockNodes, NameIdentifier identifier) { + this.lockNodes = lockNodes; + this.identifier = identifier; + } + + /** + * Lock the tree lock with the given lock type. + * + * @param lockType The lock type to lock the tree lock. + */ + public void lock(LockType lockType) { + this.lockType = lockType; + + int length = lockNodes.size(); + for (int i = 0; i < length; i++) { + TreeLockNode treeLockNode = lockNodes.get(i); + LockType type = i == length - 1 ? lockType : LockType.READ; + treeLockNode.lock(type); + heldLocks.push(treeLockNode); + } + + LOG.trace( + "Locked the tree lock, ident: {}, lockNodes: [{}], lock type: {}", + identifier, + lockNodes, + lockType); + } + + /** Unlock the tree lock. */ + public void unlock() { + if (lockType == null) { + throw new IllegalStateException("We must lock the tree lock before unlock it."); + } + + boolean lastNode = false; + TreeLockNode current; + while (!heldLocks.isEmpty()) { + LockType type; + if (!lastNode) { + lastNode = true; + type = lockType; + } else { + type = LockType.READ; + } + + current = heldLocks.pop(); + // Unlock the node and decrease the reference count. 
+ current.unlock(type); + } + + LOG.trace( + "Unlocked the tree lock, ident: {}, lockNodes: [{}], lock type: {}", + identifier, + lockNodes, + lockType); + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java new file mode 100644 index 00000000000..bdb0fbbb9b8 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/lock/TreeLockNode.java @@ -0,0 +1,199 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.lock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TreeLockNode is a node in the tree lock; all tree lock nodes will be assembled to a tree + * structure, which corresponds to the resource path like name identifier space. + * + *

Each node will have a read-write lock to protect the node. The node will also have a map to + * store the children. For more, please refer to {@link TreeLock}. + */ +public class TreeLockNode { + public static final Logger LOG = LoggerFactory.getLogger(TreeLockNode.class); + private final String name; + private final ReentrantReadWriteLock readWriteLock; + @VisibleForTesting final Map childMap; + private final Map holdingThreadTimestamp = new ConcurrentHashMap<>(); + + // The reference count of this node. The reference count is used to track the number of the + // TreeLocks that are using this node. If the reference count is 0, it means that no TreeLock is + // using this node, and this node can be removed from the tree. + private final AtomicLong referenceCount = new AtomicLong(); + + protected TreeLockNode(String name) { + this.name = name; + this.readWriteLock = new ReentrantReadWriteLock(); + this.childMap = new ConcurrentHashMap<>(); + } + + public String getName() { + return name; + } + + Map getHoldingThreadTimestamp() { + return holdingThreadTimestamp; + } + + /** + * Increase the reference count of this node. The reference count should always be greater than or + * equal to 0. + */ + synchronized void addReference() { + referenceCount.getAndIncrement(); + } + + /** + * Decrease the reference count of this node. The reference count should always be greater than or + * equal to 0. + */ + synchronized void decReference() { + referenceCount.getAndDecrement(); + } + + long getReference() { + return referenceCount.get(); + } + + /** + * Lock the node with the given lock type. This method should be followed by {@link + * #unlock(LockType)}. + * + * @param lockType The lock type to lock the node. 
+ */ + void lock(LockType lockType) { + if (lockType == LockType.READ) { + readWriteLock.readLock().lock(); + } else { + readWriteLock.writeLock().lock(); + } + + holdingThreadTimestamp.put(Thread.currentThread(), System.currentTimeMillis()); + LOG.trace( + "Node {} has been lock with '{}' lock, hold by {} at {}, current holding threads: {}", + this, + lockType, + Thread.currentThread(), + System.currentTimeMillis(), + holdingThreadTimestamp); + } + + /** + * Unlock the node with the given lock type. This method should be called after {@link + * #lock(LockType)}, and the lock type should be the same as the lock type in {@link + * #lock(LockType)}. + * + * @param lockType The lock type to unlock the node. + */ + void unlock(LockType lockType) { + if (lockType == LockType.READ) { + readWriteLock.readLock().unlock(); + } else { + readWriteLock.writeLock().unlock(); + } + + this.referenceCount.decrementAndGet(); + + long holdStartTime = holdingThreadTimestamp.remove(Thread.currentThread()); + LOG.trace( + "Node {} has been unlock with '{}' lock, hold by {} for {} ms, current holding threads: {}", + this, + lockType, + Thread.currentThread(), + System.currentTimeMillis() - holdStartTime, + holdingThreadTimestamp); + } + + /** + * Get the tree lock node by the given name. If the node doesn't exist, create a new TreeNode. + * + *

Note: This method should always be guarded by object lock. + * + * @param name The name of a resource such as entity or others. + * @return A pair of the tree lock node and a boolean value indicating whether the node is newly + * created. + */ + Pair getOrCreateChild(String name) { + boolean[] newCreated = new boolean[] {false}; + TreeLockNode childNode = + childMap.computeIfAbsent( + name, + k -> { + TreeLockNode newNode = new TreeLockNode(name); + LOG.trace("Create tree lock node '{}' as a child of '{}'", name, this.name); + newCreated[0] = true; + return newNode; + }); + + childNode.addReference(); + return Pair.of(childNode, newCreated[0]); + } + + /** + * Get all the children of this node. The returned list is unmodifiable and the order is random. + * The reason why we return a random order list is that we want to avoid the cases that the first + * child is always to be dropped firstly and the last child is always the last to be dropped, + * chances are that the first child is always the most popular one and the last child is always + * the least popular. + * + * @return The list of all the children of this node. + */ + synchronized List getAllChildren() { + List children = Lists.newArrayList(childMap.values()); + Collections.shuffle(children); + return Collections.unmodifiableList(children); + } + + /** + * Remove the child node by the given name identifier. + * + *

Note: This method should be guarded by object lock. + * + * @param name The name of a resource such as entity or others. + */ + void removeChild(String name) { + childMap.remove(name); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("TreeLockNode{"); + sb.append("ident=").append(name).append(","); + sb.append("hashCode=").append(hashCode()); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TreeLockNode that = (TreeLockNode) o; + return Objects.equal(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hashCode(name); + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/lock/TestLockManager.java b/core/src/test/java/com/datastrato/gravitino/lock/TestLockManager.java new file mode 100644 index 00000000000..17e870589ca --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/lock/TestLockManager.java @@ -0,0 +1,646 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.lock; + +import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletionService; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; + +public class TestLockManager { + private static final String[] ENTITY_NAMES = { + "entity1", + "entity2", + "entity3", + "entity4", + "entity5", + "entity6", + "entity7", + "entity8", + "entity9", + "entity10" + }; + + private static String getName(int i) { + return "entity_" + i; + } + + static Config getConfig() { + Config config = Mockito.mock(Config.class); + Mockito.when(config.get(TREE_LOCK_MAX_NODE_IN_MEMORY)).thenReturn(100000L); + Mockito.when(config.get(TREE_LOCK_MIN_NODE_IN_MEMORY)).thenReturn(1000L); + Mockito.when(config.get(TREE_LOCK_CLEAN_INTERVAL)).thenReturn(60L); + return config; + } + + private 
CompletionService createCompletionService() { + ThreadPoolExecutor executor = + new ThreadPoolExecutor( + 10, + 10, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true).build()); + + CompletionService completionService = new ExecutorCompletionService(executor); + return completionService; + } + + static NameIdentifier randomNameIdentifier() { + Random random = new Random(); + int level = random.nextInt(10); + NameIdentifier nameIdentifier; + switch (level) { + case 0: + nameIdentifier = LockManager.ROOT; + break; + case 1: + nameIdentifier = NameIdentifier.of(getName(random.nextInt(5))); + break; + case 2: + nameIdentifier = NameIdentifier.of(getName(random.nextInt(5)), getName(random.nextInt(20))); + break; + case 3: + nameIdentifier = + NameIdentifier.of( + getName(random.nextInt(5)), + getName(random.nextInt(20)), + getName(random.nextInt(30))); + break; + default: + nameIdentifier = + NameIdentifier.of( + getName(random.nextInt(5)), + getName(random.nextInt(20)), + getName(random.nextInt(30)), + getName(random.nextInt(100))); + } + + return nameIdentifier; + } + + @Test + void multipleThreadTestLockManager() throws InterruptedException, ExecutionException { + LockManager lockManager = new LockManager(getConfig()); + CompletionService completionService = createCompletionService(); + for (int i = 0; i < 10; i++) { + completionService.submit(() -> this.testLockManager(lockManager)); + } + + for (int i = 0; i < 10; i++) { + completionService.take().get(); + } + } + + int testNormalLock() throws InterruptedException { + ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); + ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); + for (int i = 0; i < 1000; i++) { + NameIdentifier identifier = randomNameIdentifier(); + int num = threadLocalRandom.nextInt(5); + LockType lockType = num >= 4 ? 
LockType.WRITE : LockType.READ; + // LockType lockType = LockType.values()[threadLocalRandom.nextInt(2)]; + if (lockType == LockType.WRITE) { + reentrantReadWriteLock.writeLock().lock(); + // App logic here... + Thread.sleep(1); + reentrantReadWriteLock.writeLock().unlock(); + } else { + reentrantReadWriteLock.readLock().lock(); + // App logic here... + Thread.sleep(1); + reentrantReadWriteLock.readLock().unlock(); + } + } + + return 0; + } + + int testLockManager(LockManager lockManager) throws InterruptedException { + ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); + for (int i = 0; i < 1000; i++) { + NameIdentifier identifier = randomNameIdentifier(); + int num = threadLocalRandom.nextInt(5); + LockType lockType = num >= 4 ? LockType.WRITE : LockType.READ; + TreeLock lock = lockManager.createTreeLock(identifier); + try { + lock.lock(lockType); + // App logic here... + Thread.sleep(1); + } catch (Exception e) { + if (e.getMessage().contains("mock")) { + return 0; + } + throw e; + } finally { + lock.unlock(); + } + } + + return 0; + } + + @Test + @Disabled + void testLockWithError() { + LockManager lockManager = new LockManager(getConfig()); + LockManager spy = Mockito.spy(lockManager); + + // one fifth (2 /10 = 0.2) of tests will fail + Mockito.doThrow(new RuntimeException("mock")) + .when(spy) + .createTreeLock(Mockito.eq(NameIdentifier.of(Namespace.of(), ENTITY_NAMES[0]))); + Mockito.doThrow(new RuntimeException("mock")) + .when(spy) + .createTreeLock(Mockito.eq(NameIdentifier.of(Namespace.of(), ENTITY_NAMES[1]))); + + CompletionService completionService = createCompletionService(); + for (int i = 0; i < 10; i++) { + completionService.submit(() -> this.testLockManager(spy)); + } + + for (int i = 0; i < 10; i++) { + Assertions.assertDoesNotThrow(() -> completionService.take().get()); + } + } + + @Disabled + @ParameterizedTest + @ValueSource(ints = {1, 2, 4, 8, 10}) + void compare(int threadCount) throws InterruptedException, ExecutionException 
{ + LockManager lockManager = new LockManager(getConfig()); + CompletionService completionService = createCompletionService(); + for (int i = 0; i < 2; i++) { + completionService.submit(() -> this.testLockManager(lockManager)); + } + for (int i = 0; i < 2; i++) { + completionService.take().get(); + } + + long start = System.currentTimeMillis(); + for (int i = 0; i < threadCount; i++) { + completionService.submit(() -> this.testLockManager(lockManager)); + } + for (int i = 0; i < threadCount; i++) { + completionService.take().get(); + } + System.out.println("LockManager use tree lock: " + (System.currentTimeMillis() - start) + "ms"); + + start = System.currentTimeMillis(); + for (int i = 0; i < threadCount; i++) { + completionService.submit(() -> this.testNormalLock()); + } + + for (int i = 0; i < threadCount; i++) { + completionService.take().get(); + } + System.out.println( + "LockManager use normal lock: " + (System.currentTimeMillis() - start) + "ms"); + } + + @Test + void testLockCleaner() throws InterruptedException, ExecutionException { + LockManager lockManager = new LockManager(getConfig()); + Random random = new Random(); + CompletionService service = createCompletionService(); + + for (int i = 0; i < 1000; i++) { + NameIdentifier nameIdentifier = randomNameIdentifier(); + TreeLock lock = lockManager.createTreeLock(nameIdentifier); + try { + lock.lock(LockType.READ); + } finally { + lock.unlock(); + } + } + + lockManager + .treeLockRootNode + .getAllChildren() + .forEach( + child -> { + lockManager.evictStaleNodes(child, lockManager.treeLockRootNode); + }); + + Assertions.assertFalse(lockManager.treeLockRootNode.getAllChildren().isEmpty()); + + for (int i = 0; i < 10; i++) { + service.submit( + () -> { + for (int j = 0; j < 10000; j++) { + NameIdentifier nameIdentifier = randomNameIdentifier(); + TreeLock lock = lockManager.createTreeLock(nameIdentifier); + try { + lock.lock(random.nextInt(2) == 0 ? 
LockType.READ : LockType.WRITE); + } finally { + lock.unlock(); + } + } + return 0; + }); + } + + for (int i = 0; i < 10; i++) { + service.take().get(); + } + + // Check the lock reference + checkReferenceCount(lockManager.treeLockRootNode); + + List> futures = Lists.newArrayList(); + for (int i = 0; i < 5; i++) { + futures.add( + service.submit( + () -> { + for (int j = 0; j < 10000; j++) { + NameIdentifier nameIdentifier = randomNameIdentifier(); + TreeLock lock = lockManager.createTreeLock(nameIdentifier); + try { + lock.lock(LockType.READ); + } finally { + lock.unlock(); + } + } + return 0; + })); + } + + for (int i = 0; i < 5; i++) { + service.submit( + () -> { + lockManager + .treeLockRootNode + .getAllChildren() + .forEach( + child -> { + while (!futures.stream().allMatch(Future::isDone)) { + try { + lockManager.evictStaleNodes(child, lockManager.treeLockRootNode); + Thread.sleep(1); + } catch (Exception e) { + // Ignore + } + } + }); + return 0; + }); + } + + for (int i = 0; i < 10; i++) { + service.take().get(); + } + + checkReferenceCount(lockManager.treeLockRootNode); + } + + private void checkReferenceCount(TreeLockNode node) { + Assertions.assertEquals(0, node.getReference()); + node.getAllChildren().forEach(this::checkReferenceCount); + } + + @Test + void testNodeCountAndCleaner() throws ExecutionException, InterruptedException { + LockManager lockManager = new LockManager(getConfig()); + CompletionService service = createCompletionService(); + + Future future = + service.submit( + () -> { + for (int i = 0; i < 20000; i++) { + TreeLock treeLock = lockManager.createTreeLock(randomNameIdentifier()); + treeLock.lock(i % 2 == 0 ? 
LockType.WRITE : LockType.READ); + treeLock.unlock(); + } + + return 0; + }); + + future.get(); + long totalCount = lockManager.totalNodeCount.get(); + Assertions.assertTrue(totalCount > 0); + } + + @Test + void testConcurrentRead() throws InterruptedException { + LockManager lockManager = new LockManager(getConfig()); + Map stringMap = Maps.newHashMap(); + stringMap.put("total", 0); + + CompletionService service = createCompletionService(); + NameIdentifier nameIdentifier = NameIdentifier.of("a", "b", "c", "d"); + + CyclicBarrier cyclicBarrier = new CyclicBarrier(5); + // Can 2000 times ensure that the test is correct? + for (int t = 0; t < 200000; t++) { + for (int i = 0; i < 5; i++) { + service.submit( + () -> { + TreeLock treeLock = lockManager.createTreeLock(nameIdentifier); + treeLock.lock(LockType.READ); + try { + cyclicBarrier.await(); + stringMap.compute("total", (k, v) -> ++v); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + treeLock.unlock(); + } + return 0; + }); + } + + for (int i = 0; i < 5; i++) { + service.take(); + } + + int total = stringMap.get("total"); + if (total < 5) { + return; + } + + cyclicBarrier.reset(); + stringMap.put("total", 0); + } + + Assertions.fail("This should not happen..."); + } + + @Test + void testConcurrentWrite() throws InterruptedException { + LockManager lockManager = new LockManager(getConfig()); + Map stringMap = Maps.newHashMap(); + stringMap.put("total", 0); + + CompletionService service = createCompletionService(); + NameIdentifier nameIdentifier = NameIdentifier.of("a", "b", "c", "d"); + + for (int t = 0; t < 200; t++) { + for (int i = 0; i < 10; i++) { + service.submit( + () -> { + TreeLock treeLock = lockManager.createTreeLock(nameIdentifier); + treeLock.lock(LockType.WRITE); + try { + stringMap.compute("total", (k, v) -> ++v); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + treeLock.unlock(); + } + return 0; + }); + } + + for (int i = 0; i < 10; 
i++) { + service.take(); + } + + int total = stringMap.get("total"); + Assertions.assertEquals(10, total, "Total should always 10"); + + stringMap.put("total", 0); + } + } + + private NameIdentifier completeRandomNameIdentifier() { + Random random = new Random(); + + int level = 4; + int nameLength = 16; + String[] names = new String[level]; + for (int i = 0; i < level; i++) { + String name = ""; + for (int j = 0; j < nameLength; j++) { + int v; + while ((v = random.nextInt(123)) < 97) {} + name += ((char) v); + } + + names[i] = name; + } + + return NameIdentifier.of(names); + } + + @Test + void testNodeCount() throws InterruptedException, ExecutionException { + LockManager lockManager = new LockManager(getConfig()); + CompletionService service = createCompletionService(); + + for (int i = 0; i < 10; i++) { + service.submit( + () -> { + TreeLock treeLock; + for (int j = 0; j < 1000; j++) { + NameIdentifier nameIdentifier = completeRandomNameIdentifier(); + treeLock = lockManager.createTreeLock(nameIdentifier); + treeLock.lock(LockType.READ); + treeLock.unlock(); + } + return 0; + }); + } + + for (int i = 0; i < 10; i++) { + service.take().get(); + } + + Assertions.assertEquals(10 * 1000 * 4 + 1, lockManager.totalNodeCount.get()); + + // Pay attention to the lockCleaner thread. 
+ lockManager + .treeLockRootNode + .getAllChildren() + .forEach(node -> lockManager.evictStaleNodes(node, lockManager.treeLockRootNode)); + Assertions.assertTrue(lockManager.totalNodeCount.get() > 1); + Assertions.assertTrue(lockManager.totalNodeCount.get() < lockManager.maxTreeNodeInMemory); + } + + private TreeLockNode getTreeNode(TreeLockNode root, int depth) { + if (depth == 0) { + return root; + } + + int i = 0; + TreeLockNode result = root; + while (i++ < depth) { + // We know it only has one child; + result = result.getAllChildren().get(0); + } + + return result; + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3}) + void testCreateTreeLock(int level) throws Exception { + LockManager lockManager = new LockManager(getConfig()); + TreeLockNode rootNode = lockManager.treeLockRootNode; + TreeLock treeLock = lockManager.createTreeLock(NameIdentifier.of("a", "b", "c", "d")); + treeLock.lock(LockType.READ); + treeLock.unlock(); + checkReferenceCount(rootNode); + + // Start to use concurrent threads to create tree lock. 
+ TreeLockNode treeLockNode = getTreeNode(rootNode, level); + TreeLockNode spyNode = Mockito.spy(treeLockNode); + Mockito.doThrow(new RuntimeException("Mock exception")) + .when(spyNode) + .getOrCreateChild(Mockito.any()); + + if (level == 0) { + lockManager.treeLockRootNode = spyNode; + } else { + int parentLevel = level - 1; + TreeLockNode parentNode = getTreeNode(rootNode, parentLevel); + parentNode.childMap.put(treeLockNode.getName(), spyNode); + } + + CompletionService service = createCompletionService(); + int concurrentThreadCount = 1; + for (int i = 0; i < concurrentThreadCount; i++) { + service.submit( + () -> { + for (int j = 0; j < 1000; j++) { + Assertions.assertThrows( + RuntimeException.class, + () -> lockManager.createTreeLock(NameIdentifier.of("a", "b", "c", "d"))); + } + return 0; + }); + } + + for (int i = 0; i < concurrentThreadCount; i++) { + service.take().get(); + } + + TreeLockNode lockNode = lockManager.treeLockRootNode; + checkReferenceCount(lockNode); + } + + @Test + void testConfigs() { + Config config = getConfig(); + Mockito.when(config.get(TREE_LOCK_MAX_NODE_IN_MEMORY)).thenReturn(20000L); + Mockito.when(config.get(TREE_LOCK_MIN_NODE_IN_MEMORY)).thenReturn(2000L); + Mockito.when(config.get(TREE_LOCK_CLEAN_INTERVAL)).thenReturn(2000L); + + LockManager manager = new LockManager(config); + Assertions.assertEquals(20000L, manager.maxTreeNodeInMemory); + Assertions.assertEquals(2000L, manager.minTreeNodeInMemory); + Assertions.assertEquals(2000L, manager.cleanTreeNodeIntervalInSecs); + } + + @Test + void testMaxTreeNode() { + Config config = getConfig(); + Mockito.when(config.get(TREE_LOCK_MAX_NODE_IN_MEMORY)).thenReturn(2000L); + Mockito.when(config.get(TREE_LOCK_MIN_NODE_IN_MEMORY)).thenReturn(100L); + + LockManager manager = new LockManager(config); + + Assertions.assertThrows( + IllegalStateException.class, + () -> { + for (int i = 0; i < 1000; i++) { + TreeLock lock = manager.createTreeLock(completeRandomNameIdentifier()); + } + }); + 
} + + @Test + void testReferenceCount() throws InterruptedException, ExecutionException { + LockManager lockManager = new LockManager(getConfig()); + CompletionService service = createCompletionService(); + int concurrentThreadCount = 10; + for (int i = 0; i < concurrentThreadCount; i++) { + service.submit( + () -> { + for (int j = 0; j < 1000; j++) { + TreeLock lock = lockManager.createTreeLock(NameIdentifier.of("a", "b", "c", "d")); + lock.lock(j % 2 == 0 ? LockType.READ : LockType.WRITE); + try { + // Deliberately throw an exception here. + int a = 1 / 0; + } catch (Exception e) { + // Ignore + } finally { + lock.unlock(); + } + } + return 0; + }); + } + + for (int i = 0; i < concurrentThreadCount; i++) { + service.take().get(); + } + + checkReferenceCount(lockManager.treeLockRootNode); + } + + @Test + void testDeadLockChecker() throws InterruptedException, ExecutionException { + LockManager lockManager = new LockManager(getConfig()); + CompletionService service = createCompletionService(); + int concurrentThreadCount = 9; + for (int i = 0; i < concurrentThreadCount; i++) { + service.submit( + () -> { + for (int j = 0; j < 1000; j++) { + TreeLock lock = lockManager.createTreeLock(completeRandomNameIdentifier()); + lock.lock(j % 2 == 0 ? 
LockType.READ : LockType.WRITE); + try { + Thread.sleep(1); + } catch (Exception e) { + // Ignore + } finally { + lock.unlock(); + } + } + return 0; + }); + } + + service.submit( + () -> { + for (int i = 0; i < 1000; i++) { + lockManager.checkDeadLock(lockManager.treeLockRootNode); + } + return 0; + }); + + for (int i = 0; i < concurrentThreadCount; i++) { + service.take().get(); + } + } +} diff --git a/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLock.java b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLock.java new file mode 100644 index 00000000000..235afd3e859 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/lock/TestTreeLock.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.lock; + +import static com.datastrato.gravitino.lock.TestLockManager.getConfig; + +import org.junit.jupiter.api.Test; + +class TestTreeLock { + + @Test + void testLock() { + LockManager lockManager = new LockManager(getConfig()); + + for (int i = 0; i < 1000; i++) { + TreeLock lock = lockManager.createTreeLock(TestLockManager.randomNameIdentifier()); + lock.lock(i % 2 == 0 ? LockType.READ : LockType.WRITE); + try { + // Business logic here. + } finally { + lock.unlock(); + } + } + } +}