This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add a new behavior of the vacuum index (#479)
paryoja authored Nov 4, 2021
1 parent 661df17 commit 2f8d32b
Showing 15 changed files with 531 additions and 34 deletions.
27 changes: 17 additions & 10 deletions python/hyperspace/tests/test_indexmanagement.py
@@ -38,44 +38,50 @@ def test_index_delete(self):
    idx_config = IndexConfig('idx2', ['name'], ['age'])
    self.hyperspace.createIndex(self.df, idx_config)
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx2" and state = "ACTIVE" """).count(), 1)
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx2" and state = "DELETED" """).count(), 0)
    self.hyperspace.deleteIndex("idx2")
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx2" and state = "DELETED" """).count(), 1)
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx2" and state = "ACTIVE" """).count(), 0)

def test_index_restore(self):
    idx_config = IndexConfig('idx3', ['name'], ['age'])
    self.hyperspace.createIndex(self.df, idx_config)
    self.hyperspace.deleteIndex("idx3")
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx3" and state = "DELETED" """).count(), 1)
    self.hyperspace.restoreIndex("idx3")
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx3" and state = "ACTIVE" """).count(), 1)
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx3" and state = "DELETED" """).count(), 0)

def test_index_vacuum(self):
    idx_config = IndexConfig('idx4', ['name'], ['age'])
    self.hyperspace.createIndex(self.df, idx_config)
    self.hyperspace.deleteIndex("idx4")
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx4" and state = "DELETED" """).count(), 1)
    self.hyperspace.vacuumIndex("idx4")
    self.assertEqual(self.hyperspace.indexes().filter("""name = "idx4" """).count(), 0)

    # vacuuming an active index leaves the index active
    idx_config = IndexConfig('idx5', ['name'], ['age'])
    self.hyperspace.createIndex(self.df, idx_config)
    self.hyperspace.vacuumIndex("idx5")
    self.assertEqual(self.hyperspace.indexes().filter(
        """name = "idx5" and state = "ACTIVE" """).count(), 1)

def test_index_refresh_incremental(self):
    idx_config = IndexConfig('idx1', ['name'], ['age'])
    self.hyperspace.createIndex(self.df, idx_config)
    # Test that the inter-op works fine for refreshIndex.
    self.hyperspace.refreshIndex('idx1')
    self.hyperspace.refreshIndex('idx1', 'incremental')

def test_index_refresh_full(self):
    idx_config = IndexConfig('idx1', ['name'], ['age'])
    self.hyperspace.createIndex(self.df, idx_config)
    # Test that the inter-op works fine for optimizeIndex.
@@ -89,6 +95,7 @@ def test_index_metadata(self):
    df = self.hyperspace.index('idx1')
    df.show()


hyperspace_test = unittest.TestLoader().loadTestsFromTestCase(HyperspaceIndexManagementTests)
result = unittest.TextTestRunner(verbosity=3).run(hyperspace_test)
sys.exit(not result.wasSuccessful())
5 changes: 3 additions & 2 deletions src/main/scala/com/microsoft/hyperspace/Hyperspace.scala
@@ -65,9 +65,10 @@ class Hyperspace(spark: SparkSession) {
}

/**
 * Does a hard delete of the entire index if it is marked as `DELETED`.
 * Cleans up the index (hard delete of outdated index files) if the index is `ACTIVE`.
 *
 * @param indexName Name of the index to vacuum.
 */
def vacuumIndex(indexName: String): Unit = {
indexManager.vacuum(indexName)
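For context, a minimal usage sketch of the two vacuum behaviors described above (the index names here are illustrative):

    val hs = new Hyperspace(spark)

    // A DELETED index is hard-deleted in its entirety and can no longer be restored.
    hs.deleteIndex("deletedIdx")
    hs.vacuumIndex("deletedIdx")

    // An ACTIVE index is only cleaned up: outdated index files are hard-deleted
    // and the index remains ACTIVE.
    hs.vacuumIndex("activeIdx")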
@@ -24,6 +24,7 @@ object Constants {
val DELETED = "DELETED"
val REFRESHING = "REFRESHING"
val VACUUMING = "VACUUMING"
val VACUUMINGOUTDATED = "VACUUMINGOUTDATED"
val RESTORING = "RESTORING"
val OPTIMIZING = "OPTIMIZING"
val DOESNOTEXIST = "DOESNOTEXIST"
@@ -0,0 +1,144 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.actions

import org.apache.hadoop.fs.Path

import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, VACUUMINGOUTDATED}
import com.microsoft.hyperspace.index.{IndexConstants, IndexDataManager, IndexLogEntry, IndexLogManager, LogEntry}
import com.microsoft.hyperspace.index.sources.delta.DeltaLakeRelationMetadata
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, VacuumOutdatedActionEvent}
import com.microsoft.hyperspace.util.FileUtils

/**
 * Vacuums outdated data of an index.
 *
 * Algorithm:
 *  - Delete every index data version except those referenced by the latest log entry.
 */
class VacuumOutdatedAction(
final override protected val logManager: IndexLogManager,
dataManager: IndexDataManager)
extends Action {
private lazy val previousIndexLogEntry = {
logManager.getLog(baseId) match {
case Some(e: IndexLogEntry) => e
case _ =>
throw HyperspaceException("LogEntry must exist for vacuum outdated operation.")
}
}

final override lazy val logEntry: LogEntry = {
previousIndexLogEntry.relations match {
case null => previousIndexLogEntry

case relations if relations.nonEmpty =>
val relationMetadata = Hyperspace
.getContext(spark)
.sourceProviderManager
.getRelationMetadata(relations.head)

val updatedDerivedDataset = relationMetadata match {
case deltaLakeRelationMetadata: DeltaLakeRelationMetadata =>
// Reset Delta Lake version mapping.
val resetProperty = deltaLakeRelationMetadata.resetDeltaVersionHistory(
previousIndexLogEntry.derivedDataset.properties)

val newProperty = deltaLakeRelationMetadata.enrichIndexProperties(
resetProperty + (IndexConstants.INDEX_LOG_VERSION -> endId.toString))

previousIndexLogEntry.derivedDataset.withNewProperties(newProperty)
case _ => previousIndexLogEntry.derivedDataset
}
previousIndexLogEntry.copy(derivedDataset = updatedDerivedDataset)

case _ => previousIndexLogEntry
}
}

override def transientState: String = VACUUMINGOUTDATED

override def finalState: String = ACTIVE

override def validate(): Unit = {
if (!previousIndexLogEntry.state.equalsIgnoreCase(ACTIVE)) {
throw HyperspaceException(
s"VacuumOutdated is only supported in $ACTIVE state. " +
s"Current state is ${previousIndexLogEntry.state}.")
}
}

final override def op(): Unit = {
// Delete unused directories first, then delete unused files in used directories.
val indexVersionsInUse: Set[Int] = logEntry match {
case indexLogEntry: IndexLogEntry =>
dataVersionInfos(indexLogEntry)

case other =>
throw HyperspaceException(
s"VacuumOutdated is not supported for log entry class ${other.getClass.getName}")
}

// Delete version directories not used.
dataManager.getAllVersionIds().foreach { id =>
if (!indexVersionsInUse.contains(id)) {
dataManager.delete(id)
}
}

val filesInUse = logEntry match {
case indexLogEntry: IndexLogEntry =>
indexLogEntry.content.fileInfos.map { info =>
info.name
}
}

// Delete unused files.
dataManager.getAllFilePaths().foreach { path =>
// Ignore files such as "_SUCCESS" and "._SUCCESS.crc".
if (!path.getName.startsWith("_") &&
!path.getName.startsWith("._") &&
!filesInUse.contains(path.toString)) {
FileUtils.delete(path)
}
}
}

/**
 * Extracts the index data versions that are in use by the given log entry.
 *
 * @return Set of version ids referenced by the entry.
 */
private[actions] def dataVersionInfos(entry: IndexLogEntry): Set[Int] = {
// Get used versions using the filenames of contents.
// length + 1 due to '=' between prefix and version number.
val prefixLength = IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX.length + 1
entry
.indexDataDirectoryPaths()
.map(dirname => new Path(dirname).getName)
.collect {
case name if name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) =>
name.drop(prefixLength).toInt
}
.toSet
}

override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
VacuumOutdatedActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
}
}
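The version-number parsing in dataVersionInfos can be exercised in isolation. A minimal, self-contained sketch, assuming "v__" as the value of IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX (the prefix shown in the naming convention documented in IndexDataManagerImpl below):

    val versionPrefix = "v__" // assumed value of INDEX_VERSION_DIRECTORY_PREFIX
    val prefixLength = versionPrefix.length + 1 // + 1 for the '=' separator

    val dirNames = Seq("v__=0", "v__=3", "_SUCCESS")
    val versionsInUse = dirNames.collect {
      case name if name.startsWith(versionPrefix) => name.drop(prefixLength).toInt
    }.toSet
    // versionsInUse == Set(0, 3); "_SUCCESS" does not match the prefix and is ignored.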
@@ -22,7 +22,7 @@ import org.apache.spark.sql.internal.SQLConf

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions._
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, DOESNOTEXIST}
import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}

class IndexCollectionManager(
@@ -60,13 +60,23 @@ class IndexCollectionManager(
}

override def vacuum(indexName: String): Unit = {
  // Note that vacuum behaves differently when the index state is ACTIVE,
  // and the event created by the action differs accordingly.
  withLogManager(indexName) { logManager =>
    val hadoopConf = spark.sessionState.newHadoopConf()
    val indexPath = PathResolver(spark.sessionState.conf, hadoopConf)
      .getIndexPath(indexName)
    val dataManager =
      indexDataManagerFactory.create(indexPath, hadoopConf)

    logManager.getLatestLog() match {
      case Some(index) if index.state == ACTIVE =>
        // Clean up only if the state is ACTIVE.
        new VacuumOutdatedAction(logManager, dataManager).run()
      case _ =>
        new VacuumAction(logManager, dataManager).run()
    }
  }
}

@@ -36,8 +36,12 @@ import com.microsoft.hyperspace.util.FileUtils
* f1.parquet
*/
trait IndexDataManager {
def getAllFilePaths(): Seq[Path]

def getLatestVersionId(): Option[Int]

def getAllVersionIds(): Seq[Int]

def getPath(id: Int): Path

def delete(id: Int): Unit
@@ -48,24 +52,54 @@ class IndexDataManagerImpl(indexPath: Path, configuration: Configuration)
// TODO: Investigate whether FileContext should be used instead of FileSystem for atomic renames.
private lazy val fs: FileSystem = indexPath.getFileSystem(configuration)

/**
 * Get the latest version id of the index data directory.
 */
override def getLatestVersionId(): Option[Int] = {
  val ids = getAllVersionIds()
  if (ids.isEmpty) None else Some(ids.max)
}

/**
 * This method relies on the naming convention that directory names follow the hive
 * partitioning scheme, i.e. {{{"root/v__=value/f1.parquet"}}} etc. Here the value represents
 * the version id of the data.
 */
override def getAllVersionIds(): Seq[Int] = {
  if (!fs.exists(indexPath)) {
    return Nil
  }
  val prefixLength = IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX.length + 1
  fs.listStatus(indexPath)
    .collect {
      case status
          if status.getPath.getName.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) =>
        status.getPath.getName.drop(prefixLength).toInt
    }
}

/**
 * Get all file paths in the index directory.
 */
override def getAllFilePaths(): Seq[Path] = {
  if (!fs.exists(indexPath)) {
    return Nil
  }
  val directories = fs.listStatus(indexPath).collect {
    case status
        if status.getPath.getName.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) =>
      status.getPath
  }
  directories.flatMap { dir =>
    fs.listStatus(dir).collect {
      case status => status.getPath
    }
  }
}

/**
* Get directory path of the given id.
*/
override def getPath(id: Int): Path = {
new Path(indexPath, s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=${id.toString}")
}
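To make the naming convention concrete, here is a hypothetical index directory and what the accessors above would return for it:

    indexRoot/
      _SUCCESS
      v__=0/f1.parquet
      v__=1/f2.parquet

    getAllVersionIds()   // Seq(0, 1)
    getLatestVersionId() // Some(1)
    getAllFilePaths()    // Seq(indexRoot/v__=0/f1.parquet, indexRoot/v__=1/f2.parquet)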
18 changes: 18 additions & 0 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -513,6 +513,24 @@ case class IndexLogEntry(
(name, derivedDataset, signature, content).hashCode
}

/**
 * Extracts paths to the top-level directories that contain
 * index files for the latest index version.
 *
 * @return List of directory paths containing index files for the latest index version.
 */
def indexDataDirectoryPaths(): Seq[String] = {
var prefix = content.root.name
var directory = content.root
while (directory.subDirs.size == 1 &&
!directory.subDirs.head.name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX)) {
prefix += s"${directory.subDirs.head.name}/"
directory = directory.subDirs.head
}

directory.subDirs.map(d => s"$prefix${d.name}")
}

/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/
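As a standalone illustration of the directory walk in indexDataDirectoryPaths, here is a minimal sketch with a hypothetical node type standing in for the real content tree (names and paths are illustrative):

    case class Dir(name: String, subDirs: Seq[Dir])

    def dataDirectoryPaths(root: Dir, versionPrefix: String = "v__"): Seq[String] = {
      var prefix = root.name
      var directory = root
      // Descend through single-child levels until the version directories appear.
      while (directory.subDirs.size == 1 &&
        !directory.subDirs.head.name.startsWith(versionPrefix)) {
        prefix += s"${directory.subDirs.head.name}/"
        directory = directory.subDirs.head
      }
      directory.subDirs.map(d => s"$prefix${d.name}")
    }

    val tree = Dir("file:/", Seq(Dir("tmp", Seq(Dir("myIndex",
      Seq(Dir("v__=0", Nil), Dir("v__=1", Nil)))))))
    // dataDirectoryPaths(tree) == Seq("file:/tmp/myIndex/v__=0", "file:/tmp/myIndex/v__=1")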
@@ -54,7 +54,9 @@ trait IndexManager {
def restore(indexName: String): Unit

/**
 * If the index is marked as `DELETED`, does a hard delete of the entire index.
 * If it is `ACTIVE`, cleans up the index (hard delete of unused index files).
 * Once vacuumed, hard-deleted index files can't be restored.
 *
 * @param indexName Name of the index to vacuum.
 */
@@ -139,15 +139,7 @@ private[hyperspace] object IndexStatistics {
* @return List of directory paths containing index files for latest index version.
*/
private def getIndexContentDirectoryPaths(entry: IndexLogEntry): Seq[String] = {
  entry.indexDataDirectoryPaths()
}

