Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Reduce deprecate Path class usage #3824

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ static void verifyDeltaVersions(

List<Long> commitVersions =
commitFiles.stream()
.map(fs -> FileNames.deltaVersion(new Path(fs.getPath())))
.map(fs -> FileNames.deltaVersion(fs.getPath()))
.collect(Collectors.toList());

for (int i = 1; i < commitVersions.size(); i++) {
Expand Down Expand Up @@ -280,7 +280,7 @@ private static List<FileStatus> listCommitFiles(
logger.debug("Ignoring non-commit file {}", fs.getPath());
continue;
}
if (FileNames.getFileVersion(new Path(fs.getPath())) > endVersion) {
if (FileNames.getFileVersion(fs.getPath()) > endVersion) {
logger.debug(
"Stopping listing found file {} with version > {}=endVersion",
fs.getPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public long getTimestamp(Engine engine) {
if (IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
if (!inCommitTimestampOpt.isPresent()) {
Optional<CommitInfo> commitInfoOpt =
CommitInfo.getCommitInfoOpt(engine, logPath, logSegment.version);
CommitInfo.getCommitInfoOpt(engine, logPath.toString(), logSegment.version);
inCommitTimestampOpt =
Optional.of(
CommitInfo.getRequiredInCommitTimestamp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public Transaction build(Engine engine) {

return new TransactionImpl(
isNewTable,
table.getDataPath(),
table.getLogPath(),
table.getDataPath().toString(),
table.getLogPath().toString(),
snapshot,
engineInfo,
operation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.util.Clock;
Expand Down Expand Up @@ -66,8 +65,8 @@ public class TransactionImpl implements Transaction {
private final boolean isNewTable; // the transaction is creating a new table
private final String engineInfo;
private final Operation operation;
private final Path dataPath;
private final Path logPath;
private final String dataPath;
private final String logPath;
private final Protocol protocol;
private final SnapshotImpl readSnapshot;
private final Optional<SetTransaction> setTxnOpt;
Expand All @@ -80,8 +79,8 @@ public class TransactionImpl implements Transaction {

public TransactionImpl(
boolean isNewTable,
Path dataPath,
Path logPath,
String dataPath,
String logPath,
SnapshotImpl readSnapshot,
String engineInfo,
Operation operation,
Expand All @@ -107,7 +106,7 @@ public TransactionImpl(

@Override
public Row getTransactionState(Engine engine) {
return TransactionStateRow.of(metadata, dataPath.toString());
return TransactionStateRow.of(metadata, dataPath);
}

@Override
Expand Down Expand Up @@ -230,7 +229,7 @@ private TransactionCommitResult doCommit(
if (commitAsVersion == 0) {
// New table, create a delta log directory
if (!wrapEngineExceptionThrowsIO(
() -> engine.getFileSystemClient().mkdirs(logPath.toString()),
() -> engine.getFileSystemClient().mkdirs(logPath),
"Creating directories for path %s",
logPath)) {
throw new RuntimeException("Failed to create delta log directory: " + logPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public static long getRequiredInCommitTimestamp(
}

/** Get the persisted commit info (if available) for the given delta file. */
public static Optional<CommitInfo> getCommitInfoOpt(Engine engine, Path logPath, long version) {
public static Optional<CommitInfo> getCommitInfoOpt(Engine engine, String logPath, long version) {
final FileStatus file =
FileStatus.of(
FileNames.deltaFile(logPath, version), /* path */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public ColumnarBatch extractSidecarsFromBatch(
}
FileStatus sideCarFileStatus =
FileStatus.of(
FileNames.sidecarFile(deltaLogPath, sidecarFile.getPath()),
FileNames.sidecarFile(deltaLogPath.toString(), sidecarFile.getPath()),
sidecarFile.getSizeInBytes(),
sidecarFile.getModificationTime());

Expand All @@ -297,7 +297,7 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
switch (nextLogFile.getLogType()) {
case COMMIT:
{
final long fileVersion = FileNames.deltaVersion(nextFilePath);
final long fileVersion = FileNames.deltaVersion(nextFilePath.toString());
// We can not read multiple JSON files in parallel (like the checkpoint files),
// because each one has a different version, and we need to associate the
// version with actions read from the JSON file for further optimizations later
Expand Down Expand Up @@ -328,7 +328,7 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
// parts of the current multipart checkpoint.
CloseableIterator<ColumnarBatch> dataIter =
getActionsIterFromSinglePartOrV2Checkpoint(nextFile, fileName);
long version = checkpointVersion(nextFilePath);
long version = checkpointVersion(nextFilePath.toString());
return combine(dataIter, true /* isFromCheckpoint */, version, Optional.empty());
}
case MULTIPART_CHECKPOINT:
Expand All @@ -350,7 +350,7 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
readSchema,
checkpointPredicate);

long version = checkpointVersion(nextFilePath);
long version = checkpointVersion(nextFilePath.toString());
return combine(dataIter, true /* isFromCheckpoint */, version, Optional.empty());
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void handleTxn(ColumnVector txnVector) {

private List<FileStatus> getWinningCommitFiles(Engine engine) {
String firstWinningCommitFile =
deltaFile(snapshot.getLogPath(), snapshot.getVersion(engine) + 1);
deltaFile(snapshot.getLogPath().toString(), snapshot.getVersion(engine) + 1);

try (CloseableIterator<FileStatus> files =
wrapEngineExceptionThrowsIO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
// Take files until the version we want to load
final boolean versionWithinRange =
versionToLoad
.map(v -> FileNames.getFileVersion(new Path(fileStatus.getPath())) <= v)
.map(v -> FileNames.getFileVersion(fileStatus.getPath()) <= v)
.orElse(true);

if (!versionWithinRange) {
Expand Down Expand Up @@ -737,7 +737,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
versionToLoadOpt.orElseGet(
() -> {
final FileStatus lastDelta = deltas.get(deltas.size() - 1);
return FileNames.deltaVersion(new Path(lastDelta.getPath()));
return FileNames.deltaVersion(lastDelta.getPath());
});

return getLogSegmentWithMaxExclusiveCheckpointVersion(
Expand All @@ -762,8 +762,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
final List<FileStatus> deltasAfterCheckpoint =
deltas.stream()
.filter(
fileStatus ->
FileNames.deltaVersion(new Path(fileStatus.getPath())) > newCheckpointVersion)
fileStatus -> FileNames.deltaVersion(fileStatus.getPath()) > newCheckpointVersion)
.collect(Collectors.toList());

logDebug(
Expand All @@ -777,7 +776,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(

final LinkedList<Long> deltaVersionsAfterCheckpoint =
deltasAfterCheckpoint.stream()
.map(fileStatus -> FileNames.deltaVersion(new Path(fileStatus.getPath())))
.map(fileStatus -> FileNames.deltaVersion(fileStatus.getPath()))
.collect(Collectors.toCollection(LinkedList::new));

logDebug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.delta.kernel.internal.util;

import static io.delta.kernel.internal.fs.Path.getName;

import io.delta.kernel.internal.fs.Path;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -45,34 +47,26 @@ private FileNames() {}
public static final String SIDECAR_DIRECTORY = "_sidecars";

/** Returns the delta (json format) path for a given delta file. */
public static String deltaFile(Path path, long version) {
public static String deltaFile(String path, long version) {
return String.format("%s/%020d.json", path, version);
}

/** Returns the version for the given delta path. */
public static long deltaVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

public static long deltaVersion(String path) {
final int slashIdx = path.lastIndexOf(Path.SEPARATOR);
final String name = path.substring(slashIdx + 1);
return Long.parseLong(name.split("\\.")[0]);
}

/** Returns the version for the given checkpoint path. */
public static long checkpointVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
}

public static long checkpointVersion(String path) {
final int slashIdx = path.lastIndexOf(Path.SEPARATOR);
final String name = path.substring(slashIdx + 1);
return Long.parseLong(name.split("\\.")[0]);
}

public static String sidecarFile(Path path, String sidecar) {
return String.format("%s/%s/%s", path.toString(), SIDECAR_DIRECTORY, sidecar);
public static String sidecarFile(String path, String sidecar) {
return String.format("%s/%s/%s", path, SIDECAR_DIRECTORY, sidecar);
}

/**
Expand Down Expand Up @@ -105,8 +99,8 @@ public static Path topLevelV2CheckpointFile(
}

/** Returns the path for a V2 sidecar file with a given UUID. */
public static Path v2CheckpointSidecarFile(Path path, String uuid) {
return new Path(String.format("%s/_sidecars/%s.parquet", path.toString(), uuid));
public static String v2CheckpointSidecarFile(String path, String uuid) {
return String.format("%s/_sidecars/%s.parquet", path, uuid);
}

/**
Expand All @@ -128,8 +122,9 @@ public static List<Path> checkpointFileWithParts(Path path, long version, int nu
return output;
}

public static boolean isCheckpointFile(String fileName) {
return CHECKPOINT_FILE_PATTERN.matcher(new Path(fileName).getName()).matches();
public static boolean isCheckpointFile(String path) {
String fileName = getName(path);
return CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}

public static boolean isClassicCheckpointFile(String fileName) {
Expand All @@ -144,10 +139,10 @@ public static boolean isV2CheckpointFile(String fileName) {
return V2_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}

public static boolean isCommitFile(String fileName) {
String filename = new Path(fileName).getName();
return DELTA_FILE_PATTERN.matcher(filename).matches()
|| UUID_DELTA_FILE_REGEX.matcher(filename).matches();
public static boolean isCommitFile(String path) {
String fileName = getName(path);
return DELTA_FILE_PATTERN.matcher(fileName).matches()
|| UUID_DELTA_FILE_REGEX.matcher(fileName).matches();
}

/**
Expand All @@ -156,10 +151,11 @@ public static boolean isCommitFile(String fileName) {
* compatibility in cases where new file types are added, but without an explicit protocol
* upgrade.
*/
public static long getFileVersion(Path path) {
if (isCheckpointFile(path.getName())) {
public static long getFileVersion(String path) {
String fileName = getName(path);
if (isCheckpointFile(fileName)) {
return checkpointVersion(path);
} else if (isCommitFile(path.getName())) {
} else if (isCommitFile(fileName)) {
return deltaVersion(path);
// } else if (isChecksumFile(path)) {
// checksumVersion(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class DeltaLogActionUtilsSuite extends AnyFunSuite with MockFileSystemClientUtil

def getCommitFiles(versions: Seq[Long]): java.util.List[FileStatus] = {
versions
.map(v => FileStatus.of(FileNames.deltaFile(logPath, v), 0, 0))
.map(v => FileStatus.of(FileNames.deltaFile(logPath.toString, v), 0, 0))
.asJava
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
Optional.empty(),
versionToLoad,
Optional.of(
new MockTableCommitCoordinatorClientHandler(logPath, unbackfilledDeltaVersions))
new MockTableCommitCoordinatorClientHandler(logPath.toString, unbackfilledDeltaVersions))
)
assert(logSegmentOpt.isPresent())

Expand Down Expand Up @@ -907,7 +907,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
expectedErrorMessageContains = errMsg,
tableCommitCoordinatorClientHandlerOpt =
Optional.of(new MockTableCommitCoordinatorClientHandler(
logPath, e = new RuntimeException(errMsg)))
logPath.toString, e = new RuntimeException(errMsg)))
)
}
}
Expand Down Expand Up @@ -951,7 +951,7 @@ class MockSidecarJsonHandler(sidecars: Seq[FileStatus])
}

class MockTableCommitCoordinatorClientHandler(
logPath: Path, versions: Seq[Long] = Seq.empty, e: Throwable = null)
logPath: String, versions: Seq[Long] = Seq.empty, e: Throwable = null)
extends TableCommitCoordinatorClientHandler(null, null, null) {
override def getCommits(
startVersion: javaLang.Long, endVersion: javaLang.Long): GetCommitsResponse = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ class FileNamesSuite extends AnyFunSuite {
}

test("checkpointVersion") {
assert(checkpointVersion(new Path("/a/123.checkpoint.parquet")) == 123)
assert(checkpointVersion(new Path("/a/0.checkpoint.parquet")) == 0)
assert(checkpointVersion("/a/123.checkpoint.parquet") == 123)
assert(checkpointVersion("/a/0.checkpoint.parquet") == 0)
assert(checkpointVersion(
new Path("/a/00000000000000000151.checkpoint.parquet")) == 151)
"/a/00000000000000000151.checkpoint.parquet") == 151)
assert(checkpointVersion(
new Path("/a/999.checkpoint.0000000090.0000000099.parquet")) == 999)
"/a/999.checkpoint.0000000090.0000000099.parquet") == 999)
assert(checkpointVersion("/a/000000010.checkpoint.80a083e8-7026.json") == 10)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ trait MockFileSystemClientUtils extends MockEngineUtils {
/** Delta file statuses where the timestamp = 10*version */
def deltaFileStatuses(deltaVersions: Seq[Long]): Seq[FileStatus] = {
assert(deltaVersions.size == deltaVersions.toSet.size)
deltaVersions.map(v => FileStatus.of(FileNames.deltaFile(logPath, v), v, v*10))
deltaVersions.map(v => FileStatus.of(FileNames.deltaFile(logPath.toString, v), v, v*10))
}

/** Checkpoint file statuses where the timestamp = 10*version */
Expand Down Expand Up @@ -79,7 +79,7 @@ trait MockFileSystemClientUtils extends MockEngineUtils {
}
val sidecars = (0 until numSidecars).map { _ =>
FileStatus.of(
FileNames.v2CheckpointSidecarFile(logPath, UUID.randomUUID().toString).toString,
FileNames.v2CheckpointSidecarFile(logPath.toString, UUID.randomUUID().toString),
v, v * 10)
}
(topLevelFile, sidecars)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
private def generateCommits(path: String, commits: Long*): Unit = {
commits.zipWithIndex.foreach { case (ts, i) =>
spark.range(i*10, i*10 + 10).write.format("delta").mode("append").save(path)
val file = new File(FileNames.deltaFile(new Path(path, "_delta_log"), i))
val file = new File(FileNames.deltaFile(path + "/_delta_log", i))
file.setLastModified(ts)
}
}
Expand Down Expand Up @@ -974,7 +974,7 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
}

// Setup part 2 of 2: edit lastModified times
val logPath = new Path(dir.getCanonicalPath, "_delta_log")
val logPath = dir.getCanonicalPath + "/_delta_log"

val delta0 = new File(FileNames.deltaFile(logPath, 0))
val delta1 = new File(FileNames.deltaFile(logPath, 1))
Expand Down Expand Up @@ -1039,7 +1039,7 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
}

(0 to 35).foreach { i =>
val delta = new File(FileNames.deltaFile(logPath, i))
val delta = new File(FileNames.deltaFile(logPath.toString, i))
if (i >= 25) {
delta.setLastModified(nowEpochMs + i * 1000)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
tablePath: String,
version: Long, consumer: Row => Option[Any]): Option[Any] = {
val table = Table.forPath(engine, tablePath)
val logPath = new DeltaPath(table.getPath(engine), "_delta_log")
val logPath = table.getPath(engine) + "/_delta_log"
val file = FileStatus.of(FileNames.deltaFile(logPath, version), 0, 0)
val columnarBatches = engine.getJsonHandler.readJsonFiles(
singletonCloseableIterator(file),
Expand Down
Loading
Loading