Skip to content

Commit

Permalink
[#778] core(feat): Support Tree lock (#1264)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add the tree-like locks to enhance performance. 

### Why are the changes needed?

Add the tree-like locks to enhance performance. 

Fix: #778 

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

UT, `TestLockManager`
  • Loading branch information
yuqi1129 authored Feb 1, 2024
1 parent 8f9c26d commit 513d591
Show file tree
Hide file tree
Showing 9 changed files with 1,327 additions and 1 deletion.
1 change: 1 addition & 0 deletions api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ plugins {
dependencies {
implementation(libs.guava)
implementation(libs.slf4j.api)
implementation(libs.commons.lang3)

testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
Expand Down
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ project.extra["extraJvmArgs"] = if (extra["jdkVersion"] in listOf("8", "11")) {
"--add-opens", "java.base/sun.nio.cs=ALL-UNNAMED",
"--add-opens", "java.base/sun.security.action=ALL-UNNAMED",
"--add-opens", "java.base/sun.util.calendar=ALL-UNNAMED",
"--add-opens", "java.security.jgss/sun.security.krb5=ALL-UNNAMED"
"--add-opens", "java.security.jgss/sun.security.krb5=ALL-UNNAMED",
"--add-opens", "java.base/java.lang.reflect=ALL-UNNAMED"
)
}

Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public interface Configs {
String DEFAULT_KV_ROCKSDB_BACKEND_PATH =
String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb");

long MAX_NODE_IN_MEMORY = 100000L;

long MIN_NODE_IN_MEMORY = 1000L;

long CLEAN_INTERVAL_IN_SECS = 60L;

ConfigEntry<String> ENTITY_STORE =
new ConfigBuilder(ENTITY_STORE_KEY)
.doc("Which storage implementation to use")
Expand Down Expand Up @@ -88,4 +94,27 @@ public interface Configs {
.version("0.3.0")
.longConf()
.createWithDefault(DEFAULT_KV_DELETE_AFTER_TIME);

// The followings are configurations for tree lock

ConfigEntry<Long> TREE_LOCK_MAX_NODE_IN_MEMORY =
new ConfigBuilder("gravitino.lock.maxNodes")
.doc("The maximum number of tree lock nodes to keep in memory")
.version("0.4.0")
.longConf()
.createWithDefault(MAX_NODE_IN_MEMORY);

ConfigEntry<Long> TREE_LOCK_MIN_NODE_IN_MEMORY =
new ConfigBuilder("gravitino.lock.minNodes")
.doc("The minimum number of tree lock nodes to keep in memory")
.version("0.4.0")
.longConf()
.createWithDefault(MIN_NODE_IN_MEMORY);

ConfigEntry<Long> TREE_LOCK_CLEAN_INTERVAL =
new ConfigBuilder("gravitino.lock.cleanIntervalInSecs")
.doc("The interval in seconds to clean up the stale tree lock nodes")
.version("0.4.0")
.longConf()
.createWithDefault(CLEAN_INTERVAL_IN_SECS);
}
279 changes: 279 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/lock/LockManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.lock;

import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.NameIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* LockManager is a lock manager that manages the tree locks. It will serve as a factory to create
* the tree lock and do the cleanup for the stale tree lock nodes. For more, please refer to {@link
* TreeLock} and {@link TreeLockNode}.
*
* <p>It has two main functions: 1. Create the tree lock. 2. Clean up the stale tree lock nodes
* shared by all tree lock instances.
*/
public class LockManager {
private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);

static final NameIdentifier ROOT = NameIdentifier.of("/");

@VisibleForTesting TreeLockNode treeLockRootNode;
final AtomicLong totalNodeCount = new AtomicLong(1);

// The maximum number of tree lock nodes to keep in memory. If the total node count is greater
// than this value, we will do the cleanup.
long maxTreeNodeInMemory;
// If the total node count is less than this value, we will not do the cleanup.
@VisibleForTesting long minTreeNodeInMemory;

// The interval in seconds to clean up the stale tree lock nodes.
@VisibleForTesting long cleanTreeNodeIntervalInSecs;

private void initParameters(Config config) {
long maxNodesInMemory = config.get(TREE_LOCK_MAX_NODE_IN_MEMORY);
if (maxNodesInMemory <= 0) {
throw new IllegalArgumentException(
String.format(
"The maximum number of tree lock nodes '%d' should be greater than 0",
maxNodesInMemory));
}

long minNodesInMemory = config.get(TREE_LOCK_MIN_NODE_IN_MEMORY);
if (minNodesInMemory <= 0) {
throw new IllegalArgumentException(
String.format(
"The minimum number of tree lock nodes '%d' should be greater than 0",
minNodesInMemory));
}

if (maxNodesInMemory <= minNodesInMemory) {
throw new IllegalArgumentException(
String.format(
"The maximum number of tree lock nodes '%d' should be greater than the minimum number of tree lock nodes '%d'",
maxNodesInMemory, minNodesInMemory));
}
this.maxTreeNodeInMemory = maxNodesInMemory;
this.minTreeNodeInMemory = minNodesInMemory;

long cleanIntervalInSecs = config.get(TREE_LOCK_CLEAN_INTERVAL);
if (cleanIntervalInSecs <= 0) {
throw new IllegalArgumentException(
String.format(
"The interval in seconds to clean up the stale tree lock nodes '%d' should be greater than 0",
cleanIntervalInSecs));
}

this.cleanTreeNodeIntervalInSecs = cleanIntervalInSecs;
}

private void startDeadLockChecker() {
ScheduledThreadPoolExecutor deadLockChecker =
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("tree-lock-dead-lock-checker-%d")
.build());

deadLockChecker.scheduleAtFixedRate(
() -> {
LOG.info("Start to check the dead lock...");
checkDeadLock(treeLockRootNode);
LOG.info("Finish to check the dead lock...");
},
0,
60,
TimeUnit.SECONDS);
}

/**
* Check the deadlock for the given root node.
*
* @param node The root node to check.
*/
void checkDeadLock(TreeLockNode node) {
// Check child first
node.getAllChildren().forEach(this::checkDeadLock);

// Check self
node.getHoldingThreadTimestamp()
.forEach(
(thread, ts) -> {
// If the thread is holding the lock for more than 30 seconds, we will log it.
if (System.currentTimeMillis() - ts > 30000) {
LOG.warn(
"Dead lock detected for thread {} on node {}, threads that holding the node: {} ",
thread,
node,
node.getHoldingThreadTimestamp());
}
});
}

private void startNodeCleaner() {
ScheduledThreadPoolExecutor lockCleaner =
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("tree-lock-cleaner-%d")
.build());

lockCleaner.scheduleAtFixedRate(
() -> {
long nodeCount = totalNodeCount.get();
LOG.info("Total tree lock node count: {}", nodeCount);
// If the total node count is greater than the maxTreeNodeInMemory * 0.5, we will do the
// clear up in case of the memory explosion.
if (nodeCount > maxTreeNodeInMemory * 0.5) {
StopWatch watch = StopWatch.createStarted();
LOG.trace("Start to clean up the stale tree lock nodes...");
treeLockRootNode
.getAllChildren()
.forEach(child -> evictStaleNodes(child, treeLockRootNode));
LOG.info(
"Finish to clean up the stale tree lock nodes, cost: {}, after clean node count: {}",
watch.getTime(),
totalNodeCount.get());
}
},
cleanTreeNodeIntervalInSecs,
cleanTreeNodeIntervalInSecs,
TimeUnit.SECONDS);
}

public LockManager(Config config) {
treeLockRootNode = new TreeLockNode(ROOT.name());

// Init the parameters.
initParameters(config);

// Start tree lock cleaner.
startNodeCleaner();

// Start deadlock checker.
startDeadLockChecker();
}

/**
* Evict the stale nodes from the tree lock node.
*
* @param treeNode The tree lock node to evict.
* @param parent The parent of the tree lock node.
*/
@VisibleForTesting
void evictStaleNodes(TreeLockNode treeNode, TreeLockNode parent) {
// We will not evict the node tree if the total node count is less than the
// MIN_TREE_NODE_IN_MEMORY.
// Do not need to consider thread-safe issues.
if (totalNodeCount.get() < minTreeNodeInMemory) {
return;
}

// Handle from leaf nodes first.
treeNode.getAllChildren().forEach(child -> evictStaleNodes(child, treeNode));

// Handle self node.
if (treeNode.getReference() == 0) {
synchronized (parent) {
// Once goes here, the parent node has been locked, so the reference of child (treeNode)
// could not be changed.
if (treeNode.getReference() == 0) {
parent.removeChild(treeNode.getName());
long leftNodeCount = totalNodeCount.decrementAndGet();
LOG.trace(
"Evict stale tree lock node '{}', current left nodes '{}'",
treeNode.getName(),
leftNodeCount);
}
}
}
}

/**
* Create a tree lock with the given identifier.
*
* @param identifier The identifier of the tree lock.
* @return The created tree lock.
*/
public TreeLock createTreeLock(NameIdentifier identifier) {
checkTreeNodeIsFull();

List<TreeLockNode> treeLockNodes = Lists.newArrayList();
try {
TreeLockNode lockNode = treeLockRootNode;
lockNode.addReference();
treeLockNodes.add(lockNode);

if (identifier == ROOT) {
// The lock tree root node
return new TreeLock(treeLockNodes, identifier);
}

String[] levels = identifier.namespace().levels();
levels = ArrayUtils.add(levels, identifier.name());

TreeLockNode child;
for (String level : levels) {
synchronized (lockNode) {
Pair<TreeLockNode, Boolean> pair = lockNode.getOrCreateChild(level);
child = pair.getKey();
// If the child node is newly created, we should increase the total node counts.
if (pair.getValue()) {
totalNodeCount.incrementAndGet();
}
}
treeLockNodes.add(child);
lockNode = child;
}

return new TreeLock(treeLockNodes, identifier);
} catch (Exception e) {
LOG.error("Failed to create tree lock {}", identifier, e);
// Release reference if fails.
for (TreeLockNode node : treeLockNodes) {
node.decReference();
}

throw e;
}
}

/**
* Check if the total node count is greater than the maxTreeNodeInMemory, if so, we should throw
* an exception.
*/
private void checkTreeNodeIsFull() {
// If the total node count is greater than the max node counts, in case of memory
// leak and explosion, we should throw an exception.
long currentNodeCount = totalNodeCount.get();
if (currentNodeCount > maxTreeNodeInMemory) {
throw new IllegalStateException(
"The total node count '"
+ currentNodeCount
+ "' has reached the max node count '"
+ maxTreeNodeInMemory
+ "', please increase the max node count or wait for a while to avoid the performance issue.");
}
}
}
18 changes: 18 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/lock/LockType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.lock;

/**
* Type of the lock for tree lock. READ lock is a shared lock, while WRITE lock is an exclusive
* lock.
*
* <p>It's possible to acquire multiple READ locks at the same time, but only one WRITE lock can be
* acquired at a time. Please see {@link java.util.concurrent.locks.ReadWriteLock} for more details.
*/
public enum LockType {
READ,
WRITE
}
Loading

0 comments on commit 513d591

Please sign in to comment.