Skip to content

Commit

Permalink
[SPARK-48742][SS] Virtual Column Family for RocksDB
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Introducing virtual column families to RocksDB. We attach a 2-byte Id prefix as the column family identifier to each key row that is put into RocksDB. The encoding and decoding of the virtual column family prefix happens at the `RocksDBKeyEncoder` layer, where we can pre-allocate the extra 2 bytes and avoid an additional memcpy.

- Remove physical column family related code, as it becomes potentially dead code until some caller starts using it.
- Remove `useColumnFamilies` from `StateStoreChangelogV2` API.

### Why are the changes needed?

Currently, within the scope of the arbitrary stateful API v2 (transformWithState) project, each state variable is stored inside one [physical column family](https://github.com/facebook/rocksdb/wiki/Column-Families) within the RocksDB state store instance. Column families are also used to implement secondary indexes for various features. Each physical column family has its own memtables, creates its own SST files, and handles compaction independently on those independent SST files.

When the number of operations to RocksDB is relatively small and the number of column families is relatively large, the overhead of handling small SST files becomes high, especially since all of these have to be uploaded to the snapshot dir and referenced in the metadata file for the uploaded RocksDB snapshot. Using a key prefix to manage different key spaces (virtual column families) could reduce such overheads.

### Does this PR introduce _any_ user-facing change?

No. If `useColumnFamilies` is set to true in `StateStore.init()`, virtual column families will be used.

### How was this patch tested?

Unit tests in `RocksDBStateStoreSuite`, and integration tests in `TransformWithStateSuite`.
Moved some test suites from `RocksDBSuite` into `RocksDBStateStoreSuite`, because the verification functions they previously relied on have been moved into `RocksDBStateProvider`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47107 from jingz-db/virtual-col-family.

Lead-authored-by: jingz-db <[email protected]>
Co-authored-by: Jing Zhan <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
2 people authored and ericm-db committed Jul 10, 2024
1 parent dca4402 commit 0493594
Show file tree
Hide file tree
Showing 8 changed files with 724 additions and 738 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

import scala.collection.{mutable, Map}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.ref.WeakReference
import scala.util.Try

Expand Down Expand Up @@ -64,7 +63,6 @@ case object StoreMaintenance extends RocksDBOpType("store_maintenance")
* @param localRootDir Root directory in local disk that is used to working and checkpointing dirs
* @param hadoopConf Hadoop configuration for talking to the remote file system
* @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
* @param useColumnFamilies Used to determine whether a single or multiple column families are used
*/
class RocksDB(
dfsRootDir: String,
Expand Down Expand Up @@ -141,11 +139,6 @@ class RocksDB(
dbOptions.setWriteBufferManager(writeBufferManager)
}

// Maintain mapping of column family name to handle
@GuardedBy("acquireLock")
private val colFamilyNameToHandleMap =
scala.collection.mutable.Map[String, ColumnFamilyHandle]()

private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
dbOptions.setStatistics(new Statistics())
private val nativeStats = dbOptions.statistics()
Expand Down Expand Up @@ -318,20 +311,16 @@ class RocksDB(
var changelogReader: StateStoreChangelogReader = null
try {
changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
changelogReader.foreach { case (recordType, key, value, colFamilyName) =>
if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
createColFamilyIfAbsent(colFamilyName, checkInternalColumnFamilies(colFamilyName))
}

changelogReader.foreach { case (recordType, key, value) =>
recordType match {
case RecordType.PUT_RECORD =>
put(key, value, colFamilyName)
put(key, value)

case RecordType.DELETE_RECORD =>
remove(key, colFamilyName)
remove(key)

case RecordType.MERGE_RECORD =>
merge(key, value, colFamilyName)
merge(key, value)
}
}
} finally {
Expand All @@ -341,145 +330,28 @@ class RocksDB(
loadedVersion = endVersion
}

/**
* Function to check if the column family exists in the state store instance.
* @param colFamilyName - name of the column family
* @return - true if the column family exists, false otherwise
*/
private def checkColFamilyExists(colFamilyName: String): Boolean = {
colFamilyNameToHandleMap.contains(colFamilyName)
}

private val multColFamiliesDisabledStr = "multiple column families disabled in " +
"RocksDBStateStoreProvider"

/**
* Function to verify invariants for column family based operations such as get, put, remove etc.
* @param operationName - name of the store operation
* @param colFamilyName - name of the column family
*/
private def verifyColFamilyOperations(
operationName: String,
colFamilyName: String): Unit = {
if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
// if the state store instance does not support multiple column families, throw an exception
if (!useColumnFamilies) {
throw StateStoreErrors.unsupportedOperationException(operationName,
multColFamiliesDisabledStr)
}

// if the column family name is empty or contains leading/trailing whitespaces, throw an
// exception
if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) {
throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
}

// if the column family does not exist, throw an exception
if (!checkColFamilyExists(colFamilyName)) {
throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
colFamilyName)
}
}
}

/**
* Function to verify invariants for column family creation or deletion operations.
* @param operationName - name of the store operation
* @param colFamilyName - name of the column family
*/
private def verifyColFamilyCreationOrDeletion(
operationName: String,
colFamilyName: String,
isInternal: Boolean = false): Unit = {
// if the state store instance does not support multiple column families, throw an exception
if (!useColumnFamilies) {
throw StateStoreErrors.unsupportedOperationException(operationName,
multColFamiliesDisabledStr)
}

// if the column family name is empty or contains leading/trailing whitespaces
// or using the reserved "default" column family, throw an exception
if (colFamilyName.isEmpty
|| colFamilyName.trim != colFamilyName
|| colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName)
}

// if the column family is not internal and uses reserved characters, throw an exception
if (!isInternal && colFamilyName.charAt(0) == '_') {
throw StateStoreErrors.cannotCreateColumnFamilyWithReservedChars(colFamilyName)
}
}

/**
* Check whether the column family name is for internal column families.
* @param cfName - column family name
* @return - true if the column family is for internal use, false otherwise
*/
private def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'

/**
* Create RocksDB column family, if not created already
*/
def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Unit = {
verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal)
if (!checkColFamilyExists(colFamilyName)) {
assert(db != null)
val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, columnFamilyOptions)
val handle = db.createColumnFamily(descriptor)
colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
}
}

/**
* Remove RocksDB column family, if exists
*/
def removeColFamilyIfExists(colFamilyName: String): Boolean = {
verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName)
if (checkColFamilyExists(colFamilyName)) {
assert(db != null)
val handle = colFamilyNameToHandleMap(colFamilyName)
db.dropColumnFamily(handle)
colFamilyNameToHandleMap.remove(colFamilyName)
true
} else {
false
}
}

/**
* Get the value for the given key if present, or null.
* @note This will return the last written value even if it was uncommitted.
*/
def get(
key: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] = {
verifyColFamilyOperations("get", colFamilyName)
db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
def get(key: Array[Byte]): Array[Byte] = {
db.get(readOptions, key)
}

/**
* Put the given value for the given key.
* @note This update is not committed to disk until commit() is called.
*/
def put(
key: Array[Byte],
value: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
verifyColFamilyOperations("put", colFamilyName)
def put(key: Array[Byte], value: Array[Byte]): Unit = {
if (conf.trackTotalNumberOfRows) {
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
val oldValue = db.get(readOptions, key)
if (oldValue == null) {
numKeysOnWritingVersion += 1
}
}

db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
if (useColumnFamilies) {
changelogWriter.foreach(_.put(key, value, colFamilyName))
} else {
changelogWriter.foreach(_.put(key, value))
}
db.put(writeOptions, key, value)
changelogWriter.foreach(_.put(key, value))
}

/**
Expand All @@ -493,57 +365,40 @@ class RocksDB(
*
* @note This update is not committed to disk until commit() is called.
*/
def merge(
key: Array[Byte],
value: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
if (!useColumnFamilies) {
throw StateStoreErrors.unsupportedOperationException("merge",
multColFamiliesDisabledStr)
}
verifyColFamilyOperations("merge", colFamilyName)
def merge(key: Array[Byte], value: Array[Byte]): Unit = {

if (conf.trackTotalNumberOfRows) {
val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
val oldValue = db.get(readOptions, key)
if (oldValue == null) {
numKeysOnWritingVersion += 1
}
}
db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
db.merge(writeOptions, key, value)

changelogWriter.foreach(_.merge(key, value, colFamilyName))
changelogWriter.foreach(_.merge(key, value))
}

/**
* Remove the key if present.
* @note This update is not committed to disk until commit() is called.
*/
def remove(
key: Array[Byte],
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
verifyColFamilyOperations("remove", colFamilyName)
def remove(key: Array[Byte]): Unit = {
if (conf.trackTotalNumberOfRows) {
val value = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
val value = db.get(readOptions, key)
if (value != null) {
numKeysOnWritingVersion -= 1
}
}
db.delete(colFamilyNameToHandleMap(colFamilyName), writeOptions, key)
if (useColumnFamilies) {
changelogWriter.foreach(_.delete(key, colFamilyName))
} else {
changelogWriter.foreach(_.delete(key))
}
db.delete(writeOptions, key)
changelogWriter.foreach(_.delete(key))
}

/**
* Get an iterator of all committed and uncommitted key-value pairs.
*/
def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
Iterator[ByteArrayPair] = {
verifyColFamilyOperations("iterator", colFamilyName)
def iterator(): Iterator[ByteArrayPair] = {

val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
val iter = db.newIterator()
logInfo(log"Getting iterator from version ${MDC(LogKeys.LOADED_VERSION, loadedVersion)}")
iter.seekToFirst()

Expand All @@ -569,9 +424,8 @@ class RocksDB(
}
}

private def countKeys(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Long = {
verifyColFamilyOperations("countKeys", colFamilyName)
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
private def countKeys(): Long = {
val iter = db.newIterator()

try {
logInfo(log"Counting keys - getting iterator from version " +
Expand All @@ -591,10 +445,8 @@ class RocksDB(
}
}

def prefixScan(prefix: Array[Byte], colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
Iterator[ByteArrayPair] = {
verifyColFamilyOperations("prefixScan", colFamilyName)
val iter = db.newIterator(colFamilyNameToHandleMap(colFamilyName))
def prefixScan(prefix: Array[Byte]): Iterator[ByteArrayPair] = {
val iter = db.newIterator()
iter.seek(prefix)

// Attempt to close this iterator if there is a task failure, or a task interruption.
Expand Down Expand Up @@ -639,17 +491,13 @@ class RocksDB(
// because rocksdb wal is disabled.
logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}")
flushTimeMs = timeTakenMs {
// Flush updates to all available column families
assert(!colFamilyNameToHandleMap.isEmpty)
db.flush(flushOptions, colFamilyNameToHandleMap.values.toSeq.asJava)
db.flush(flushOptions)
}

if (conf.compactOnCommit) {
logInfo("Compacting")
compactTimeMs = timeTakenMs {
// Perform compaction on all available column families
assert(!colFamilyNameToHandleMap.isEmpty)
colFamilyNameToHandleMap.values.foreach(db.compactRange(_))
db.compactRange()
}
}

Expand Down Expand Up @@ -860,11 +708,6 @@ class RocksDB(
nativeStats.getTickerCount(typ)
}

// Used for metrics reporting around internal/external column families
val numInternalColFamilies = colFamilyNameToHandleMap
.keys.filter(checkInternalColumnFamilies(_)).size
val numExternalColFamilies = colFamilyNameToHandleMap.keys.size - numInternalColFamilies

// if bounded memory usage is enabled, we share the block cache across all state providers
// running on the same node and account the usage to this single cache. In this case, its not
// possible to provide partition level or query level memory usage.
Expand All @@ -886,8 +729,6 @@ class RocksDB(
filesCopied = fileManagerMetrics.filesCopied,
filesReused = fileManagerMetrics.filesReused,
zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
numExternalColFamilies = numExternalColFamilies,
numInternalColFamilies = numInternalColFamilies,
nativeOpsMetrics = nativeOpsMetrics)
}

Expand Down Expand Up @@ -959,47 +800,16 @@ class RocksDB(
acquireLock.notifyAll()
}

private def getDBProperty(property: String): Long = {
// get cumulative sum across all available column families
assert(!colFamilyNameToHandleMap.isEmpty)
colFamilyNameToHandleMap
.values
.map(handle => db.getProperty(handle, property).toLong)
.sum
}
private def getDBProperty(property: String): Long = db.getProperty(property).toLong

private def openDB(): Unit = {
assert(db == null)
val colFamilies = NativeRocksDB.listColumnFamilies(dbOptions, workingDir.toString)

val colFamilyDescriptors = new ArrayBuffer[ColumnFamilyDescriptor]
// populate the list of available col family descriptors
colFamilies.asScala.toList.foreach { family =>
val descriptor = new ColumnFamilyDescriptor(family, columnFamilyOptions)
colFamilyDescriptors += descriptor
}

if (colFamilyDescriptors.isEmpty) {
colFamilyDescriptors += new ColumnFamilyDescriptor(NativeRocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions)
}

val colFamilyHandles = new java.util.ArrayList[ColumnFamilyHandle]()
db = NativeRocksDB.open(new DBOptions(dbOptions), workingDir.toString,
colFamilyDescriptors.asJava, colFamilyHandles)

// Store the mapping of names to handles in the internal map
colFamilyHandles.asScala.toList.foreach { handle =>
colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
}
db = NativeRocksDB.open(dbOptions, workingDir.toString)
logInfo(log"Opened DB with conf ${MDC(LogKeys.CONFIG, conf)}")
}

private def closeDB(): Unit = {
if (db != null) {
// Close the column family handles in case multiple column families are used
colFamilyNameToHandleMap.values.map(handle => handle.close)
colFamilyNameToHandleMap.clear()

// Cancel and wait until all background work finishes
db.cancelAllBackgroundWork(true)
Expand Down Expand Up @@ -1298,8 +1108,6 @@ case class RocksDBMetrics(
bytesCopied: Long,
filesReused: Long,
zipFileBytesUncompressed: Option[Long],
numExternalColFamilies: Long,
numInternalColFamilies: Long,
nativeOpsMetrics: Map[String, Long]) {
def json: String = Serialization.write(this)(RocksDBMetrics.format)
}
Expand Down
Loading

0 comments on commit 0493594

Please sign in to comment.